FreeNOS
MpiTarget.cpp
Go to the documentation of this file.
1/*
2 * Copyright (C) 2020 Niek Linnenbank
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <http://www.gnu.org/licenses/>.
16 */
17
18#include <FreeNOS/User.h>
19#include <MemoryChannel.h>
20#include <CoreClient.h>
21#include <Lz4Decompressor.h>
22#include <Index.h>
23#include <String.h>
24#include <MemoryBlock.h>
25#include <BufferedFile.h>
26#include <MemoryChannel.h>
27#include <Index.h>
28#include <Log.h>
29#include "MPIMessage.h"
30#include "MpiTarget.h"
31
33{
34 return new MpiTarget();
35}
36
38 : m_coreId(0)
39 , m_coreCount(0)
40{
42}
43
45 char ***argv)
46{
47 if ((*argc) >= 5 && MemoryBlock::compare((*argv)[1], "--slave"))
48 {
49 return initializeSlave(argc, argv);
50 }
51 else
52 {
53 return initializeMaster(argc, argv);
54 }
55
56 return MPI_SUCCESS;
57}
58
63
65 int *rank)
66{
67 *rank = m_coreId;
68 return MPI_SUCCESS;
69}
70
72 int *size)
73{
74 *size = m_coreCount;
75 return MPI_SUCCESS;
76}
77
79 int count,
80 MPI_Datatype datatype,
81 int dest,
82 int tag,
83 MPI_Comm comm)
84{
85 MPIMessage msg;
86 MemoryChannel *ch;
87
88 if (!(ch = m_writeChannels.get(dest)))
89 {
90 return MPI_ERR_RANK;
91 }
92
93 for (int i = 0; i < count; i++)
94 {
95 switch (datatype)
96 {
97 case MPI_INT:
98 msg.integer = *(((int *) buf) + i);
99 break;
100
102 msg.uchar = *(((u8 *) buf) + i);
103 break;
104
105 default:
107 }
108
109 while (ch->write(&msg) != Channel::Success)
110 {
112 }
113 }
114
115 return MPI_SUCCESS;
116}
117
119 int count,
120 MPI_Datatype datatype,
121 int source,
122 int tag,
123 MPI_Comm comm,
124 MPI_Status *status)
125{
126 MPIMessage msg;
127 MemoryChannel *ch;
128
129 if (!(ch = m_readChannels.get(source)))
130 {
131 return MPI_ERR_RANK;
132 }
133
134 for (int i = 0; i < count; i++)
135 {
136 while (ch->read(&msg) != Channel::Success)
137 {
139 }
140
141 switch (datatype)
142 {
143 case MPI_INT:
144 *(((int *) buf) + i) = msg.integer;
145 break;
146
148 *(((u8 *) buf) + i) = msg.uchar;
149 break;
150
151 default:
153 }
154 }
155
156 return MPI_SUCCESS;
157}
158
160 char ***argv)
161{
162 const CoreClient coreClient;
163 char *programName = (*argv)[0];
164 String programPath;
165 u8 *programBuffer;
166
167 // We are the master node
168 m_coreId = 0;
169
170 // Retrieve number of cores on the system
171 const Core::Result result = coreClient.getCoreCount(m_coreCount);
172 if (result != Core::Success)
173 {
174 ERROR("failed to retrieve core count from CoreServer: result = " << (int) result);
175 return MPI_ERR_BAD_FILE;
176 }
177
178 // Read our own ELF program to a buffer and pass it to CoreServer
179 // for creating new programs on the remote core.
180 if (!MemoryBlock::compare(programName, "/bin/", 5))
181 {
182 programPath << "/bin/" << programName;
183 }
184 else
185 {
186 programPath << programName;
187 }
188
189 // Try to read the raw ELF program data (compressed)
190 BufferedFile programFile(*programPath);
191 const BufferedFile::Result readResult = programFile.read();
192 if (readResult != BufferedFile::Success)
193 {
194 ERROR("failed to read program at path '" << *programPath << "': result = " << (int) readResult);
195 return MPI_ERR_BAD_FILE;
196 }
197
198 // Initialize decompressor
199 Lz4Decompressor lz4(programFile.buffer(), programFile.size());
200 Lz4Decompressor::Result lz4Result = lz4.initialize();
201 if (lz4Result != Lz4Decompressor::Success)
202 {
203 ERROR("failed to initialize LZ4 decompressor: result = " << (int) lz4Result);
204 return MPI_ERR_BAD_FILE;
205 }
206
207 // Allocate memory for decompressed program
208 Memory::Range uncompProgRange;
209 uncompProgRange.virt = 0;
210 uncompProgRange.phys = 0;
211 uncompProgRange.size = lz4.getUncompressedSize();
213 API::Result vmResult = VMCtl(SELF, MapContiguous, &uncompProgRange);
214 if (vmResult != API::Success)
215 {
216 ERROR("failed to allocate program buffer: result = " << (int) vmResult);
217 return MPI_ERR_NO_MEM;
218 }
219
220 programBuffer = (u8 *) uncompProgRange.virt;
221 assert(programBuffer != NULL);
222
223 // Decompress entire file
224 const Lz4Decompressor::Result decompResult = lz4.read(programBuffer, lz4.getUncompressedSize());
225 if (decompResult != Lz4Decompressor::Success)
226 {
227 ERROR("failed to decompress program buffer: result = " << (int) decompResult);
228 return MPI_ERR_NO_MEM;
229 }
230
231 // Allocate memory space for two-way communication
232 // between the master and the other processors
238 if (vmResult != API::Success)
239 {
240 ERROR("failed to allocate MemoryChannel: result = " << (int) vmResult);
241 return MPI_ERR_NO_MEM;
242 }
243
244 DEBUG("MemoryChannel at physical address " << (void *) m_memChannelBase.phys);
245
246 // Clear channel pages
248
249 // now create the slaves using coreservers.
250 for (Size i = 1; i < m_coreCount; i++)
251 {
252 String programCmd;
253
254 // Format program command with MPI specific arguments for the slaves
255 programCmd << programPath << " --slave " <<
256 Number::Hex << (void *)(m_memChannelBase.phys) << " " <<
257 Number::Dec << i << " " << m_coreCount;
258
259 // Append additional user arguments
260 for (int j = 1; j < *argc; j++)
261 {
262 programCmd << " " << (*argv)[j];
263 }
264
265 const Core::Result result = coreClient.createProcess(i, (const Address) programBuffer,
266 lz4.getUncompressedSize(), *programCmd);
267 if (result != Core::Success)
268 {
269 ERROR("failed to create process on core" << i << ": result = " << (int) result);
270 return MPI_ERR_SPAWN;
271 }
272 }
273
274 // Fill read channels
275 for (Size i = 1; i < m_coreCount; i++)
276 {
277 const Result readResult = createReadChannel(i, getMemoryBaseRead(i));
278 if (readResult != MPI_SUCCESS)
279 {
280 ERROR("failed to create read MemoryChannel for core" << i << ": result = " << (int) readResult);
281 return readResult;
282 }
283 }
284
285 // Fill write channels
286 for (Size i = 1; i < m_coreCount; i++)
287 {
288 const Result writeResult = createWriteChannel(i, getMemoryBaseWrite(i));
289 if (writeResult != MPI_SUCCESS)
290 {
291 ERROR("failed to create write MemoryChannel for core" << i << ": result = " << (int) writeResult);
292 return writeResult;
293 }
294 }
295
296 return MPI_SUCCESS;
297}
298
300 char ***argv)
301{
302 // If we are slave (node N): read arguments: --slave [memoryChannelBase] [rankId] [coreCount]
303 if ((*argc) < 5)
304 {
305 ERROR("invalid number of arguments given");
306 return MPI_ERR_ARG;
307 }
308
309 String s = (*argv)[2];
311
312 s = (*argv)[3];
314 assert(m_coreId != 0);
315
316 s = (*argv)[4];
319 assert(m_coreCount > 0);
321
322 // Pass the rest of the arguments to the user program
323 (*argc) -= 4;
324 (*argv)[4] = (*argv)[0];
325 (*argv) += 4;
326
327 // Create MemoryChannels for communication with the master
328 const Result readResult = createReadChannel(0, getMemoryBaseRead(m_coreId));
329 if (readResult != MPI_SUCCESS)
330 {
331 ERROR("failed to create read MemoryChannel for master: result = " << (int) readResult);
332 return readResult;
333 }
334
335 const Result writeResult = createWriteChannel(0, getMemoryBaseWrite(m_coreId));
336 if (writeResult != MPI_SUCCESS)
337 {
338 ERROR("failed to create write MemoryChannel for master: result = " << (int) writeResult);
339 return writeResult;
340 }
341
342 return MPI_SUCCESS;
343}
344
346 const Address memoryBase)
347{
349 if (!ch)
350 {
351 ERROR("failed to allocate consumer MemoryChannel for coreId = " << coreId);
352 return MPI_ERR_NO_MEM;
353 }
354
355 ch->setPhysical(memoryBase, memoryBase + PAGESIZE);
357
358 if (m_coreId == 0)
359 {
360 DEBUG(m_coreId << ": readChannel: core" << coreId << ": data = " << (void *) memoryBase <<
361 " feedback = " << (void *) (memoryBase + PAGESIZE));
362 }
363
364 return MPI_SUCCESS;
365}
366
368 const Address memoryBase)
369{
371 if (!ch)
372 {
373 ERROR("failed to allocate producer MemoryChannel for coreId = " << coreId);
374 return MPI_ERR_NO_MEM;
375 }
376
377 ch->setPhysical(memoryBase, memoryBase + PAGESIZE);
379
380 if (m_coreId == 0)
381 {
382 DEBUG(m_coreId << ": writeChannel: core" << coreId << ": data = " << (void *) memoryBase <<
383 " feedback = " << (void *) (memoryBase + PAGESIZE));
384 }
385
386 return MPI_SUCCESS;
387}
388
390{
392
393 const Address base = m_memChannelBase.phys + (PAGESIZE * 2 * (coreId));
394
395 if (m_coreId == 0)
396 {
397 return base;
398 }
399 else
400 {
401 return base + (PAGESIZE * 2 * m_coreCount);
402 }
403}
404
406{
408
409 const Address base = m_memChannelBase.phys + (PAGESIZE * 2 * (coreId));
410
411 if (m_coreId == 0)
412 {
413 return base + (PAGESIZE * 2 * m_coreCount);
414 }
415 else
416 {
417 return base;
418 }
419}
u8 coreId
Definition IntelACPI.h:1
Result
Enumeration of generic kernel API result codes.
Definition API.h:69
@ Success
Definition API.h:70
static T * create()
Abstract function to create an instance of T.
Provides a buffered abstract interface to a file.
const Size size() const
Get file size.
Result
Result codes.
Result read()
Read the file (buffered)
const void * buffer() const
Get file buffer.
@ Consumer
Definition Channel.h:59
@ Producer
Definition Channel.h:58
@ Success
Definition Channel.h:43
CoreClient provides a simple interface to a CoreServer.
Definition CoreClient.h:41
Core::Result createProcess(const Size coreId, const Address programAddr, const Size programSize, const char *programCmd) const
Create a new process on a different core.
Core::Result getCoreCount(Size &numCores) const
Get number of processor cores in the system.
virtual T * get(const Size position) const
Returns the item at the given position.
Definition Index.h:187
virtual bool insertAt(const Size position, T *item)
Inserts the given item at the given position.
Definition Index.h:113
Decompress data using the LZ4 algorithm created by Yann Collet.
Result initialize()
Initialize the decompressor.
u64 getUncompressedSize() const
Get size of the uncompressed data.
Result
Result codes.
Result read(void *buffer, const Size size) const
Reads compressed data.
static void * set(void *dest, int ch, unsigned count)
Fill memory with a constant byte.
static bool compare(const void *p1, const void *p2, const Size count)
Compare memory.
Unidirectional point-to-point channel using shared memory.
Result setPhysical(const Address data, const Address feedback, const bool hardReset=true)
Set memory pages by physical address.
virtual Result read(void *buffer)
Read a message.
virtual Result write(const void *buffer)
Write a message.
Represents a Message Passing Interface (MPI) implementation backend.
Definition MpiBackend.h:37
int Result
Result code.
Definition MpiBackend.h:47
Implements a Message Passing Interface (MPI) for communication between local cores.
Definition MpiTarget.h:41
Index< MemoryChannel, MaximumChannels > m_readChannels
Stores all channels for receiving data from other cores.
Definition MpiTarget.h:210
Result createReadChannel(const Size coreId, const Address memoryBase)
Create a new MPI channel for reading.
Index< MemoryChannel, MaximumChannels > m_writeChannels
Stores all channels for sending data to other cores.
Definition MpiTarget.h:213
virtual Result getCommRank(MPI_Comm comm, int *rank)
Retrieve communication rank (core id)
Definition MpiTarget.cpp:64
Result initializeSlave(int *argc, char ***argv)
Initialize a slave.
virtual Result initialize(int *argc, char ***argv)
Initialize the backend.
Definition MpiTarget.cpp:44
MpiTarget()
Constructor.
Definition MpiTarget.cpp:37
static const Size MaximumChannels
Maximum number of communication channels.
Definition MpiTarget.h:45
virtual Result terminate()
Terminate the backend.
Definition MpiTarget.cpp:59
Address getMemoryBaseRead(const Size coreId) const
Get memory address for MPI read communication.
Size m_coreCount
Total number of cores.
Definition MpiTarget.h:204
virtual Result send(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm)
Synchronous send data.
Definition MpiTarget.cpp:78
Result initializeMaster(int *argc, char ***argv)
Initialize the master.
virtual Result getCommSize(MPI_Comm comm, int *size)
Retrieve communication size (total cores)
Definition MpiTarget.cpp:71
Memory::Range m_memChannelBase
Memory base address for local MPI communication.
Definition MpiTarget.h:207
virtual Result receive(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Status *status)
Synchronous receive data.
Result createWriteChannel(const Size coreId, const Address memoryBase)
Create a new MPI channel for writing.
Size m_coreId
Core identifier is a unique number on each core.
Definition MpiTarget.h:201
Address getMemoryBaseWrite(const Size coreId) const
Get memory address for MPI write communication.
Abstraction of strings.
Definition String.h:42
long toLong(const Number::Base base=Number::Dec) const
Convert the String to a signed long integer.
Definition String.cpp:456
#define SELF
Definition ProcessID.h:35
API::Result ProcessCtl(const ProcessID proc, const ProcessOperation op, const Address addr=0, const Address output=0)
Prototype for user applications.
Definition ProcessCtl.h:93
API::Result VMCtl(const ProcessID procID, const MemoryOperation op, Memory::Range *range=ZERO)
Prototype for user applications.
Definition VMCtl.h:61
@ MapContiguous
Definition VMCtl.h:37
@ Schedule
Definition ProcessCtl.h:52
#define PAGESIZE
ARM uses 4K pages.
Definition ARMConstant.h:97
MPI_Datatype
Named Predefined Datatypes.
Definition mpi.h:47
uint MPI_Status
Status holder.
Definition mpi.h:41
uint MPI_Comm
Communicator identifier.
Definition mpi.h:38
@ MPI_UNSIGNED_CHAR
Definition mpi.h:52
@ MPI_INT
Definition mpi.h:51
@ MPI_ERR_UNSUPPORTED_DATAREP
Definition mpi.h:128
@ MPI_ERR_BAD_FILE
Definition mpi.h:96
@ MPI_ERR_RANK
Definition mpi.h:79
@ MPI_ERR_SPAWN
Definition mpi.h:127
@ MPI_ERR_ARG
Definition mpi.h:86
@ MPI_SUCCESS
Definition mpi.h:73
@ MPI_ERR_NO_MEM
Definition mpi.h:112
#define assert(exp)
Insert program diagnostics.
Definition assert.h:60
#define NULL
NULL means zero.
Definition Macros.h:39
unsigned long Address
A memory address.
Definition Types.h:131
#define ERROR(msg)
Output an error message.
Definition Log.h:61
unsigned int Size
Any sane size indicator cannot go negative.
Definition Types.h:128
#define DEBUG(msg)
Output a debug message to standard output.
Definition Log.h:89
unsigned char u8
Unsigned 8-bit number.
Definition Types.h:59
Result
Result code for Actions.
Definition Core.h:48
@ Success
Definition Core.h:49
@ User
Definition Memory.h:44
@ Readable
Definition Memory.h:41
@ Writable
Definition Memory.h:42
@ Hex
Decimal: 0-10.
Definition Types.h:171
@ Dec
Definition Types.h:170
Memory range.
Definition Memory.h:56
Size size
Size in number of bytes.
Definition Memory.h:59
Address phys
Physical address.
Definition Memory.h:58
Address virt
Virtual address.
Definition Memory.h:57
Access access
Page access flags.
Definition Memory.h:60