18#include <FreeNOS/System.h>
38#pragma clang optimize off
39#pragma GCC push_options
40#pragma GCC optimize ("O0")
75 ERROR(
"failed to allocate NetworkClient");
83 ERROR(
"failed to initialize network client for device "
84 << device <<
": result = " << (
int) result);
92 ERROR(
"failed to create UDP socket on device " << device <<
93 ": result = " << (
int) result);
101 ERROR(
"failed to bind socket to UDP port " <<
UdpPort <<
102 " on device " << device <<
": result = " << (
int) result);
116 Size packetSize =
sizeof(packet);
125 ERROR(
"failed to receive UDP packet: result = " << (
int) recvResult);
134 ERROR(
"failed to process UDP packet: result = " << (
int) procResult);
146 DEBUG(
"size = " << size);
150 &addr,
sizeof(addr));
166 DEBUG(
"count = " << count);
208 &addr,
sizeof(addr));
217 " at port " << addr.
port);
228 DEBUG(
"hdr->operation = " << hdr->
operation <<
" size = " << size);
258 const u8 *buf = (packet +
sizeof(*header));
260 NOTICE(
"rankId = " <<
header->rankId <<
" datatype = " <<
261 header->datatype <<
" datacount = " <<
header->datacount);
265 ERROR(
"rankId " <<
header->rankId <<
" not found");
274 msg.
integer = *(((
int *) buf) + i);
278 msg.
uchar = *(((
u8 *) buf) + i);
283 ERROR(
"unsupported datatype = " <<
header->datatype);
303 Size packetCount = 0;
307 NOTICE("rankId = " << header->rankId << " datatype = " <<
308 header->datatype << " datacount = " << header->datacount);
310 if (!(ch = m_readChannels.get(header->rankId)))
312 ERROR("rankId " << header->rankId << " not found");
317 for (Size i = 0; i < header->datacount;)
320 Header *hdr = (Header *) pkts[packetCount];
321 u8 *buf = (u8 *)(hdr + 1);
322 Size pktSize = sizeof(Header);
325 hdr->operation = MpiOpRecv;
326 hdr->result = MPI_SUCCESS;
327 hdr->coreId = header->coreId;
328 hdr->rankId = header->rankId;
329 hdr->datatype = header->datatype;
332 while (pktSize < MaximumPacketSize && i < header->datacount)
334 while (ch->read(&msg) != Channel::Success)
342 *(((
int *) buf) + hdr->datacount) = msg.integer;
343 pktSize +=
sizeof(int);
347 *(((
u8 *) buf) + hdr->datacount) = msg.uchar;
348 pktSize +=
sizeof(
u8);
353 ERROR(
"unsupported datatype = " <<
header->datatype);
364 vec[packetCount].iov_base = (
void *) hdr;
365 vec[packetCount].iov_len = pktSize;
371 const Result sendResult = udpSendMultiple(vec, packetCount, addr);
372 if (sendResult != Success)
374 ERROR(
"failed to send multiple UDP packets: result = " << (
int) sendResult);
396 DEBUG(
"exec: cmd = '" << cmd <<
"' rankId = " <<
header->rankId <<
397 " coreId = " <<
header->coreId <<
" coreCount = " <<
header->coreCount);
402 ERROR(
"failed to create MPI communication channels for rankId = " <<
header->rankId <<
403 " result = " << (
int) chanResult);
423 Size pktSize =
sizeof(*hdr);
428 ERROR(
"failed to send UDP packet: result = " << (
int) sendResult);
444 NOTICE(
"size = " << size);
472 ERROR(
"failed to release memory of communication channels: result = " << (
int) releaseResult);
489 ERROR(
"failed to send UDP packet: result = " << (
int) sendResult);
497 const Size coreCount)
499 DEBUG(
"rankId = " << rankId <<
" coreCount = " << coreCount);
512 ERROR(
"failed to allocate MemoryChannel: result = " << (
int) vmResult);
527 ERROR(
"failed to allocate consumer MemoryChannel for rankId = " << rankId);
538 ERROR(
"failed to unmap read MemoryChannel: result = " << (
int) unmapResult);
546 NOTICE(
"readChannel: rank" << rankId <<
": data = " << (
void *) readMemoryBase <<
547 " feedback = " << (
void *) (readMemoryBase +
PAGESIZE));
555 ERROR(
"failed to allocate producer MemoryChannel for rankId = " << rankId);
566 ERROR(
"failed to unmap write MemoryChannel: result = " << (
int) unmapResult);
574 NOTICE(
"writeChannel: rank" << rankId <<
": data = " << (
void *) writeMemoryBase <<
575 " feedback = " << (
void *) (writeMemoryBase +
PAGESIZE));
582 const Size coreCount)
584 DEBUG(
"command = '" << command <<
"' rankId = " << rankId <<
585 " coreCount = " << coreCount);
591 programPath <<
"/bin/" << *programArgs[0];
595 programCmd << programPath <<
" --slave " <<
604 programCmd <<
" " << (*it.
current());
607 NOTICE(
"programCmd = '" << *programCmd <<
"'");
611 char **argv =
new char*[fullProgramArgs.
count() + 1];
616 argv[argc++] = *i.current();
618 NOTICE(
"argv[" << (argc-1) <<
"] = " << argv[argc-1]);
630 ERROR(
"failed to start program on local core: result = " << (
int) execResult);
645 const Size coreCount)
647 DEBUG(
"coreId = " <<
coreId <<
" command = '" << command <<
648 "' rankId = " << rankId <<
" coreCount = " << coreCount);
655 programPath <<
"/bin/" << *programArgs[0];
662 ERROR(
"failed to read program at path '" << *programPath <<
663 "': result = " << (
int) readResult);
672 ERROR(
"failed to initialize LZ4 decompressor: result = " << (
int) lz4Result);
678 uncompProgRange.
virt = 0;
679 uncompProgRange.
phys = 0;
685 ERROR(
"failed to allocate program buffer: result = " << (
int) vmResult);
689 u8 *programBuffer = (
u8 *) uncompProgRange.
virt;
696 ERROR(
"failed to decompress program buffer: result = " << (
int) decompResult);
702 programCmd << programPath <<
" --slave " <<
711 programCmd <<
" " << (*i.
current());
714 DEBUG(
"programCmd = '" << *programCmd <<
"'");
721 ERROR(
"failed to create process on core" <<
coreId <<
": result = " << (
int) result);
729 ERROR(
"failed to release memory of uncompressed program: result = " << (
int) releaseResult);
SystemDescriptorHeader header
Result
Enumeration of generic kernel API result codes.
Helper class to launch an external program.
Result exec()
Runs the external program.
const ProcessID getPid() const
Retrieve Process Identifier of the program.
const ArgumentContainer & arguments() const
Get program arguments.
ArgumentParser & parser()
Get program arguments parser.
const char * get(const char *name) const
Get argument by name.
void setDescription(const String &desc)
Set program description.
Result registerPositional(const char *name, const char *description, Size count=1)
Register a positional argument.
virtual Size size() const
Returns the maximum size of this Array.
virtual bool insert(Size position, const T &item)
Puts the given item at the given position.
Provides a buffered abstract interface to a file.
const Size size() const
Get file size.
Result read()
Read the file (buffered)
const void * buffer() const
Get file buffer.
CoreClient provides a simple interface to a CoreServer.
Core::Result createProcess(const Size coreId, const Address programAddr, const Size programSize, const char *programCmd) const
Create a new process on a different core.
static const Size MaximumLength
Maximum length of a filesystem path in bytes.
static const String toString(const Address address)
Convert address to string.
virtual T * get(const Size position) const
Returns the item at the given position.
virtual bool insertAt(const Size position, T *item)
Inserts the given item at the given position.
virtual bool hasCurrent() const
Check if there is a current item on the List.
virtual const T & current() const
Get current item in the List.
Simple linked list template class.
Size count() const
Get the number of items on the list.
Decompress data using the LZ4 algorithm created by Yann Collet.
Result initialize()
Initialize the decompressor.
u64 getUncompressedSize() const
Get size of the uncompressed data.
Result read(void *buffer, const Size size) const
Reads compressed data.
static void * set(void *dest, int ch, unsigned count)
Fill memory with a constant byte.
static Size copy(void *dest, const void *src, Size count)
Copy memory from one place to another.
Unidirectional point-to-point channel using shared memory.
Result setPhysical(const Address data, const Address feedback, const bool hardReset=true)
Set memory pages by physical address.
Result unmap()
Unmap memory pages from virtual address space.
virtual Result write(const void *buffer)
Write a message.
virtual Result exec()
Run the server.
int m_sock
IP/UDP socket for external communication.
static const Size ReceiveTimeoutMs
Timeout in milliseconds to wait for packet receive.
Result udpSendMultiple(const struct iovec *vec, const Size count, const struct sockaddr &addr) const
Send multiple UDP packets.
Result startLocalProcess(const char *command, const Size rankId, const Size coreCount)
Start a process on the local processor.
Index< MemoryChannel, MaximumChannels > m_writeChannels
Stores all channels for sending data to processes.
Result processSend(const Header *header, const u8 *packet, const Size size)
Process MPI send request.
Result processRecv(const Header *header, const u8 *packet, const Size size, const struct sockaddr &addr)
Process MPI recv request.
NetworkClient * m_client
Networking client object.
Result startRemoteProcess(const Size coreId, const char *command, const Size rankId, const Size coreCount)
Start a process on a secondary processor.
Result processTerminate(const Header *header, const u8 *packet, const Size size, const struct sockaddr &addr)
Process MPI terminate request.
Array< ProcessID, MaximumChannels > m_pids
Records the PID of each process participating in the computation.
virtual ~MpiProxy()
Destructor.
Index< MemoryChannel, MaximumChannels > m_readChannels
Stores all channels for receiving data from processes.
Result processRequest(const u8 *packet, const Size size, const struct sockaddr &addr)
Process incoming packet.
Result createChannels(const Size rankId, const Size coreCount)
Create communication channels.
static const Size MaximumPacketSize
Maximum size of packet payload.
Result processExec(const Header *header, const u8 *packet, const Size size, const struct sockaddr &addr)
Process execute request.
Memory::Range m_memChannelBase
Memory base address for local MPI communication.
virtual Result initialize()
Initialize the server.
MpiProxy(int argc, char **argv)
Constructor.
Result udpSend(const void *packet, const Size size, const struct sockaddr &addr) const
Send UDP packet.
static const u16 UdpPort
Port number for IP/UDP traffic.
Result udpReceive(void *packet, Size &size, struct sockaddr &addr) const
Receive UDP packet.
Networking Client implementation.
Result bindSocket(const int sock, const IPV4::Address addr=0, const u16 port=0)
Bind socket to address/port.
Result initialize()
Perform initialization.
Result waitSocket(const NetworkClient::SocketType type, const int sock, const Size msecTimeout)
Wait until the given socket has data to receive.
Result createSocket(const SocketType type, int *socket)
Create new socket.
Networking packet queue implementation.
static const Size PayloadBufferSize
Size of payload memory buffer.
static const Size MaxPackets
Maximum number of packets available.
POSIX-compatible application.
virtual void fill(T value)
Fill the Sequence with the given value.
List< String > split(const char delimiter) const
Split the String into parts separated by a delimiter.
API::Result ProcessCtl(const ProcessID proc, const ProcessOperation op, const Address addr=0, const Address output=0)
Prototype for user applications.
API::Result VMCtl(const ProcessID procID, const MemoryOperation op, Memory::Range *range=ZERO)
Prototype for user applications.
#define PAGESIZE
ARM uses 4K pages.
C char * strerror(int errnum)
The strerror function maps the number in errnum to a message string.
C int errno
The lvalue errno is used by many functions to return error values.
C int sendto(int sockfd, const void *buf, size_t len, int flags, const struct sockaddr *addr, socklen_t addrlen)
Send a single datagram to a remote host.
#define assert(exp)
Insert program diagnostics.
C int recvfrom(int sockfd, void *buf, size_t len, int flags, struct sockaddr *addr, socklen_t addrlen)
Receive a single datagram from a socket.
C pid_t waitpid(pid_t pid, int *stat_loc, int options)
Wait for a child process to stop or terminate.
C int sendmsg(int sockfd, const struct msghdr *msg, int flags)
Send multiple datagrams to a remote host.
#define NULL
NULL means zero.
unsigned long Address
A memory address.
#define ERROR(msg)
Output an error message.
#define NOTICE(msg)
Output a notice message.
unsigned int Size
Any sane size indicator cannot go negative.
#define DEBUG(msg)
Output a debug message to standard output.
unsigned char u8
Unsigned 8-bit number.
Result
Result code for Actions.
Size size
Size in number of bytes.
Address phys
Physical address.
Address virt
Virtual address.
Access access
Page access flags.
Input/Output vector for multi-packet operations.
Describes one or more datagrams.
Defines a socket address and port pair.