This is sr_machipc.c in view mode; [Download] [Up]
/*
 * PCN Abstract Machine Emulator
 * Authors:     Steve Tuecke and Ian Foster
 *              Argonne National Laboratory
 *
 * Please see the DISCLAIMER file in the top level directory of the
 * distribution regarding the provisions under which this software
 * is distributed.
 *
 * sr_machipc.c	- Send/Receive routines built on Mach IPC
 *
 * #include "sr_doc.h"
 * See sr_doc.h for detailed documentation on Send/Receive modules.
 *
 * This sr module has not been kept up to date...
 */

#include "pcn.h"
#include <mach.h>
#include <servers/netname.h>

#define MSG_BACKLOG PORT_BACKLOG_MAX	/* Size of message backlog queue */
#define TIMEOUT 60000		/* Timeout on receives during startup (ms) */
#define SEND_TIMEOUT_VAL 5000	/* Timeout for each msg_send() attempt (ms) */

/*
 * Header that precedes the user's data in every message buffer.
 * User2MsgBuf() maps a pointer to user_buf back to the header.
 * HEADER_SIZE is the header length in cells; it assumes each of the four
 * header fields occupies exactly one cell.
 * NOTE(review): confirm this holds for CELL_SIZE and the pointer size on
 * the target.
 */
typedef struct msg_buf_header_struct
{
    struct msg_buf_header_struct *	next;	/* Link in the local message queue */
    int_t				size;	/* Size of user data, in cells */
    int_t				type;	/* PCN message type */
    int_t				node;	/* Id of the sending node */
    cell_t				user_buf[1];
} msg_buf_header_t;

#define HEADER_SIZE 4
#define MsgSize2Bytes(Size) (((Size) + HEADER_SIZE) * CELL_SIZE)
#define User2MsgBuf(Buf) ((msg_buf_header_t *) (((cell_t *) (Buf)) \
						 - HEADER_SIZE))

/* A PCN message: Mach header plus a long-form descriptor for 'buf' */
typedef struct
{
    msg_header_t	h;
    msg_type_long_t	t;
    msg_buf_header_t *	buf;
} pcn_msg_t;

/* Startup/control messages carrying zero, one or two inline integers */
typedef struct
{
    msg_header_t	h;
} m0_t;

typedef struct
{
    msg_header_t	h;
    msg_type_t		t;
    int			i1;
} m1_t;

typedef struct
{
    msg_header_t	h;
    msg_type_t		t;
    int			i1;
    int			i2;
} m2_t;

/* MSG_TYPE_NORMAL is defined in <sys/message.h> */
#define MSG_TYPE_ABORT	2

static port_t		worker_port[MAX_NODES];	/* Port of each node, by node id */
static port_t		my_port;		/* This node's receive port */
static bool_t		aborted = FALSE;	/* Set when an abort message arrives */
static msg_buf_header_t	*mq_front, *mq_back;	/* Received but undelivered messages */

static char parallel_startup[MAX_PATH_LENGTH];
static char parallel_info[MAX_PATH_LENGTH];
static bool_t dont_start_remotes;


/*
 * void _p_sr_get_argdesc(int argc, char **argv,
 *                        argdesc_t **argdescp, int *n_argdescp)
 *
 * Hand back the table of command line options understood by this module
 * (through argdescp) and its length (through n_argdescp).
 * The option variables are reset to their defaults first.
 * argc and argv are not examined.
 */
void _p_sr_get_argdesc(argc, argv, argdescp, n_argdescp)
int argc;
char **argv;
argdesc_t **argdescp;
int *n_argdescp;
{
    static argdesc_t argdesc[] = {
	INTEGER_ARG("n", &_p_nodes, "number of nodes"),
	STRING_ARG("s", parallel_startup, "system specific parallel startup file"),
	STRING_ARG("pi", parallel_info, "parallel info for node processes"),
	BOOL_ARG("nostart", &dont_start_remotes, "don't actually start remote nodes"),
    };

    parallel_startup[0] = '\0';
    parallel_info[0] = '\0';
    dont_start_remotes = FALSE;

    *argdescp = argdesc;
    *n_argdescp = sizeof(argdesc) / sizeof(argdesc[0]);
} /* _p_sr_get_argdesc() */


/*
 * void sr_fatal_error(char *msg)
 *
 * Report a fatal error that occurred while creating the workers, then
 * exit(1).
 * This is used during startup instead of _p_fatal_error(), because
 * _p_stdout is not yet set.
 */
static void sr_fatal_error(msg)
char *msg;
{
    if (_p_my_id == -1)		/* Node doesn't have an id yet */
    {
	fprintf(stderr, "Fatal error: Node ???: Failed to create workers: %s\n",
		msg);
    }
    else
    {
	fprintf(stderr, "Fatal error: Node %d: Failed to create workers: %s\n",
		_p_my_id, msg);
    }
    exit(1);
} /* sr_fatal_error() */


/*
 * void setup_host_port(netname_name_t netname)
 *
 * Mark this process as the host.
 * Allocate a port that the workers can use to check into, and store it in
 * the global 'my_port' (and in worker_port[0]).
 * Then register the port with the net name server under "pcn<pid>",
 * written into 'netname', so that the workers can find it.
 * The port is not explicitly unrestricted: the port_unrestrict() call is
 * commented out.
 */
static void setup_host_port(netname)
netname_name_t netname;
{
    extern int getpid();

    _p_host = TRUE;

    /* Allocate a port for the host */
    if (port_allocate(task_self(), &my_port) != KERN_SUCCESS)
	sr_fatal_error("Failed port_allocate()");
    port_set_backlog(task_self(), my_port, MSG_BACKLOG);
    worker_port[0] = my_port;

    /* Register this host port with the net message server */
    sprintf(netname, "pcn%d", getpid());
    /* port_unrestrict(task_self(), &my_port); */
    if (netname_check_in(name_server_port, netname, PORT_NULL, my_port)
	!= NETNAME_SUCCESS)
	sr_fatal_error("Failed netname_check_in()");
} /* setup_host_port() */


/*
 * int fork_nodes(int n)
 *
 * Fork off n worker nodes.
*/
/*
 * Returns 0 in each child, which also sets _p_host = FALSE.
 * In the parent it returns the pid of the last child forked, or 1 if
 * n <= 0.
 * A failed fork() is fatal (sr_fatal_error()).
 */
static int fork_nodes(n)
int n;
{
    int pid = 1;

    for (; n > 0; n--)
    {
#ifdef DEBUG
	if (ParDebug(9))
	    printf("Forking off a worker node\n");
#endif /* DEBUG */
	/*
	 * Flush pending stdio output first.  Otherwise the child inherits
	 * a copy of the unwritten buffer and the output appears twice.
	 */
	fflush(stdout);
	if ((pid = fork()) < 0)		/* error */
	{
	    sr_fatal_error("Failed fork()");
	}
	else if (pid == 0)		/* child */
	{
	    _p_host = FALSE;
	    break;
	}
    }
    return (pid);
} /* fork_nodes() */


/*
 * void _p_sr_init_node()
 *
 * Do the parallel initialization, and call _p_init_node() before returning.
 *
 * The parallel_startup string comes from the command line -s option.  This
 * is what the user would set to specify a startup file (what machines to
 * start workers up on).
 *
 * The parallel_info string comes from the command line -pi option.  This
 * is used by the host to pass arguments to the workers.  The user should
 * never set this option.
 *
 * If both parallel_startup and parallel_info are empty strings,
 * then we know we are the host and that we should just fork off
 * _p_nodes-1 workers.
 *
 * Error handling: If we fail to create any worker, the whole world shuts
 * itself down automatically within TIMEOUT milliseconds.  That is the
 * longest time a node waits in any startup receive.  In other words, much
 * of the error handling for worker creation is handled by default!
 *
 * Note: _p_sr_init_node() should not use _p_fatal_error().  That function
 * relies on _p_stdout being set, which doesn't happen until after
 * _p_sr_init_node() is done.  Instead it should use its own function,
 * sr_fatal_error() to handle fatal errors when creating the workers.
*/ void _p_sr_init_node() { char buf1[MAX_PATH_LENGTH], buf2[MAX_PATH_LENGTH], buf3[512], *b1, *b2; netname_name_t netname; char host_name[256]; port_t host_port; int i, j; FILE *fp; bool_t found; m0_t m0; m1_t m1; m2_t m2; /* Return if this is not a multiprocessing run */ if (_p_nodes == 1 && parallel_startup[0] == '\0' && parallel_info[0] == '\0') { _p_my_id = _p_host_id = 0; _p_nodes = 1; _p_host = TRUE; _p_usehost = TRUE; /* Finish node initialization and run emulator */ _p_init_node(); return; } if(_p_nodes > MAX_NODES) { char buf[256]; sprintf(buf, "Too many nodes: MAX_NODES = %d", MAX_NODES); sr_fatal_error(buf); } /* Get the host's host name */ if (gethostname(host_name, sizeof(host_name)) == -1) sr_fatal_error("Failed to get host's machine name"); if (parallel_startup[0] == '\0' && parallel_info[0] == '\0') { /* * This is the host. It should fork off _p_nodes-1 workers. */ setup_host_port(&netname); fork_nodes(_p_nodes-1); } else if (parallel_startup[0] != '\0') { /* * This is the host. It should read the startup file (specified * by the string in parallel_startup, and use that information to * system() off workers. * * In the startup file, each line represents one startup command, * where a startup command is either: * fork <n> - fork off this may workers locally * exec <n>:<command> $ARGS$ ... - a command to execute a pcn worker * * The exec form is for issuing a command (such as rsh) to start * up workers (often on another machine): * The <n> argument specifies how many workers will be forked * by that execution. * The $ARGS$ of the command will be replaced with the -pi * argument which contains the host's host and the host * port's netname, and the -n argument with specifies how * many workers that pcn should fork off. * The <command> argument is the command that should be used * to execute the worker. It can use all flags except * -h, -s, -pi and -n when starting up the node. The $ARGS$ * string should be embedded somewhere in this command. 
* * The command should NOT use the -s argument!!! */ setup_host_port(&netname); if ((fp = fopen(parallel_startup, "r")) == (FILE *) NULL) sr_fatal_error("Unable to open parallel startup file"); _p_nodes = 1; /* Count the # of nodes that we start up */ /* Read in parallel startup file, one line at a time */ for (i = 1; fgets(buf1, MAX_PATH_LENGTH - 1, fp) != (char *) NULL; i++) { if (buf1[0] == '%' || buf1[0] == '\n' || buf1[0] == ' ' || buf1[0] == '\t' || buf1[0] == '#') { continue; } else if (strncmp(buf1, "fork", 4) == 0) { /* Fork off the specified number of nodes */ if (sscanf(buf1+4, "%d", &j) != 1 || j < 1) { sprintf(buf1, "Bad startup command, line %d", i); sr_fatal_error(buf1); } _p_nodes += j; if(_p_nodes > MAX_NODES) { char buf[256]; sprintf(buf, "Too many nodes: MAX_NODES = %d", MAX_NODES); sr_fatal_error(buf); } if (fork_nodes(j) == 0) /* child */ break; } else if (strncmp(buf1, "exec", 4) == 0) /* use system() */ { j = strlen(buf1) - 1; if (buf1[j] == '\n') buf1[j] = '\0'; /* Get the number of nodes to fork */ if (sscanf(buf1+4, "%d", &j) != 1 || j < 1) { sprintf(buf1, "Bad startup command, line %d", i); sr_fatal_error(buf1); } _p_nodes += j; if(_p_nodes > MAX_NODES) { char buf[256]; sprintf(buf, "Too many nodes: MAX_NODES = %d", MAX_NODES); sr_fatal_error(buf); } /* Find the start of the command */ if ((b1 = strchr(buf1, ':')) == (char *) NULL) { sprintf(buf1, "Bad startup command, line %d", i); sr_fatal_error(buf1); } /* Construct the $ARGS$ argument */ sprintf(buf3, " -n %d -pi %s,%s ", j, host_name, netname); /* Substitute $ARGS$ in the command with appropriate stuff */ for (++b1, b2 = buf2, found = FALSE; *b1 != '\0'; ) { if (strncmp(b1, "$ARGS$", 6) == 0) { *b2 = '\0'; strcat(buf2, buf3); strcat(buf2, b1+6); found = TRUE; break; } else *b2++ = *b1++; } if (!found) { sprintf(buf1, "Bad startup command, line %d -- no $ARGS$", i); sr_fatal_error(buf1); } strcat(buf2, " &"); /* Now system() it off */ #ifdef DEBUG if (ParDebug(9)) printf("Execing 
worker with: %s\n", buf2); #endif /* DEBUG */ if (!dont_start_remotes) if (system(buf2) != 0) sr_fatal_error("Failed system()"); } else { sprintf(buf1, "Bad startup command, line %d", i); sr_fatal_error(buf1); } } fclose(fp); } else /* parallel_info[0] != '\0' */ { /* * This is a worker that was started up by a system() call. * * parallel_info contains the -pi argument, which contains the * host_name and the netname * _p_nodes contains the number of nodes to fork off locally */ if ((b1 = strchr(parallel_info, ',')) == (char *) NULL) sr_fatal_error("Failed to parse -s argument"); *b1 = '\0'; strcpy(host_name, parallel_info); strcpy(netname, b1+1); fork_nodes(_p_nodes - 1); } /* * Each worker will check into the host through the * host's registed port. * - In the checkin message will contain the worker's port. * - The host will return a message to the worker with his id * and the number of nodes. */ if (_p_host) { _p_my_id = _p_host_id = _p_nodes - 1; worker_port[_p_host_id] = my_port; #ifdef DEBUG if (ParDebug(9)) printf("Host: Waiting for workers to check in\n"); #endif DEBUG for (i = 0; i < _p_nodes-1; i++) { m0.h.msg_size = sizeof(m0); m0.h.msg_local_port = my_port; if (msg_receive((msg_header_t *) &m0, RCV_TIMEOUT, TIMEOUT) != RCV_SUCCESS) sr_fatal_error("Some workers did not check in"); #ifdef DEBUG if (ParDebug(9)) printf("Host: received port %d for node %d\n", m0.h.msg_remote_port, i); #endif DEBUG worker_port[i] = m0.h.msg_remote_port; /* * Now send a message back to worker with that node's id, * and the total number of nodes. 
*/ m2.h.msg_simple = TRUE; m2.h.msg_size = sizeof(m2); m2.h.msg_type = MSG_TYPE_NORMAL; m2.h.msg_local_port = PORT_NULL; m2.h.msg_remote_port = worker_port[i]; m2.t.msg_type_name = MSG_TYPE_INTEGER_32; m2.t.msg_type_size = 32; m2.t.msg_type_number = 2; m2.t.msg_type_inline = TRUE; m2.t.msg_type_longform = FALSE; m2.t.msg_type_deallocate = FALSE; m2.i1 = i; m2.i2 = _p_nodes; if (msg_send((msg_header_t *) &m2, MSG_OPTION_NONE, 0) != SEND_SUCCESS) sr_fatal_error("Failed to send id and # of nodes to worker"); } } else /* worker */ { /* Allocate this worker's port */ if (port_allocate(task_self(), &my_port) != KERN_SUCCESS) sr_fatal_error("Failed port_allocate() on worker"); port_set_backlog(task_self(), my_port, MSG_BACKLOG); #ifdef DEBUG if (ParDebug(9)) printf("Node ???: Allocated port %d\n", my_port); #endif DEBUG /* Lookup the host's port on the local host */ if (netname_look_up(name_server_port, host_name, netname, &host_port) != NETNAME_SUCCESS) sr_fatal_error("Failed worker (failed netname_look_up())"); #ifdef DEBUG if (ParDebug(9)) printf("Node ???: Got host's port %d\n", host_port); #endif DEBUG /* Send a message to the host with our port */ m0.h.msg_simple = TRUE; m0.h.msg_size = sizeof(m0); m0.h.msg_type = MSG_TYPE_NORMAL; m0.h.msg_local_port = my_port; m0.h.msg_remote_port = host_port; if (msg_send((msg_header_t *) &m0, MSG_OPTION_NONE, 0) != SEND_SUCCESS) sr_fatal_error("Failed to send port to host"); #ifdef DEBUG if (ParDebug(9)) printf("Node ???: Sent port to host\n"); #endif DEBUG /* Receive message from host with id and # of nodes */ m2.h.msg_size = sizeof(m2); m2.h.msg_local_port = my_port; if (msg_receive((msg_header_t *) &m2, RCV_TIMEOUT, TIMEOUT) != RCV_SUCCESS) sr_fatal_error("Failed to get id and # of nodes from host"); _p_my_id = m2.i1; _p_nodes = m2.i2; _p_host_id = _p_nodes - 1; worker_port[_p_my_id] = my_port; worker_port[_p_host_id] = host_port; #ifdef DEBUG if (ParDebug(9)) printf("(%d) Received id from host -- %d nodes\n", _p_my_id, 
_p_nodes); #endif DEBUG } if (_p_host) netname_check_out(name_server_port, netname, PORT_NULL); /* * Now the host has the ports for every worker. So distribute all * of the ports to all of the works. (A worker already knows his port * that of the host, so there is no need to send them.) */ if (_p_host) { for (j = 0; j < _p_nodes-1; j++) { for (i = 0; i < _p_nodes-1; i++) { if (i != j) { /* Send a message to worker i with port for worker j */ m1.h.msg_simple = TRUE; m1.h.msg_size = sizeof(m1); m1.h.msg_type = MSG_TYPE_NORMAL; m1.h.msg_local_port = worker_port[j]; m1.h.msg_remote_port = worker_port[i]; m1.t.msg_type_name = MSG_TYPE_INTEGER_32; m1.t.msg_type_size = 32; m1.t.msg_type_number = 1; m1.t.msg_type_inline = TRUE; m1.t.msg_type_longform = FALSE; m1.t.msg_type_deallocate = FALSE; m1.i1 = j; #ifdef DEBUG if (ParDebug(9)) printf("Host: Sending port for node %d to node %d\n", j, i); #endif DEBUG if (msg_send((msg_header_t *) &m1, MSG_OPTION_NONE, 0) != SEND_SUCCESS) { sprintf(buf1, "Failed to send port for node %d to node %d", j, i); sr_fatal_error(buf1); } #ifdef DEBUG if (ParDebug(9)) printf("Host: Sent port for node %d to node %d\n", j, i); #endif DEBUG } } } } else /* worker */ { for (i = 0; i < _p_nodes-2; i++) { /* Wait for _p_nodes-2 messages with port to other nodes */ m1.h.msg_size = sizeof(m1); m1.h.msg_local_port = my_port; if (msg_receive((msg_header_t *) &m1, RCV_TIMEOUT, TIMEOUT) != RCV_SUCCESS) sr_fatal_error("Failed to receive ports to other nodes"); #ifdef DEBUG if (ParDebug(9)) printf("(%d) Received port %d for node %d\n", _p_my_id, m1.h.msg_remote_port, m1.i1); #endif DEBUG worker_port[m1.i1] = m1.h.msg_remote_port; } } /* * Do any other send/receive initialization stuff here... 
*/ _p_default_msg_buffer_size = vm_page_size / CELL_SIZE - HEADER_SIZE; mq_front = mq_back = (msg_buf_header_t *) NULL; _p_usehost = TRUE; /* Finish node initialization and run emulator */ _p_init_node(); } /* _p_sr_init_node() */ /* * void _p_sr_node_initialized() */ void _p_sr_node_initialized() { char buf[256]; m1_t m1; int i; if (!_p_multiprocessing) return; /* * Final checkin. If one of the * nodes fails to initialize (and dies), then this final checkin will * cause all of the other workers to shut down automaticly, due * to the timeout on the receives. */ if (_p_host) { /* * Host: Wait for each worker to checkin, then send message to * each worker. */ for (i = 0; i < _p_nodes-1; i++) { m1.h.msg_size = sizeof(m1); m1.h.msg_local_port = my_port; if (msg_receive((msg_header_t *) &m1, RCV_TIMEOUT, TIMEOUT) != RCV_SUCCESS) _p_fatal_error("A node did not checkin after initialization"); #ifdef DEBUG if (ParDebug(9)) printf("Host: Final checkin received from node %d\n", m1.i1); #endif DEBUG } m1.h.msg_simple = TRUE; m1.h.msg_size = sizeof(m1); m1.h.msg_type = MSG_TYPE_NORMAL; m1.h.msg_local_port = PORT_NULL; m1.t.msg_type_name = MSG_TYPE_INTEGER_32; m1.t.msg_type_size = 32; m1.t.msg_type_number = 1; m1.t.msg_type_inline = TRUE; m1.t.msg_type_longform = FALSE; m1.t.msg_type_deallocate = FALSE; m1.i1 = 0; for (i = 0; i < _p_nodes-1; i++) { /* Send a message to worker i */ m1.h.msg_remote_port = worker_port[i]; if (msg_send((msg_header_t *) &m1, MSG_OPTION_NONE, 0) != SEND_SUCCESS) { sprintf(buf, "Failed to send final initialization to node %d", i); _p_fatal_error(buf); } #ifdef DEBUG if (ParDebug(9)) printf("Host: Final checkin sent to node %d\n", i); #endif DEBUG } } else { /* Worker: Send message to host, then receive message */ m1.h.msg_simple = TRUE; m1.h.msg_size = sizeof(m1); m1.h.msg_type = MSG_TYPE_NORMAL; m1.h.msg_local_port = PORT_NULL; m1.h.msg_remote_port = worker_port[_p_host_id]; m1.t.msg_type_name = MSG_TYPE_INTEGER_32; m1.t.msg_type_size = 32; 
m1.t.msg_type_number = 1; m1.t.msg_type_inline = TRUE; m1.t.msg_type_longform = FALSE; m1.t.msg_type_deallocate = FALSE; m1.i1 = _p_my_id; if (msg_send((msg_header_t *) &m1, MSG_OPTION_NONE, 0) != SEND_SUCCESS) _p_fatal_error("Failed to send final checkin to host"); #ifdef DEBUG if (ParDebug(9)) printf("(%d) Sent final checkin\n", _p_my_id); #endif DEBUG m1.h.msg_local_port = my_port; if (msg_receive((msg_header_t *) &m1, RCV_TIMEOUT, TIMEOUT) != RCV_SUCCESS) _p_fatal_error("Failed to receive final checkin"); #ifdef DEBUG if (ParDebug(9)) printf("(%d) Received final checkin\n", _p_my_id); #endif DEBUG } #ifdef DONT_INCLUDE /* The following is test stuff */ sleep(1); printf("\n"); for (i = 0; i < _p_nodes; i++) { printf ("(%d)%s: Summary: node %d has port %d\n", _p_my_id, (_p_host ? " (host)" : ""), i, worker_port[i]); } sleep(1); /* Test by sending something around the ring */ if (_p_host) { printf("\nHost: Sending test\n"); i = (_p_my_id+1) % _p_nodes; m1.h.msg_simple = TRUE; m1.h.msg_size = sizeof(m1); m1.h.msg_type = MSG_TYPE_NORMAL; m1.h.msg_local_port = PORT_NULL; m1.h.msg_remote_port = worker_port[i]; m1.t.msg_type_name = MSG_TYPE_INTEGER_32; m1.t.msg_type_size = 32; m1.t.msg_type_number = 1; m1.t.msg_type_inline = TRUE; m1.t.msg_type_longform = FALSE; m1.t.msg_type_deallocate = FALSE; m1.i1 = 42; printf("Host: Sending test to node %d on port %d\n", i, m1.h.msg_remote_port); if (msg_send(&m1, MSG_OPTION_NONE, 0) != SEND_SUCCESS) _p_fatal_error("Failed to send test to node 0."); printf("Host: Sent test to node %d\n", i); m1.h.msg_size = sizeof(m1); m1.h.msg_local_port = my_port; if (msg_receive(&m1, RCV_TIMEOUT, TIMEOUT) != RCV_SUCCESS) _p_fatal_error("Host failed to receive test."); printf("Host: Received test (%d)\n", m1.i1); } else { m1.h.msg_size = sizeof(m1); m1.h.msg_local_port = my_port; if (msg_receive(&m1, RCV_TIMEOUT, TIMEOUT) != RCV_SUCCESS) _p_fatal_error("Worker failed to receive test."); printf("(%d) Received test (%d)\n", _p_my_id, m1.i1); i = 
(_p_my_id+1) % _p_nodes; m1.h.msg_local_port = PORT_NULL; m1.h.msg_remote_port = worker_port[i]; printf("(%d) Sending test to node %d on port %d\n", _p_my_id, i, m1.h.msg_remote_port); if (msg_send(&m1, MSG_OPTION_NONE, 0) != SEND_SUCCESS) _p_fatal_error("Failed to send test to next node"); printf("(%d) Sent test to node %d\n", _p_my_id, i); } exit(1); #endif DONT_INCLUDE } /* _p_sr_node_initialized() */ /* * void _p_destroy_nodes() */ void _p_destroy_nodes() { } /* * void _p_abort_nodes() */ void _p_abort_nodes() { int i; pcn_msg_t msg; /* Deallocate my port so that sends to it will fail */ port_deallocate(task_self(), my_port); if (aborted) /* Someone else initiated the abort */ return; for (i = 0; i < _p_nodes; i++) { if (i != _p_my_id) { msg.h.msg_simple = FALSE; msg.h.msg_size = sizeof(pcn_msg_t); msg.h.msg_type = MSG_TYPE_ABORT; msg.h.msg_local_port = PORT_NULL; msg.h.msg_remote_port = worker_port[i]; msg.t.msg_type_long_name = MSG_TYPE_UNSTRUCTURED; msg.t.msg_type_long_size = 32; msg.t.msg_type_long_number = 0; msg.t.msg_type_header.msg_type_inline = FALSE; msg.t.msg_type_header.msg_type_longform = TRUE; msg.t.msg_type_header.msg_type_deallocate = FALSE; msg.buf = (msg_buf_header_t *) NULL; msg_send((msg_header_t *) &msg, SEND_TIMEOUT, 5000); } } } /* _p_abort_nodes() */ /* * cell_t *_p_alloc_msg_buffer(int size) */ cell_t *_p_alloc_msg_buffer(size) int size; /* In cells */ { int i; msg_buf_header_t *msg_buf; #ifdef DEBUG if (ParDebug(8)) { fprintf(_p_stdout, "(%d,%d) _p_alloc_msg_buffer: Allocating %d cells\n", _p_my_id, _p_reduction, size); fflush(_p_stdout); } #endif /* DEBUG */ i = MsgSize2Bytes(size); if (vm_allocate(task_self(), (vm_address_t *) &msg_buf, i, TRUE) != KERN_SUCCESS) { _p_fatal_error("Failed to allocate a message buffer"); } msg_buf->size = size; #ifdef DEBUG if (ParDebug(8)) { fprintf(_p_stdout, "(%d,%d) _p_alloc_msg_buffer: Allocated %d cells\n", _p_my_id, _p_reduction, size); fflush(_p_stdout); } #endif /* DEBUG */ return 
(msg_buf->user_buf); } /* _p_alloc_msg_buffer() */ /* * FreeMsgBuf(Buf, Size) * * Buf points to the beginning of the message buffer (not the user's buffer, * but the first message header). * Size is the size of the user portion of the buffer, in Cells. */ #define FreeMsgBuf(Buf, Size) \ { \ if (vm_deallocate(task_self(), (vm_address_t) Buf, MsgSize2Bytes(Size)) \ != KERN_SUCCESS) \ { \ _p_fatal_error("Failed to free a message buffer"); \ } \ } /* FreeMsgBuf() */ /* * void _p_msg_send(cell_t *buf, int node, int size, int type) */ void _p_msg_send(buf, node, size, type) cell_t *buf; int node; int size; /* in cells */ int type; { msg_buf_header_t *send_buf, *rcv_buf; char error_buf[256]; int_t allocated_size; pcn_msg_t msg, rcv_msg; msg_return_t rc; #ifdef DEBUG if (ParDebug(8)) { fprintf(_p_stdout, "(%d,%d) _p_msg_send: Sending to n %d, s %d, t %d\n", _p_my_id, _p_reduction, node, size, type); fflush(_p_stdout); } #endif /* DEBUG */ #ifdef DEBUG if (node == _p_my_id) _p_fatal_error("Internal error: Cannot send message to myself!"); #endif /* DEBUG */ /* * If buf == NULL, then we have an empty message. So just allocate * enough space for the message headers. 
*/ if (buf == (cell_t *) NULL) { if (vm_allocate(task_self(), (vm_address_t *) &send_buf, MsgSize2Bytes(0), TRUE) != KERN_SUCCESS) { _p_fatal_error("Failed to allocate a message buffer"); } allocated_size = 0; size = 0; } else { send_buf = User2MsgBuf(buf); allocated_size = send_buf->size; } send_buf->size = size; send_buf->type = type; send_buf->node = _p_my_id; msg.h.msg_simple = FALSE; msg.h.msg_size = sizeof(pcn_msg_t); msg.h.msg_type = MSG_TYPE_NORMAL; msg.h.msg_local_port = PORT_NULL; msg.h.msg_remote_port = worker_port[node]; msg.t.msg_type_long_name = MSG_TYPE_UNSTRUCTURED; msg.t.msg_type_long_size = 32; msg.t.msg_type_long_number = size + HEADER_SIZE; msg.t.msg_type_header.msg_type_inline = FALSE; msg.t.msg_type_header.msg_type_longform = TRUE; msg.t.msg_type_header.msg_type_deallocate = FALSE; msg.buf = send_buf; while(1) { while(1) { rcv_msg.h.msg_size = sizeof(pcn_msg_t); rcv_msg.h.msg_local_port = my_port; if ((rc = msg_receive((msg_header_t *) &rcv_msg, RCV_TIMEOUT, 0)) == RCV_SUCCESS) { rcv_buf = rcv_msg.buf; if (mq_front == (msg_buf_header_t *) NULL) { mq_front = mq_back = rcv_buf; rcv_buf->next = (msg_buf_header_t *) NULL; } else { mq_back->next = rcv_buf; mq_back = rcv_buf; rcv_buf->next = (msg_buf_header_t *) NULL; } } else if (rc == RCV_TIMED_OUT) break; else if (rc != RCV_SUCCESS) _p_fatal_error("Failed on non-blocking receive"); } if ((rc = msg_send((msg_header_t *) &msg, SEND_TIMEOUT, SEND_TIMEOUT_VAL)) == SEND_SUCCESS) { break; } else if (rc == SEND_TIMED_OUT) { fprintf(_p_stdout, "(%d,%d) Warning: _p_msg_send() timed out -- retrying\n", _p_my_id, _p_reduction); fflush(_p_stdout); } else { sprintf(error_buf, "Failed to send message -- return from msg_send(): %d", rc); _p_fatal_error(error_buf); } } FreeMsgBuf(send_buf, allocated_size); #ifdef DEBUG if (ParDebug(9)) { fprintf(_p_stdout, "(%d,%d) _p_msg_send: Sent\n", _p_my_id, _p_reduction); fflush(_p_stdout); } #endif /* DEBUG */ } /* _p_send_message() */ /* * bool_t _p_msg_receive(int 
*node, int *size, int *type, int rcv_type) */ bool_t _p_msg_receive(node, size, type, rcv_type) int *node; int *size; int *type; int rcv_type; { msg_buf_header_t *rcv_buf; pcn_msg_t msg; msg_return_t rc; bool_t done = FALSE; #ifdef DEBUG if (ParDebug(9)) { fprintf(_p_stdout, "(%d,%d) _p_msg_receive: Receiving message, type %d\n", _p_my_id, _p_reduction, rcv_type); fflush(_p_stdout); } #endif /* DEBUG */ /* Take message off queue if appropriate */ if (mq_front != (msg_buf_header_t *) NULL && (rcv_type == RCV_NOBLOCK || rcv_type == RCV_BLOCK)) { rcv_buf = mq_front; mq_front = mq_front->next; } /* else, get next message from port */ else { msg.h.msg_size = sizeof(pcn_msg_t); msg.h.msg_local_port = my_port; switch (rcv_type) { case RCV_NOBLOCK: if ((rc = msg_receive((msg_header_t *) &msg, RCV_TIMEOUT, 0)) == RCV_TIMED_OUT) { #ifdef DEBUG if (ParDebug(9)) { fprintf(_p_stdout, "(%d,%d) _p_msg_receive: Non-blocking return\n", _p_my_id, _p_reduction); fflush(_p_stdout); } #endif /* DEBUG */ return (FALSE); } else if (rc != RCV_SUCCESS) _p_fatal_error("Failed on non-blocking receive"); rcv_buf = msg.buf; break; case RCV_BLOCK: if (msg_receive((msg_header_t *) &msg, MSG_OPTION_NONE, 0) != RCV_SUCCESS) _p_fatal_error("Failed on blocking receive"); rcv_buf = msg.buf; break; case RCV_PARAMS: while (!done) { if (msg_receive((msg_header_t *) &msg, MSG_OPTION_NONE, 0) != RCV_SUCCESS) _p_fatal_error("Failed on blocking receive"); if (msg.h.msg_type == MSG_TYPE_ABORT) done = TRUE; else { rcv_buf = msg.buf; if ( rcv_buf->type == MSG_PARAMS || rcv_buf->type == MSG_EXIT ) { done = TRUE; } else { if (mq_front == (msg_buf_header_t *) NULL) { mq_front = mq_back = rcv_buf; rcv_buf->next = (msg_buf_header_t *) NULL; } else { mq_back->next = rcv_buf; mq_back = rcv_buf; rcv_buf->next = (msg_buf_header_t *) NULL; } } } } break; case RCV_COLLECT: while (!done) { if (msg_receive((msg_header_t *) &msg, MSG_OPTION_NONE, 0) != RCV_SUCCESS) _p_fatal_error("Failed on blocking receive"); if 
(msg.h.msg_type == MSG_TYPE_ABORT) done = TRUE; else { rcv_buf = msg.buf; switch (rcv_buf->type) { case MSG_READ: case MSG_CANCEL: case MSG_EXIT: done = TRUE; break; case MSG_COLLECT: FreeMsgBuf(rcv_buf, rcv_buf->size); break; case MSG_DEFINE: case MSG_VALUE: if (mq_front == (msg_buf_header_t *) NULL) { mq_front = mq_back = rcv_buf; rcv_buf->next = (msg_buf_header_t *) NULL; } else { mq_back->next = rcv_buf; mq_back = rcv_buf; rcv_buf->next = (msg_buf_header_t *) NULL; } break; default: _p_fatal_error("_p_msg_receive(): Illegal message type"); break; } } } break; } if (msg.h.msg_type == MSG_TYPE_ABORT) { aborted = TRUE; _p_fatal_error("Received an abort message"); } } *size = rcv_buf->size; *node = rcv_buf->node; *type = rcv_buf->type; /* * Now copy the message onto the heap. If there is not enough room * for it, then garbage collect. * Assume that there will always be enough room for MSG_CANCEL message. */ if (*type != MSG_CANCEL) { TryGCWithSize(*size); } #ifdef PCN_ALIGN_DOUBLES if (DoubleAligned(_p_heap_ptr)) _p_heap_ptr++; #endif /* PCN_ALIGN_DOUBLES */ memcpy(_p_heap_ptr, (char *) rcv_buf->user_buf, *size * CELL_SIZE); FreeMsgBuf(rcv_buf, *size); #ifdef DEBUG if (ParDebug(8)) { fprintf(_p_stdout, "(%d,%d) _p_msg_receive: Received from n %d, s %d, t %d\n", _p_my_id, _p_reduction, *node, *size, *type); fflush(_p_stdout); } #endif /* DEBUG */ return (TRUE); } /* _p_msg_receive() */
These are the contents of the former NiCE NeXT User Group NeXTSTEP/OpenStep software archive, currently hosted by Netfuture.ch.