FreeNOS
MpiProxy.cpp
Go to the documentation of this file.
1/*
2 * Copyright (C) 2020 Niek Linnenbank
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <http://www.gnu.org/licenses/>.
16 */
17
18#include <FreeNOS/System.h>
19#include <Log.h>
20#include <IPV4.h>
21#include <errno.h>
22#include <string.h>
23#include <sys/socket.h>
24#include <sys/wait.h>
25#include <MemoryBlock.h>
26#include <MemoryChannel.h>
27#include <FileSystemPath.h>
28#include <NetworkClient.h>
29#include <NetworkQueue.h>
30#include <BufferedFile.h>
31#include <MPIMessage.h>
32#include <ApplicationLauncher.h>
33#include <Lz4Decompressor.h>
34#include <CoreClient.h>
35#include <mpi.h>
36#include "MpiProxy.h"
37
38#pragma clang optimize off
39#pragma GCC push_options
40#pragma GCC optimize ("O0")
41
42MpiProxy::MpiProxy(int argc, char **argv)
43 : POSIXApplication(argc, argv)
44 , m_sock(-1)
45 , m_client(ZERO)
46{
49
50 parser().setDescription("Message Passing Interface (MPI) proxy server");
51 parser().registerPositional("DEVICE", "device name of network adapter");
52}
53
55{
56 DEBUG("");
57
58 if (m_client != ZERO)
59 {
60 delete m_client;
61 }
62}
63
65{
66 const char *device = arguments().get("DEVICE");
68
69 DEBUG("");
70
71 // Create a network client
72 m_client = new NetworkClient(device);
73 if (m_client == ZERO)
74 {
75 ERROR("failed to allocate NetworkClient");
76 return IOError;
77 }
78
79 // Initialize networking client
80 result = m_client->initialize();
81 if (result != NetworkClient::Success)
82 {
83 ERROR("failed to initialize network client for device "
84 << device << ": result = " << (int) result);
85 return IOError;
86 }
87
88 // Create an UDP socket
90 if (result != NetworkClient::Success)
91 {
92 ERROR("failed to create UDP socket on device " << device <<
93 ": result = " << (int) result);
94 return IOError;
95 }
96
97 // Bind to a local port.
98 result = m_client->bindSocket(m_sock, 0, UdpPort);
99 if (result != NetworkClient::Success)
100 {
101 ERROR("failed to bind socket to UDP port " << UdpPort <<
102 " on device " << device << ": result = " << (int) result);
103 return IOError;
104 }
105
106 return Success;
107}
108
110{
111 DEBUG("");
112
113 while (true)
114 {
115 static u8 packet[MaximumPacketSize];
116 Size packetSize = sizeof(packet);
117 struct sockaddr addr;
118
119 // Wait for UDP packet
120 const Result recvResult = udpReceive(packet, packetSize, addr);
121 if (recvResult != Success)
122 {
123 if (recvResult != TimedOut)
124 {
125 ERROR("failed to receive UDP packet: result = " << (int) recvResult);
126 }
127 continue;
128 }
129
130 // Process the packet
131 const Result procResult = processRequest(packet, packetSize, addr);
132 if (procResult != Success)
133 {
134 ERROR("failed to process UDP packet: result = " << (int) procResult);
135 continue;
136 }
137 }
138
139 return Success;
140}
141
143 const Size size,
144 const struct sockaddr & addr) const
145{
146 DEBUG("size = " << size);
147
148 // Send the packet
149 int result = ::sendto(m_sock, packet, size, 0,
150 &addr, sizeof(addr));
151 if (result <= 0)
152 {
153 ERROR("failed to send UDP datagram: " << strerror(errno));
154 return IOError;
155 }
156
157 return Success;
158}
159
161 const Size count,
162 const struct sockaddr & addr) const
163{
164 struct msghdr msg;
165
166 DEBUG("count = " << count);
167
168 // Prepare the message header
169 msg.msg_name = (void *) &addr;
170 msg.msg_namelen = sizeof(addr);
171 msg.msg_iov = (struct iovec *) vec;
172 msg.msg_iovlen = count;
173
174 // Send the packet
175 int result = ::sendmsg(m_sock, &msg, 0);
176 if (result <= 0)
177 {
178 ERROR("failed to send multiple UDP datagrams: " << strerror(errno));
179 return IOError;
180 }
181
182 return Success;
183}
184
186 Size & size,
187 struct sockaddr & addr) const
188{
189 DEBUG("");
190
191 // Wait for a packet in the UDP socket
193 if (result != NetworkClient::Success)
194 {
195 if (result == NetworkClient::TimedOut)
196 {
197 return TimedOut;
198 }
199 else
200 {
201 ERROR("failed to wait for UDP socket " << m_sock);
202 return IOError;
203 }
204 }
205
206 // Receive UDP datagram
207 int r = recvfrom(m_sock, packet, size, 0,
208 &addr, sizeof(addr));
209 if (r < 0)
210 {
211 ERROR("failed to receive UDP datagram: " << strerror(errno));
212 return IOError;
213 }
214
215 size = r;
216 DEBUG("received " << r << " bytes from " << *IPV4::toString(addr.addr) <<
217 " at port " << addr.port);
218
219 return Success;
220}
221
223 const Size size,
224 const struct sockaddr & addr)
225{
226 const Header *hdr = (const Header *)(packet);
227
228 DEBUG("hdr->operation = " << hdr->operation << " size = " << size);
229
230 switch (hdr->operation)
231 {
232 case MpiOpSend:
233 return processSend(hdr, packet, size);
234
235 case MpiOpRecv:
236 return processRecv(hdr, packet, size, addr);
237
238 case MpiOpExec:
239 return processExec(hdr, packet, size, addr);
240
241 case MpiOpTerminate:
242 return processTerminate(hdr, packet, size, addr);
243
244 default:
245 ERROR("unknown operation: " << Number::Hex << hdr->operation);
246 break;
247 }
248
249 return Success;
250}
251
253 const u8 *packet,
254 const Size size)
255{
256 MPIMessage msg;
257 MemoryChannel *ch;
258 const u8 *buf = (packet + sizeof(*header));
259
260 NOTICE("rankId = " << header->rankId << " datatype = " <<
261 header->datatype << " datacount = " << header->datacount);
262
263 if (!(ch = m_writeChannels.get(header->rankId)))
264 {
265 ERROR("rankId " << header->rankId << " not found");
266 return NotFound;
267 }
268
269 for (Size i = 0; i < header->datacount; i++)
270 {
271 switch (header->datatype)
272 {
273 case MPI_INT:
274 msg.integer = *(((int *) buf) + i);
275 break;
276
278 msg.uchar = *(((u8 *) buf) + i);
279 break;
280
281 default:
282 {
283 ERROR("unsupported datatype = " << header->datatype);
284 return NotFound;
285 }
286 }
287
288 while (ch->write(&msg) != Channel::Success)
289 ;
290 }
291
292 return Success;
293}
294
296 const u8 *packet,
297 const Size size,
298 const struct sockaddr & addr)
299{
300 MemoryChannel *ch;
302 static struct iovec vec[NetworkQueue::MaxPackets];
303 Size packetCount = 0;
304
305 assert(NetworkQueue::PayloadBufferSize >= MaximumPacketSize);
306
307 NOTICE("rankId = " << header->rankId << " datatype = " <<
308 header->datatype << " datacount = " << header->datacount);
309
310 if (!(ch = m_readChannels.get(header->rankId)))
311 {
312 ERROR("rankId " << header->rankId << " not found");
313 return NotFound;
314 }
315
316 // Read from the channel and send out packet(s)
317 for (Size i = 0; i < header->datacount;)
318 {
319 MPIMessage msg;
320 Header *hdr = (Header *) pkts[packetCount];
321 u8 *buf = (u8 *)(hdr + 1);
322 Size pktSize = sizeof(Header);
323
324 // Prepare header
325 hdr->operation = MpiOpRecv;
326 hdr->result = MPI_SUCCESS;
327 hdr->coreId = header->coreId;
328 hdr->rankId = header->rankId;
329 hdr->datatype = header->datatype;
330 hdr->datacount = 0;
331
332 while (pktSize < MaximumPacketSize && i < header->datacount)
333 {
334 while (ch->read(&msg) != Channel::Success)
335 {
337 }
338
339 switch (header->datatype)
340 {
341 case MPI_INT:
342 *(((int *) buf) + hdr->datacount) = msg.integer;
343 pktSize += sizeof(int);
344 break;
345
347 *(((u8 *) buf) + hdr->datacount) = msg.uchar;
348 pktSize += sizeof(u8);
349 break;
350
351 default:
352 {
353 ERROR("unsupported datatype = " << header->datatype);
354 return NotFound;
355 }
356 }
357 // Move to the next data item
358 i++;
359 pktSize++;
360 hdr->datacount++;
361 }
362
363 // Fill the I/O vector struct
364 vec[packetCount].iov_base = (void *) hdr;
365 vec[packetCount].iov_len = pktSize;
366 packetCount++;
367
368 if (hdr->datacount != 0 && (packetCount == NetworkQueue::MaxPackets || i >= header->datacount))
369 {
370 // UDP send
371 const Result sendResult = udpSendMultiple(vec, packetCount, addr);
372 if (sendResult != Success)
373 {
374 ERROR("failed to send multiple UDP packets: result = " << (int) sendResult);
375 return sendResult;
376 }
377 packetCount = 0;
378 }
379 }
380
381 return Success;
382}
383
385 const u8 *packet,
386 const Size size,
387 const struct sockaddr & addr)
388{
389 char cmd[FileSystemPath::MaximumLength + 1];
390 Result result = Success;
391
392 MemoryBlock::set(cmd, 0, sizeof(cmd));
393 MemoryBlock::copy(cmd, header + 1, size - sizeof(*header));
395
396 DEBUG("exec: cmd = '" << cmd << "' rankId = " << header->rankId <<
397 " coreId = " << header->coreId << " coreCount = " << header->coreCount);
398
399 const Result chanResult = createChannels(header->rankId, header->coreCount);
400 if (chanResult != Success)
401 {
402 ERROR("failed to create MPI communication channels for rankId = " << header->rankId <<
403 " result = " << (int) chanResult);
404 return chanResult;
405 }
406
407 if (header->coreId == 0)
408 {
409 result = startLocalProcess(cmd, header->rankId, header->coreCount);
410 }
411 else
412 {
413 result = startRemoteProcess(header->coreId, cmd, header->rankId, header->coreCount);
414 }
415
416 // Send acknowledge of start
417 static u8 pkt[MaximumPacketSize];
418 Header *hdr = (Header *) pkt;
419 hdr->operation = MpiOpExec;
420 hdr->result = result == Success ? MPI_SUCCESS : MPI_ERR_IO;
421 hdr->coreId = header->coreId;
422 hdr->rankId = header->rankId;
423 Size pktSize = sizeof(*hdr);
424
425 const Result sendResult = udpSend(pkt, pktSize, addr);
426 if (sendResult != Success)
427 {
428 ERROR("failed to send UDP packet: result = " << (int) sendResult);
429 return sendResult;
430 }
431
432 return result;
433}
434
436 const u8 *packet,
437 const Size size,
438 const struct sockaddr & addr)
439{
440 static u8 pkt[MaximumPacketSize];
441 Header *hdr = (Header *) pkt;
442 Size pktSize = sizeof(Header);
443
444 NOTICE("size = " << size);
445
446 // Loop PIDs of active processes and wait for each to terminate
447 for (Size i = 0; i < m_pids.size(); i++)
448 {
449 if (m_pids[i] != ANY)
450 {
451 if (i == 0)
452 {
453 int status;
454 waitpid(m_pids[i], &status, 0);
455 }
456 else
457 {
458 // TODO: send a CoreClient::ping to remote cores,
459 // to check if they are finished with the remote program
460 }
461
462 m_pids.insert(i, ANY);
463 }
464 }
465
466 // Release m_memChannelBase here
468 {
469 const API::Result releaseResult = VMCtl(SELF, Release, &m_memChannelBase);
470 if (releaseResult != API::Success)
471 {
472 ERROR("failed to release memory of communication channels: result = " << (int) releaseResult);
473 return IOError;
474 }
475
477 }
478
479 // Prepare header for response
481 hdr->result = MPI_SUCCESS;
482 hdr->rankId = header->rankId;
483 hdr->coreId = header->coreId;
484
485 // UDP send
486 const Result sendResult = udpSend(pkt, pktSize, addr);
487 if (sendResult != Success)
488 {
489 ERROR("failed to send UDP packet: result = " << (int) sendResult);
490 return sendResult;
491 }
492
493 return Success;
494}
495
497 const Size coreCount)
498{
499 DEBUG("rankId = " << rankId << " coreCount = " << coreCount);
500
501 // Allocate memory space for two-way communication
502 // between the proxy server and the other processes
504 {
505 m_memChannelBase.size = (PAGESIZE * 2) * coreCount * 2;
510 if (vmResult != API::Success)
511 {
512 ERROR("failed to allocate MemoryChannel: result = " << (int) vmResult);
513 return OutOfMemory;
514 }
515
516 // Clear channel pages
518 NOTICE("MemoryChannel at physical address " << (void *) m_memChannelBase.phys);
519 }
520
521 // Create read channel
522 if (m_readChannels.get(rankId) == ZERO)
523 {
525 if (!ch)
526 {
527 ERROR("failed to allocate consumer MemoryChannel for rankId = " << rankId);
528 return OutOfMemory;
529 }
530
531 m_readChannels.insertAt(rankId, ch);
532 }
533 else
534 {
535 const MemoryChannel::Result unmapResult = m_readChannels.get(rankId)->unmap();
536 if (unmapResult != MemoryChannel::Success)
537 {
538 ERROR("failed to unmap read MemoryChannel: result = " << (int) unmapResult);
539 return IOError;
540 }
541 }
542
543 const Address readMemoryBase = m_memChannelBase.phys + (PAGESIZE * 2 * rankId);
544 m_readChannels.get(rankId)->setPhysical(readMemoryBase, readMemoryBase + PAGESIZE);
545
546 NOTICE("readChannel: rank" << rankId << ": data = " << (void *) readMemoryBase <<
547 " feedback = " << (void *) (readMemoryBase + PAGESIZE));
548
549 // Create write channel
550 if (m_writeChannels.get(rankId) == ZERO)
551 {
553 if (!ch)
554 {
555 ERROR("failed to allocate producer MemoryChannel for rankId = " << rankId);
556 return OutOfMemory;
557 }
558
559 m_writeChannels.insertAt(rankId, ch);
560 }
561 else
562 {
563 const MemoryChannel::Result unmapResult = m_writeChannels.get(rankId)->unmap();
564 if (unmapResult != MemoryChannel::Success)
565 {
566 ERROR("failed to unmap write MemoryChannel: result = " << (int) unmapResult);
567 return IOError;
568 }
569 }
570
571 const Address writeMemoryBase = m_memChannelBase.phys + (PAGESIZE * 2 * coreCount) + (PAGESIZE * 2 * rankId);
572 m_writeChannels.get(rankId)->setPhysical(writeMemoryBase, writeMemoryBase + PAGESIZE);
573
574 NOTICE("writeChannel: rank" << rankId << ": data = " << (void *) writeMemoryBase <<
575 " feedback = " << (void *) (writeMemoryBase + PAGESIZE));
576
577 return Success;
578}
579
581 const Size rankId,
582 const Size coreCount)
583{
584 DEBUG("command = '" << command << "' rankId = " << rankId <<
585 " coreCount = " << coreCount);
586
587 List<String> programArgs = String(command).split(' ');
588 String programPath;
589
590 // Prepare full path to the program to start
591 programPath << "/bin/" << *programArgs[0];
592
593 // Format program arguments with MPI specific arguments for the slaves
594 String programCmd;
595 programCmd << programPath << " --slave " <<
596 Number::Hex << (void *)(m_memChannelBase.phys) << " " <<
597 Number::Dec << rankId << " " << coreCount;
598
599 // Append additional user arguments
600 ListIterator<String> it(programArgs);
601 it++;
602 for (; it.hasCurrent(); it++)
603 {
604 programCmd << " " << (*it.current());
605 }
606
607 NOTICE("programCmd = '" << *programCmd << "'");
608
609 // Convert full programCmd to argc/argv format for ApplicationLauncher
610 List<String> fullProgramArgs = programCmd.split(' ');
611 char **argv = new char*[fullProgramArgs.count() + 1];
612 int argc = 0;
613
614 for (ListIterator<String> i(fullProgramArgs); i.hasCurrent(); i++)
615 {
616 argv[argc++] = *i.current();
617
618 NOTICE("argv[" << (argc-1) << "] = " << argv[argc-1]);
619 }
620
621 // Terminate argv array with zero
622 argv[fullProgramArgs.count()] = ZERO;
623
624 // Start the local process on this core
625 ApplicationLauncher launcher(*programPath, (const char **) argv);
626
627 const ApplicationLauncher::Result execResult = launcher.exec();
628 if (execResult != ApplicationLauncher::Success)
629 {
630 ERROR("failed to start program on local core: result = " << (int) execResult);
631 delete[] argv;
632 return IOError;
633 }
634
635 NOTICE("started with PID = " << launcher.getPid());
636 m_pids.insert(0, launcher.getPid());
637
638 delete[] argv;
639 return Success;
640}
641
643 const char *command,
644 const Size rankId,
645 const Size coreCount)
646{
647 DEBUG("coreId = " << coreId << " command = '" << command <<
648 "' rankId = " << rankId << " coreCount = " << coreCount);
649
650 const CoreClient coreClient;
651 List<String> programArgs = String(command).split(' ');
652 String programPath;
653
654 // Prepare full path to the program to start
655 programPath << "/bin/" << *programArgs[0];
656
657 // Try to read the raw ELF program data (compressed)
658 BufferedFile programFile(*programPath);
659 const BufferedFile::Result readResult = programFile.read();
660 if (readResult != BufferedFile::Success)
661 {
662 ERROR("failed to read program at path '" << *programPath <<
663 "': result = " << (int) readResult);
664 return NotFound;
665 }
666
667 // Initialize decompressor
668 Lz4Decompressor lz4(programFile.buffer(), programFile.size());
669 Lz4Decompressor::Result lz4Result = lz4.initialize();
670 if (lz4Result != Lz4Decompressor::Success)
671 {
672 ERROR("failed to initialize LZ4 decompressor: result = " << (int) lz4Result);
673 return IOError;
674 }
675
676 // Allocate memory for decompressed program
677 Memory::Range uncompProgRange;
678 uncompProgRange.virt = 0;
679 uncompProgRange.phys = 0;
680 uncompProgRange.size = lz4.getUncompressedSize();
682 API::Result vmResult = VMCtl(SELF, MapContiguous, &uncompProgRange);
683 if (vmResult != API::Success)
684 {
685 ERROR("failed to allocate program buffer: result = " << (int) vmResult);
686 return OutOfMemory;
687 }
688
689 u8 *programBuffer = (u8 *) uncompProgRange.virt;
690 assert(programBuffer != NULL);
691
692 // Decompress entire file
693 const Lz4Decompressor::Result decompResult = lz4.read(programBuffer, lz4.getUncompressedSize());
694 if (decompResult != Lz4Decompressor::Success)
695 {
696 ERROR("failed to decompress program buffer: result = " << (int) decompResult);
697 return IOError;
698 }
699
700 // Format program command with MPI specific arguments for the slaves
701 String programCmd;
702 programCmd << programPath << " --slave " <<
703 Number::Hex << (void *)(m_memChannelBase.phys) << " " <<
704 Number::Dec << rankId << " " << coreCount;
705
706 // Append additional user arguments
707 ListIterator<String> i(programArgs);
708 i++;
709 for (; i.hasCurrent(); i++)
710 {
711 programCmd << " " << (*i.current());
712 }
713
714 DEBUG("programCmd = '" << *programCmd << "'");
715
716 // Start the program on the secondary core
717 const Core::Result result = coreClient.createProcess(coreId, (const Address) programBuffer,
718 lz4.getUncompressedSize(), *programCmd);
719 if (result != Core::Success)
720 {
721 ERROR("failed to create process on core" << coreId << ": result = " << (int) result);
722 return IOError;
723 }
724
725 // Cleanup uncompressed program buffer
726 const API::Result releaseResult = VMCtl(SELF, Release, &uncompProgRange);
727 if (releaseResult != API::Success)
728 {
729 ERROR("failed to release memory of uncompressed program: result = " << (int) releaseResult);
730 return IOError;
731 }
732
733 // PID of remote processes are not currently visible
735 return Success;
736}
SystemDescriptorHeader header
Definition IntelACPI.h:0
u8 coreId
Definition IntelACPI.h:1
Result
Enumeration of generic kernel API result codes.
Definition API.h:69
@ Success
Definition API.h:70
Helper class to launch an external program.
Result exec()
Runs the external program.
const ProcessID getPid() const
Retrieve Process Identifier of the program.
Result
Result codes.
Definition Application.h:54
const ArgumentContainer & arguments() const
Get program arguments.
ArgumentParser & parser()
Get program arguments parser.
const char * get(const char *name) const
Get argument by name.
void setDescription(const String &desc)
Set program description.
Result registerPositional(const char *name, const char *description, Size count=1)
Register a positional argument.
virtual Size size() const
Returns the maximum size of this Array.
Definition Array.h:138
virtual bool insert(Size position, const T &item)
Puts the given item at the given position.
Definition Array.h:74
Provides a buffered abstract interface to a file.
const Size size() const
Get file size.
Result
Result codes.
Result read()
Read the file (buffered)
const void * buffer() const
Get file buffer.
@ Consumer
Definition Channel.h:59
@ Producer
Definition Channel.h:58
Result
Result codes.
Definition Channel.h:42
@ Success
Definition Channel.h:43
CoreClient provides a simple interface to a CoreServer.
Definition CoreClient.h:41
Core::Result createProcess(const Size coreId, const Address programAddr, const Size programSize, const char *programCmd) const
Create a new process on a different core.
static const Size MaximumLength
Maximum length of a filesystem path in bytes.
static const String toString(const Address address)
Convert address to string.
Definition IPV4.cpp:82
virtual T * get(const Size position) const
Returns the item at the given position.
Definition Index.h:187
virtual bool insertAt(const Size position, T *item)
Inserts the given item at the given position.
Definition Index.h:113
Iterate through a List.
virtual bool hasCurrent() const
Check if there is a current item on the List.
virtual const T & current() const
Get current item in the List.
Simple linked list template class.
Definition List.h:37
Size count() const
Get the number of items on the list.
Definition List.h:402
Decompress data using the LZ4 algorithm created by Yann Collet.
Result initialize()
Initialize the decompressor.
u64 getUncompressedSize() const
Get size of the uncompressed data.
Result
Result codes.
Result read(void *buffer, const Size size) const
Reads compressed data.
static void * set(void *dest, int ch, unsigned count)
Fill memory with a constant byte.
static Size copy(void *dest, const void *src, Size count)
Copy memory from one place to another.
Unidirectional point-to-point channel using shared memory.
Result setPhysical(const Address data, const Address feedback, const bool hardReset=true)
Set memory pages by physical address.
Result unmap()
Unmap memory pages from virtual address space.
virtual Result write(const void *buffer)
Write a message.
virtual Result exec()
Run the server.
Definition MpiProxy.cpp:109
int m_sock
IP/UDP socket for external communication.
Definition MpiProxy.h:282
static const Size ReceiveTimeoutMs
Timeout in milliseconds to wait for packet receive.
Definition MpiProxy.h:58
Result udpSendMultiple(const struct iovec *vec, const Size count, const struct sockaddr &addr) const
Send multiple UDP packets.
Definition MpiProxy.cpp:160
Result startLocalProcess(const char *command, const Size rankId, const Size coreCount)
Start a process on the local processor.
Definition MpiProxy.cpp:580
Index< MemoryChannel, MaximumChannels > m_writeChannels
Stores all channels for sending data to processes.
Definition MpiProxy.h:294
Result processSend(const Header *header, const u8 *packet, const Size size)
Process MPI send request.
Definition MpiProxy.cpp:252
Result processRecv(const Header *header, const u8 *packet, const Size size, const struct sockaddr &addr)
Process MPI recv request.
Definition MpiProxy.cpp:295
NetworkClient * m_client
Networking client object.
Definition MpiProxy.h:285
Result startRemoteProcess(const Size coreId, const char *command, const Size rankId, const Size coreCount)
Start a process on a secondary processor.
Definition MpiProxy.cpp:642
Result processTerminate(const Header *header, const u8 *packet, const Size size, const struct sockaddr &addr)
Process MPI terminate request.
Definition MpiProxy.cpp:435
Array< ProcessID, MaximumChannels > m_pids
Records the PID of each process participating in the computation.
Definition MpiProxy.h:297
virtual ~MpiProxy()
Destructor.
Definition MpiProxy.cpp:54
@ MpiOpRecv
Definition MpiProxy.h:74
@ MpiOpSend
Definition MpiProxy.h:73
@ MpiOpTerminate
Definition MpiProxy.h:76
@ MpiOpExec
Definition MpiProxy.h:75
Index< MemoryChannel, MaximumChannels > m_readChannels
Stores all channels for receiving data from processes.
Definition MpiProxy.h:291
Result processRequest(const u8 *packet, const Size size, const struct sockaddr &addr)
Process incoming packet.
Definition MpiProxy.cpp:222
Result createChannels(const Size rankId, const Size coreCount)
Create communication channels.
Definition MpiProxy.cpp:496
static const Size MaximumPacketSize
Maximum size of packet payload.
Definition MpiProxy.h:66
Result processExec(const Header *header, const u8 *packet, const Size size, const struct sockaddr &addr)
Process execute request.
Definition MpiProxy.cpp:384
Memory::Range m_memChannelBase
Memory base address for local MPI communication.
Definition MpiProxy.h:288
virtual Result initialize()
Initialize the server.
Definition MpiProxy.cpp:64
MpiProxy(int argc, char **argv)
Constructor.
Definition MpiProxy.cpp:42
Result udpSend(const void *packet, const Size size, const struct sockaddr &addr) const
Send UDP packet.
Definition MpiProxy.cpp:142
static const u16 UdpPort
Port number for IP/UDP traffic.
Definition MpiProxy.h:55
Result udpReceive(void *packet, Size &size, struct sockaddr &addr) const
Receive UDP packet.
Definition MpiProxy.cpp:185
Networking Client implementation.
Result bindSocket(const int sock, const IPV4::Address addr=0, const u16 port=0)
Bind socket to address/port.
Result initialize()
Perform initialization.
Result waitSocket(const NetworkClient::SocketType type, const int sock, const Size msecTimeout)
Wait until the given socket has data to receive.
Result
Result codes.
Result createSocket(const SocketType type, int *socket)
Create new socket.
Networking packet queue implementation.
static const Size PayloadBufferSize
Size of payload memory buffer.
static const Size MaxPackets
Maximum number of packets available.
POSIX-compatible application.
virtual void fill(T value)
Fill the Sequence with the given value.
Definition Sequence.h:73
Abstraction of strings.
Definition String.h:42
List< String > split(const char delimiter) const
Split the String into parts separated by a delimiter.
Definition String.cpp:408
#define SELF
Definition ProcessID.h:35
#define ANY
Definition ProcessID.h:34
API::Result ProcessCtl(const ProcessID proc, const ProcessOperation op, const Address addr=0, const Address output=0)
Prototype for user applications.
Definition ProcessCtl.h:93
API::Result VMCtl(const ProcessID procID, const MemoryOperation op, Memory::Range *range=ZERO)
Prototype for user applications.
Definition VMCtl.h:61
@ Release
Definition VMCtl.h:40
@ MapContiguous
Definition VMCtl.h:37
@ Schedule
Definition ProcessCtl.h:52
#define PAGESIZE
ARM uses 4K pages.
Definition ARMConstant.h:97
@ MPI_UNSIGNED_CHAR
Definition mpi.h:52
@ MPI_INT
Definition mpi.h:51
@ MPI_SUCCESS
Definition mpi.h:73
@ MPI_ERR_IO
Definition mpi.h:108
C char * strerror(int errnum)
The strerror function maps the number in errnum to a message string.
Definition strerror.cpp:20
C int errno
The lvalue errno is used by many functions to return error values.
C int sendto(int sockfd, const void *buf, size_t len, int flags, const struct sockaddr *addr, socklen_t addrlen)
Send a single datagram to a remote host.
Definition sendto.cpp:25
#define assert(exp)
Insert program diagnostics.
Definition assert.h:60
C int recvfrom(int sockfd, void *buf, size_t len, int flags, struct sockaddr *addr, socklen_t addrlen)
Receive a single datagram from a socket.
Definition recvfrom.cpp:25
C pid_t waitpid(pid_t pid, int *stat_loc, int options)
Wait for a child process to stop or terminate.
Definition waitpid.cpp:23
C int sendmsg(int sockfd, const struct msghdr *msg, int flags)
Send multiple datagrams to a remote host.
Definition sendmsg.cpp:24
#define NULL
NULL means zero.
Definition Macros.h:39
unsigned long Address
A memory address.
Definition Types.h:131
#define ERROR(msg)
Output an error message.
Definition Log.h:61
#define NOTICE(msg)
Output a notice message.
Definition Log.h:75
unsigned int Size
Any sane size indicator cannot go negative.
Definition Types.h:128
#define ZERO
Zero value.
Definition Macros.h:43
#define DEBUG(msg)
Output a debug message to standard output.
Definition Log.h:89
unsigned char u8
Unsigned 8-bit number.
Definition Types.h:59
Result
Result code for Actions.
Definition Core.h:48
@ Success
Definition Core.h:49
@ User
Definition Memory.h:44
@ Readable
Definition Memory.h:41
@ Writable
Definition Memory.h:42
@ Hex
Decimal: 0-10.
Definition Types.h:171
@ Dec
Definition Types.h:170
Memory range.
Definition Memory.h:56
Size size
Size in number of bytes.
Definition Memory.h:59
Address phys
Physical address.
Definition Memory.h:58
Address virt
Virtual address.
Definition Memory.h:57
Access access
Page access flags.
Definition Memory.h:60
Packet payload header for MPI messages via IP/UDP.
Definition MpiProxy.h:83
Input/Output vector for multi-packet operations.
Definition socket.h:45
Describes one or more datagrams.
Definition socket.h:56
struct iovec * msg_iov
Definition socket.h:59
socklen_t msg_namelen
Definition socket.h:58
void * msg_name
Definition socket.h:57
size_t msg_iovlen
Definition socket.h:60
Defines a socket address and port pair.
Definition socket.h:36
u32 addr
Definition socket.h:37
u16 port
Definition socket.h:38