/* This is sr_bsdipc.c in view mode; [Download] [Up] */
/* * PCN Abstract Machine Emulator * Authors: Steve Tuecke and Ian Foster and Robert Olson * Argonne National Laboratory * * Please see the DISCLAIMER file in the top level directory of the * distribution regarding the provisions under which this software * is distributed. * * sr_bsdipc.c - send/receive header using BSD Unix IPC (TCP sockets) */ #include <sys/types.h> #include <netinet/in.h> #include <sys/time.h> #include <sys/socket.h> #include <netdb.h> #include <sys/wait.h> #include <netinet/tcp.h> #include <sys/resource.h> #include <fcntl.h> #include <errno.h> #include <signal.h> #include <sys/param.h> #ifdef rs6000 #include <sys/select.h> #endif #include "pcn.h" /* * We need to determine if the Unix we're on uses "struct direct" or * "struct dirent" for directory entries. So far, the NeXT is the only * one that uses "struct direct". */ #if defined(next_os) || defined(symmetry) #define USE_DIRECT #else #define USE_DIRENT #endif #if defined(USE_DIRECT) #include <sys/dir.h> #else #if defined(USE_DIRENT) #include <dirent.h> #endif #endif #include <sys/file.h> /* * For the access() arguments */ #if !(defined(sequent) || defined(next_os)) #include <unistd.h> #endif /* * The OPTIMAL_TCP_BLOCKSIZE is the block size that I've found * produces the highest throughput (probably not incidentally, this is * the number of bytes written at each write() to a nonblocking * socket). * * TCP (on sparcstations at least) seems to have a problem with * sending block sizes in range from 4097 to about 6000 bytes. * Defining TCP_BUMP_HACK (named from the bump in the plot of * elapsed time for send versus message size) forces messages in this * region to be forced to a larger size, actually increasing the * performance. The endpoints of the bump are defined as TCP_BUMP_MIN * and TCP_BUMP_MAX, defaulting to OPTIMAL_TCP_BLOCKSIZE and * OPTIMAL_TCP_BLOCKSIZE + 2048. * * These parameters will probably have to be tuned for different * TCP/IP implementations. 
* */ #ifndef OPTIMAL_TCP_BLOCKSIZE #define OPTIMAL_TCP_BLOCKSIZE 4096 #endif #ifdef TCP_BUMP_HACK #ifndef TCP_BUMP_MIN #define TCP_BUMP_MIN OPTIMAL_TCP_BLOCKSIZE #endif #ifndef TCP_BUMP_MAX #define TCP_BUMP_MAX (TCP_BUMP_MIN + 2048) #endif #endif #ifdef hpux /* Empirically determined... */ #define NUM_FDS 60 #else /* This works for BSD machines */ #define NUM_FDS getdtablesize() #endif #define SYSTEM_ERROR (sys_errlist[errno]) #define FATAL_SYSTEM_ERROR(s) { char buf[1024]; \ sprintf(buf, s, sys_errlist[errno]); \ _p_fatal_error(buf); } #define ME _p_my_id extern int errno; extern char *sys_errlist[]; extern char *getenv(); /* * Process table: * Contains host and port number (used during creation) for each node * as well as the file descriptor for the connection to that node. * * The fd for the entry for the current process is undefined. * * buf is a pointer to the current incomplete message. * len is the number of bytes left to read. */ typedef struct _proc_table_entry { int port; int fd; char host[256]; char *buf, *bufend; int len; } proc_table_entry; static proc_table_entry proc_table[MAX_NODES]; /* * Argument parsing stuff. * * The variables are labeled _p even though they are static because I * didn't want to change them all... * * parallel_startup: * name of startup file * * parallel_info: * string of the form "host,port" which denotes the hostname of * the PCN host node, and the port on which it is listening. * * dont_start_remotes: * if true, do not actually execute the system() to start remote * nodes specfied in the startup file. * * network_debugging: * if true, print out copious debugging message for the low level * send and receive. * * use_start_nodes: * attempt to connect to the host-control daemon to start worker nodes. * * hostconfig_file: * name of a file containing host-control configuration * information. See the host-control documentation for more details. 
*/ static char parallel_startup[MAX_PATH_LENGTH]; static char parallel_info[MAX_PATH_LENGTH]; static bool_t dont_start_remotes = 0; #ifdef DEBUG static bool_t network_debugging = 0; #endif static bool_t use_start_nodes = 0; static char hostconfig_file[MAX_PATH_LENGTH]; static bool_t start_heterogeneous; static char host_startup_list[1000]; static char *exe_from_argv; /* For saving argv[0] to give to host-control */ /* * my_port is the TCP port number for the port on which the host * listens for connections from the worker nodes. * * listen_fd is the associated file descriptor. */ static int my_port; static int listen_fd; static bool_t destroying_nodes = FALSE; typedef struct msg_buf_header_struct { struct msg_buf_header_struct * next; #ifdef TCP_BUMP_HACK int_t alloc_size; #endif int_t size; int_t type; int_t node; cell_t user_buf[1]; } msg_buf_header_t; #ifdef TCP_BUMP_HACK #define HEADER_SIZE 5 #else #define HEADER_SIZE 4 #endif #define MsgSize2Bytes(Size) (((Size) + HEADER_SIZE) * CELL_SIZE) #define User2MsgBuf(Buf) ((msg_buf_header_t *) (((cell_t *) (Buf)) \ - HEADER_SIZE)) #define MsgBuf2User(Buf) ((msg_buf_header_t *) (((cell_t *) (Buf)) \ + HEADER_SIZE)) static msg_buf_header_t *mq_front, *mq_back; #ifdef STREAMS /* * Each element of the stream_table contains a pointer to the front * of a stream array on the heap, and the index of this table entry * into that stream array. * * The sindex field of the stream_table is also used to maintain a * free list of stream_table entries, where first_free_stream_id * contains the index of the first free stream table entry. * * When there is a pending receive on a stream, the 'enabled' field is * set to TRUE. Whenever a stream message arrives, _p_msg_receive() * needs to check this field to see if it can deliver the message. * If enabled==FALSE, then the message is queued on this stream's * message queue (mq_front, mq_back). 
* * A receiving side of a stream is marked closed when: * - closed==TRUE * - and, the message queue is empty */ static struct { cell_t * stream_array; int_t sindex; bool_t enabled; bool_t closed; msg_buf_header_t * mq_front; msg_buf_header_t * mq_back; } stream_table[MAX_STREAMS]; static int_t max_streams = MAX_STREAMS; static int_t first_free_stream_id; static void handle_close_stream(); static void handle_stream_receive(); static void deliver_stream_msg(); #endif /* STREAMS */ /* * A worker_ctl_msg_t is sent between the host and workers during * system initialization. * * Message types: * * WORKER_CTL_INIT: * i1 = node id * i2 = number of nodes * Sent from host to worker * * WORKER_CTL_PORT * i1 = port number on which worker is listening * buf = hostname of worker * Sent from worker to host * * WORKER_CTL_PROCTBL_ENTRY * A process table entry. * i1 = node number * i2 = port on which node i1 is listening * buf = hostname for node i1 * * WORKER_CTL_PROCTBL_END * Denotes the end of the process table. * * WORKER_CTL_START_CONNECTING * Sent from the host to a worker to instruct the worker to * connect to all other workers with node numbers greater than * its own. * * WORKER_CTL_CONNECTIONS_MADE * Sent from a worker to the host when it has finished making its * connections. * * WORKER_CTL_CONN_TEST * This is the first message sent on a new connection. It is used * to test the connection. * * WORKER_CTL_CONN_ACK * Response to a WORKER_CTL_CONN_TEST message. * * WORKER_CTL_GO * Sent from the host to a worker to instruct the worker to begin * executing PCN. * * WORKER_CTL_GO_ACK * Response to a WORKER_CTL_GO message. * * WORKER_CTL_CHECKIN * Sent from the host to a worker during the final node checking * in _p_sr_node_initialized(). * * WORKER_CTL_CHECKIN_ACK * Response to a WORKER_CTL_CHECKIN message. * * WORKER_CTL_EXEC_FAILED * Sent by host-control when it discovers that a node couldn't be * started. 
*/ typedef struct worker_ctl_msg_struct { int type; int i1, i2, i3; char buf[256]; } worker_ctl_msg_t; #define WORKER_CTL_INIT 1 #define WORKER_CTL_PORT 2 #define WORKER_CTL_PROCTBL_ENTRY 3 #define WORKER_CTL_PROCTBL_END 4 #define WORKER_CTL_START_CONNECTING 5 #define WORKER_CTL_CONNECTIONS_MADE 6 #define WORKER_CTL_CONN_TEST 7 #define WORKER_CTL_CONN_ACK 8 #define WORKER_CTL_GO 9 #define WORKER_CTL_GO_ACK 10 #define WORKER_CTL_CHECKIN 11 #define WORKER_CTL_CHECKIN_ACK 12 #define WORKER_CTL_EXEC_FAILED 13 static void net_setup_anon_listener(); static int net_accept() ; static int net_conn_to_listener(); static void error_check(); static int blocking_read(); static int blocking_write(); static void blocking_read_sr_errchk(); static void blocking_write_sr_errchk(); static void blocking_read_errchk(); static void blocking_write_errchk(); static void set_nonblocking(); static void sr_fatal_error(); static void sr_fatal_error_1(); static void start_child(); static void start_nodes(); static int start_nodes_with_daemon(); static int start_nodes_with_script(); static void start_nodes_from_list(); static void start_node_on_host(); static void read_line_from_fd(); static void write_line_to_fd(); static void find_host_daemon(); static void read_startup_file(); static void start_host(); static void process_connection(); static void start_worker(); static void set_mask_for_all_workers(); #ifdef DEBUG static void dump_proc_table(); #endif /* DEBUG */ static void accept_connections(); static void send_proc_table(); static void receive_proc_table(); static void await_connections(); static void make_connections(); static void process_connection_from_worker(); static void process_mesg_from_node(); static bool_t process_mesg_from_host(); static int node_from_fd(); static void await_go(); static char *ident(); /* * _p_sr_get_argdesc() */ void _p_sr_get_argdesc(argc, argv, argdescp, n_argdescp) int argc; char **argv; argdesc_t **argdescp; int *n_argdescp; { static argdesc_t argdesc[] 
= { STRING_ARG("nodes", host_startup_list, "List of machine names on which to run PCN nodes"), STRING_ARG("s", parallel_startup, "system specific parallel startup file"), BOOL_ARG("start-nodes", &use_start_nodes, "try to use the host-control daemon to start nodes"), INTEGER_ARG("n", &_p_nodes, "number of nodes"), STRING_ARG("pi", parallel_info, "parallel info for node processes -- you should never use this"), STRING_ARG("hostconfig", hostconfig_file, "host configuration file"), BOOL_ARG("nostart", &dont_start_remotes, "don't actually start remote nodes"), BOOL_ARG("het", &start_heterogeneous, "allow a heterogeneous network"), #ifdef DEBUG BOOL_ARG("netdebug", &network_debugging, "turn on network debugging\n"), #endif }; exe_from_argv = argv[0]; parallel_startup[0] = '\0'; parallel_info[0] = '\0'; dont_start_remotes = FALSE; #ifdef DEBUG network_debugging = FALSE; #endif use_start_nodes = FALSE; start_heterogeneous = FALSE; hostconfig_file[0] = '\0'; host_startup_list[0] = '\0'; *argdescp = argdesc; *n_argdescp = sizeof(argdesc) / sizeof(argdesc[0]); } /* * _p_alloc_msg_buffer() * * There is some trickery here to support the TCP_BUMP_HACK. If the * message size falls in the bump region (TCP_BUMP_MIN < size < TCP_BUMP_MAX) * then actually allocate TCP_BUMP_MAX bytes and fill in the allocated * size in msg_buf->alloc_size. Otherwise let msg_buf->alloc_size = -1. * * Return: A pointer to a message buffer with 'size' cells. */ cell_t *_p_alloc_msg_buffer(size) int_t size; /* In cells */ { int_t i; msg_buf_header_t *msg_buf; #ifdef TCP_BUMP_HACK bool_t resize = FALSE; #endif #ifdef DEBUG if (ParDebug(8)) { fprintf(_p_stdout, "(%lu,%lu) _p_alloc_msg_buffer: Allocating %lu cells\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, (unsigned long) size); fflush(_p_stdout); } #endif /* DEBUG */ i = MsgSize2Bytes(size); #ifdef TCP_BUMP_HACK if (i > TCP_BUMP_MIN && i < TCP_BUMP_MAX) { /* Here's the bad area. Allocate TCP_BUMP_MAX bytes instead (and * actually send them! 
*/ i = TCP_BUMP_MAX; resize = TRUE; } #endif if ((msg_buf = (msg_buf_header_t *) malloc((size_t) i)) == NULL) _p_malloc_error(); #ifdef TCP_BUMP_HACK msg_buf->alloc_size = resize ? i : -1; #endif msg_buf->size = size; #ifdef DEBUG if (ParDebug(8)) { fprintf(_p_stdout, "(%lu,%lu) _p_alloc_msg_buffer: Allocated %lu cells\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, (unsigned long) size); fflush(_p_stdout); } #endif /* DEBUG */ return (msg_buf->user_buf); } /* _p_alloc_msg_buffer() */ #define FreeMsgBuf(Buf) free((char *) Buf) /* * _p_abort_nodes() */ void _p_abort_nodes() { int_t i; for (i = 0; i < _p_nodes; i++) { if (i != _p_my_id) { close(proc_table[i].fd); } } } /* _p_abort_nodes() */ static bool_t recv_one_mesg(rcv_buf, blocking) msg_buf_header_t **rcv_buf; bool_t blocking; { fd_set read_fds; struct timeval tval, *tvalptr; int n, i, n_read; bool_t done, avail; static int next_proc = 0; proc_table_entry *ent; #ifdef DEBUG if ((network_debugging && blocking) || (network_debugging > 1)) printf("%lu: recv_one_mesg blocking=%lu\n", (unsigned long) ME, (unsigned long) blocking); #endif /* * First, check if there are any messages sitting in the table * already. If there are, pull one of them out and return it. The * stuff waiting on the streams can wait. 
*/ avail = FALSE; for (i = 0; i < _p_nodes; i++) { ent = &(proc_table[i]); if (ent->buf != (char *) NULL && ent->len == 0) { avail = TRUE; break; } } done = avail; while (!done) { if (!blocking) { tval.tv_sec = 0; tval.tv_usec = 0; tvalptr= &tval; } else tvalptr = NULL; FD_ZERO(&read_fds); set_mask_for_all_workers(&read_fds); if (!_p_host && (proc_table[_p_host_id].fd != -1)) FD_SET(proc_table[_p_host_id].fd, &read_fds); #ifdef DEBUG if (network_debugging > 1) { char buf[1024], buf2[100]; buf[0] = 0; for (i = 0; i < howmany(FD_SETSIZE, NFDBITS); i++) { sprintf(buf2, "%0lx ", (unsigned long) read_fds.fds_bits[i]); strcat(buf, buf2); } printf("%lu: selecting mask=%s\n", ME, buf); } #endif n = select(NUM_FDS, &read_fds, (fd_set *) NULL, (fd_set *) NULL, tvalptr); #ifdef DEBUG if (network_debugging > 1) printf("%lu: select returns %d\n", ME, n); #endif if (n < 0) { char buf[300]; if (errno == EINTR) continue; sprintf(buf, "recv_one_mesg: select failed: %s", sys_errlist[errno]); _p_fatal_error(buf); } /* * Now we need to read something from each fd that has data * for reading. If something is available on a fd that isn't * already listed as having a read in progress, then there * must be a message header there. 
*/ avail = FALSE; for (i = 0; i < _p_nodes; i++) { ent = &(proc_table[i]); if (ent->fd < 0) continue; if (FD_ISSET(ent->fd, &read_fds)) { #ifdef DEBUG if (network_debugging) printf("%lu: fd is set: node=%d fd=%d\n", _p_my_id, i, ent->fd); #endif if (ent->buf == (char *) NULL) { msg_buf_header_t hdr; int len; #ifdef DEBUG if (network_debugging) printf("%lu: reading header\n", _p_my_id); #endif blocking_read_errchk(ent->fd, (char *) &hdr, HEADER_SIZE * CELL_SIZE, "recv_one_mesg"); #ifdef DEBUG if (network_debugging) { fprintf(_p_stdout,"%lu: read header\n", ME); fprintf(_p_stdout, "%lu: Got header: fd=%d size=%lu type=%lu node=%lu\n", (unsigned long) _p_my_id, ent->fd, (unsigned long) hdr.size, (unsigned long) hdr.type, (unsigned long) hdr.node); } #endif #ifdef TCP_BUMP_HACK if (hdr.alloc_size != -1) len = hdr.alloc_size; else #endif len = MsgSize2Bytes(hdr.size); if ((ent->buf = (char *) malloc(len)) == (char *) NULL) _p_malloc_error(); ent->len = len - HEADER_SIZE * CELL_SIZE; memcpy((char *) ent->buf, (char *) &hdr, HEADER_SIZE * CELL_SIZE); ent->bufend = ent->buf + HEADER_SIZE * CELL_SIZE; } if (ent->buf == (char *) NULL) _p_fatal_error("recv_one_megs: NULL buffer"); while (ent->len > 0) { #ifdef DEBUG if (network_debugging) printf("%lu: reading %d bytes\n", ME, ent->len); #endif n_read = read(ent->fd, ent->bufend, ent->len); #ifdef DEBUG if (network_debugging) printf("%lu: read returns %d\n", ME, n_read); #endif if (n_read < 0) { if (errno == EINTR) continue; else if (errno == EWOULDBLOCK) { #ifdef DEBUG if (network_debugging) printf("%lu: read would block\n", ME); #endif break; } else FATAL_SYSTEM_ERROR("read failed: %s"); } if (n_read == 0) break; else { ent->len -= n_read; ent->bufend += n_read; } } if (ent->len == 0) { #ifdef DEBUG if (network_debugging) printf("%lu: Got complete message id=%d fd=%d\n", ME, i, ent->fd); #endif avail = TRUE; } } } if (!blocking) done = TRUE; else done = avail; } #ifdef DEBUG if (network_debugging > 1) printf("%lu: end of 
loop: avail=%lu\n", (unsigned long) ME, (unsigned long) avail); #endif if (!avail && blocking) _p_fatal_error("Blocking recv_one_mesg didn't find a message"); if (!avail) return FALSE; i = next_proc; while (1) { ent = &(proc_table[next_proc]); if (ent->buf != NULL && ent->len == 0) break; next_proc = (next_proc + 1) % _p_nodes; if (next_proc == i) _p_fatal_error("recv_one_mesg: avail was true, but didn't find a message"); } *rcv_buf = (msg_buf_header_t *) ent->buf; #ifdef DEBUG if (network_debugging) printf("%lu: Got message avail on entry %ld fd %d len %ld s=%lu t=%lu n=%lu\n", (unsigned long) ME, (long) (ent - proc_table), ent->fd, (long) (ent->bufend - ent->buf), (unsigned long) (**rcv_buf).size, (unsigned long) (**rcv_buf).type, (unsigned long) (**rcv_buf).node); #endif ent->buf = NULL; ent->len = 0; next_proc = (next_proc + 1) % _p_nodes; return TRUE; } bool_t _p_msg_receive(node, size, type, rcv_type) int *node; int *size; int *type; int rcv_type; { bool_t done, done_inner; msg_buf_header_t *rcv_buf; bool_t gotit; bool_t read_from_queue = FALSE; for (done = FALSE; !done; ) { if (mq_front != (msg_buf_header_t *) NULL && (rcv_type == RCV_NOBLOCK || rcv_type == RCV_BLOCK)) { read_from_queue = TRUE; rcv_buf = mq_front; mq_front = mq_front->next; } else { switch (rcv_type) { case RCV_BLOCK: case RCV_NOBLOCK: gotit = recv_one_mesg(&rcv_buf, rcv_type == RCV_BLOCK); if (!gotit) { if (rcv_type == RCV_BLOCK) _p_fatal_error("_p_msg_receive: blocking read didn't return a message"); else return (FALSE); } break; case RCV_PARAMS: case RCV_GAUGE: for (done_inner = FALSE; !done_inner; ) { recv_one_mesg(&rcv_buf, TRUE); if ( (rcv_type==RCV_PARAMS && rcv_buf->type==MSG_PARAMS) || (rcv_type==RCV_GAUGE && rcv_buf->type==MSG_GAUGE) || rcv_buf->type == MSG_EXIT || rcv_buf->type == MSG_INITIATE_EXIT ) { done_inner = TRUE; } else { if (mq_front == (msg_buf_header_t *) NULL) { mq_front = mq_back = rcv_buf; rcv_buf->next = (msg_buf_header_t *) NULL; } else { mq_back->next = rcv_buf; 
mq_back = rcv_buf; rcv_buf->next = (msg_buf_header_t *) NULL; } } } break; case RCV_COLLECT: for (done_inner = FALSE; !done_inner; ) { recv_one_mesg(&rcv_buf, TRUE); switch (rcv_buf->type) { case MSG_READ: case MSG_CANCEL: case MSG_EXIT: case MSG_INITIATE_EXIT: done_inner = TRUE; break; case MSG_COLLECT: FreeMsgBuf(rcv_buf); break; default: /* * MSG_DEFINE, MSG_VALUE, MSG_CLOSE_STREAM, * MSG_GAUGE, or a stream message */ if (mq_front == (msg_buf_header_t *) NULL) { mq_front = mq_back = rcv_buf; rcv_buf->next = (msg_buf_header_t *) NULL; } else { mq_back->next = rcv_buf; mq_back = rcv_buf; rcv_buf->next = (msg_buf_header_t *) NULL; } break; } } break; } } #ifdef STREAMS if (rcv_buf->type == MSG_CLOSE_STREAM) { handle_close_stream(rcv_buf); } else if (rcv_buf->type < LAST_STREAM_MSG_TYPE) { handle_stream_receive(rcv_buf); } else /* Normal non-stream related message */ { #endif /* STREAMS */ *size = rcv_buf->size; *node = rcv_buf->node; *type = rcv_buf->type; /* * Now copy the message onto the heap. If there is * not enough room for it, then garbage collect. * Assume that there will always be enough room * for MSG_CANCEL message. */ if (*type != MSG_CANCEL) { TryGCWithSize(*size); } memcpy(_p_heap_ptr, rcv_buf->user_buf, *size * CELL_SIZE); FreeMsgBuf(rcv_buf); #ifdef DEBUG if (ParDebug(8)) { fprintf(_p_stdout, "(%lu,%lu) _p_msg_receive: Received from n %lu, s %lu, t %lu\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, (unsigned long) *node, (unsigned long) *size, (unsigned long) *type); fflush(_p_stdout); } #endif /* DEBUG */ done = TRUE; #ifdef STREAMS } #endif /* STREAMS */ } return (TRUE); } void _p_msg_send(buf, node, size, type) cell_t *buf; int node; int size; int type; { bool_t gotit; msg_buf_header_t *send_buf, *rcv_buf; int_t allocated_size; int len; int n_left, n_sent; char *send_ptr; #ifdef DEBUG if (node == _p_my_id) _p_fatal_error("_p_msg_send(): Help! 
I can't send to myself!"); #endif /* DEBUG */ if (buf == (cell_t *) NULL) { if ((send_buf = (msg_buf_header_t *) malloc(MsgSize2Bytes(0))) == NULL) _p_malloc_error(); allocated_size = 0; size = 0; } else { send_buf = User2MsgBuf(buf); allocated_size = send_buf->size; } #ifdef DEBUG if (ParDebug(8)) { fprintf(_p_stdout, "(%lu,%lu) _p_msg_send: Sending to n %lu, s %lu, t %lu\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, (unsigned long) node, (unsigned long) size, (unsigned long) type); fflush(_p_stdout); } #endif /* DEBUG */ send_buf->size = size; send_buf->type = type; send_buf->node = _p_my_id; #ifdef TCP_BUMP_HACK len = send_buf->alloc_size == -1 ? MsgSize2Bytes(size) : send_buf->alloc_size; #else len = MsgSize2Bytes(size); #endif n_left = len; send_ptr = (char *) send_buf; while (n_left > 0) { while (1) { gotit = recv_one_mesg(&rcv_buf, FALSE); /* Nonblocking read */ if (!gotit) break; if (mq_front == (msg_buf_header_t *) NULL) { mq_front = mq_back = rcv_buf; rcv_buf->next = (msg_buf_header_t *) NULL; } else { mq_back->next = rcv_buf; mq_back = rcv_buf; rcv_buf->next = (msg_buf_header_t *) NULL; } } /* * The HP machine has a problem with writing large blocks of data. * According to the write(2) manual page, there should be * no problem with this. But I was getting "Message too long" * errors. * * Supposedly, one can write PIPSIZ (=8192) bytes to a pipe. * Even that got me the Message too long errors. Hence we * write in 4K packets. */ #ifdef hpux len = n_left > 4096 ? 
4096 : n_left; #else len = n_left; #endif n_sent = write(proc_table[node].fd, send_ptr, len); if (n_sent < 0) { if (errno == EWOULDBLOCK) { #ifdef DEBUG if (network_debugging) fprintf(_p_stdout, "(%s) Send would block\n", ident()); #endif continue; } else if (errno == EPIPE) { _p_fatal_error("_p_msg_send: Got eof on send"); } else { static char buf[100]; sprintf(buf, "_p_send_msg: Write failed: %s", SYSTEM_ERROR); _p_fatal_error(buf); } } #ifdef DEBUG if (network_debugging) fprintf(_p_stdout, "(%s) Sent %d/%d bytes to %d\n", ident(), n_sent, n_left, node); #endif n_left -= n_sent; send_ptr += n_sent; } FreeMsgBuf(send_buf); #ifdef DEBUG if (ParDebug(9)) { fprintf(_p_stdout, "(%lu,%lu) _p_msg_send: Sent to %lu\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, (unsigned long) node); fflush(_p_stdout); } #endif /* DEBUG */ } void _p_destroy_nodes() { destroying_nodes = TRUE; } void _p_sr_node_initialized() { worker_ctl_msg_t msg; int i; #ifdef DEBUG if (network_debugging) fprintf(_p_stdout, "(%s) Got sr_node_initialized, _p_multiprocessing=%lu\n", ident(), (unsigned long) _p_multiprocessing); #endif #if 1 if (!_p_multiprocessing) return; if (_p_host) { for (i = 1; i < _p_nodes; i++) { msg.type = WORKER_CTL_CHECKIN; msg.i1 = i; blocking_write_errchk(proc_table[i].fd, (char *) &msg, sizeof(msg), "host checkin send"); blocking_read_errchk(proc_table[i].fd, (char *) &msg, sizeof(msg), "_p_sr_node_initialized"); if (msg.type != WORKER_CTL_CHECKIN_ACK) { _p_fatal_error("Wanted WORKER_CTL_CHECKIN_ACK in _p_sr_node_initialized"); } if (msg.i1 != i) { _p_fatal_error("_p_sr_node_initialized: msg.i1 != i"); } #ifdef DEBUG if (network_debugging) fprintf(_p_stdout, "(%s) Node %d checked in\n", ident(), i); #endif } } else { blocking_read_errchk(proc_table[_p_host_id].fd, (char *) &msg,sizeof(msg), "_p_sr_node_initialized"); if (msg.type != WORKER_CTL_CHECKIN) { _p_fatal_error("host sent wrong message type on checkin"); } if (msg.i1 != _p_my_id) { 
_p_fatal_error("_p_sr_node_initialized: msg.i1 != my_id"); } msg.type = WORKER_CTL_CHECKIN_ACK; msg.i1 = _p_my_id; blocking_write_errchk(proc_table[_p_host_id].fd, (char *) &msg, sizeof(msg), "_p_sr_node_initialized"); #ifdef DEBUG if (network_debugging) fprintf(_p_stdout, "(%s) Worker %lu done checking in\n", ident(), (unsigned long) _p_my_id); #endif } #endif } /* * int fork_nodes(int n) * * Fork off n worker nodes. */ static int fork_nodes(n) int n; { char host_name[128]; int pid = 1; if (gethostname(host_name, sizeof(host_name)) < 0) { sr_fatal_error("gethostname failed"); } for (; n > 0; n--) { #ifdef DEBUG if (ParDebug(9)) printf("Forking off a worker node\n"); #endif /* DEBUG */ if ((pid = fork()) < 0) /* error */ { sr_fatal_error("Failed fork()"); } else if (pid == 0) /* child */ { _p_host = FALSE; if (parallel_info[0] == '\0') sprintf(parallel_info, "%s,%d", host_name, my_port); close(listen_fd); break; } } return (pid); } /* fork_nodes() */ static void setup_host_port() { net_setup_anon_listener(50, &my_port, &listen_fd); } void _p_sr_init_node(argc, argv) int argc; char *argv[]; { int i; /* Initialize the proc table with all fd=-1 */ _p_stdout = stdout; for (i = 0; i < MAX_NODES; i++) { proc_table[i].buf = NULL; proc_table[i].len = 0; proc_table[i].fd = -1; } /* * _p_nodes will have a default value of -1. Therefore, all of the * tests below should allow a negative value for _p_nodes, which * indicates that _p_nodes was not set on the command line with -n. 
*/ if(_p_nodes > MAX_NODES) { char buf[256]; sprintf(buf, "Too many nodes: MAX_NODES = %d", MAX_NODES); sr_fatal_error(buf); } if (_p_nodes <= 1 && !use_start_nodes && host_startup_list[0] == '\0' && parallel_startup[0] == '\0' && parallel_info[0] == '\0') { /* * Do a uniprocessing run and return */ _p_my_id = _p_host_id = 0; _p_nodes = 1; _p_host = TRUE; /* Finish node initialization and run emulator */ _p_init_node(argc, argv); return; } signal(SIGPIPE, SIG_IGN); if (parallel_info[0] != '\0') { /* * This is a node. The -pi flag was used to tell this node * to start up _p_nodes nodes on this machine. */ _p_host = FALSE; fork_nodes(_p_nodes - 1); } else if (host_startup_list[0] != '\0') { /* * This is the host. The -nodes flag was used to specify * a list of the machines on which to start nodes. */ _p_host = TRUE; setup_host_port(); _p_nodes = 1; start_nodes_from_list(host_startup_list); } else if (parallel_startup[0] != '\0') { /* * This is the host. The -s flag was used to specify * a startup file. */ _p_host = TRUE; setup_host_port(); _p_nodes = 1; read_startup_file(parallel_startup); } else if (use_start_nodes) { /* * This is the host. The -start-nodes flag was used to * specify that it should start up the nodes * with the host-control stuff. Use the value of _p_nodes (which * is set with the -n flag) to control how many nodes will * be created. */ char host_name[256]; int num; if (gethostname(host_name, sizeof(host_name)) < 0) { sr_fatal_error("gethostname failed"); } _p_host = TRUE; setup_host_port(); num = _p_nodes - 1; _p_nodes = 1; /* Since start_nodes increments _p_nodes */ start_nodes(num, host_name); } else if (_p_nodes > 1) { /* * This is the host. Only the -n flag was used, so it * should fork off _p_nodes-1 workers. 
*/ _p_host = TRUE; setup_host_port(); fork_nodes(_p_nodes - 1); } else { sr_fatal_error("Illegal argument combination"); } if (_p_host) { start_host(); } else { start_worker(); } #ifdef DEBUG if (ParDebug(8)) { fprintf(_p_stdout, "All right! just before init, proc table is...\n"); dump_proc_table(); } #endif /* OPTIMAL_TCP_BLOCKSIZE is the magic packet size for TCP */ _p_default_msg_buffer_size = OPTIMAL_TCP_BLOCKSIZE / CELL_SIZE - HEADER_SIZE; mq_front = mq_back = (msg_buf_header_t *) NULL; _p_init_node(argc, argv); } static void read_startup_file(file) char *file; { FILE *fp; char buf1[MAX_PATH_LENGTH], *b1; char host_name[256]; int i, j; /* Get the host's host name */ if (gethostname(host_name, sizeof(host_name)) == -1) sr_fatal_error("Failed to get host's machine name"); if ((fp = fopen(file, "r")) == (FILE *) NULL) sr_fatal_error("Unable to open parallel startup file"); /* Read in parallel startup file, one line at a time */ for (i = 1; fgets(buf1, MAX_PATH_LENGTH - 1, fp) != (char *) NULL; i++) { if (buf1[0] == '%' || buf1[0] == '\n' || buf1[0] == ' ' || buf1[0] == '\t' || buf1[0] == '#') { continue; } else if (strncmp(buf1, "fork", 4) == 0) { /* Fork off the specified number of nodes */ if (sscanf(buf1+4, "%d", &j) != 1 || j < 1) { sprintf(buf1, "Bad startup command, line %d", i); sr_fatal_error(buf1); } _p_nodes += j; if (fork_nodes(j) == 0) /* child */ break; } else if (strncmp(buf1, "start-nodes", 11) == 0) /* Use the start-nodes script to connect to the * node-control daemons */ { int to_start; b1 = buf1 + 11; to_start = atoi(b1); if (to_start == 0) { sr_fatal_error("Invalid start-nodes"); } start_nodes(to_start, host_name); } else if (strncmp(buf1, "exec", 4) == 0) /* use system() */ { j = strlen(buf1) - 1; if (buf1[j] == '\n') buf1[j] = '\0'; /* Get the number of nodes to fork */ if (sscanf(buf1+4, "%d", &j) != 1 || j < 1) { sprintf(buf1, "Bad startup command, line %d", i); sr_fatal_error(buf1); } /* Find the start of the command */ if ((b1 = 
strchr(buf1, ':')) == (char *) NULL) { sprintf(buf1, "Bad startup command, line %d", i); sr_fatal_error(buf1); } b1++; start_child(j, b1,host_name); } else { sprintf(buf1, "Bad startup command, line %d", i); sr_fatal_error(buf1); } } fclose(fp); } static void start_nodes_from_list(hosts) char *hosts; { char *hptr, *s, *host; int done; char pwd[MAX_PATH_LENGTH]; char host_name[256]; /* Get the host's host name */ if (gethostname(host_name, sizeof(host_name)) == -1) sr_fatal_error("Failed to get host's machine name"); if (getwd(pwd) == 0) sr_fatal_error_1("getwd failed: %s\n", pwd); for (hptr = hosts, done = 0; !done; ) { host = hptr; s = strchr(hptr, ':'); if (s) { hptr = s + 1; *s = 0; } else done = 1; start_node_on_host(host, exe_from_argv, pwd, host_name); } } static void start_node_on_host(host, prog, pwd, myhost) char *host, *prog, *pwd, *myhost; { char cmd[1024]; #ifdef DEBUG if (network_debugging) { printf("Starting node on \"%s\" prog=\"%s\" pwd=\"%s\"\n", host, prog, pwd); } #endif sprintf(cmd, "rsh %s -n 'cd %s; %s -pcn -n 1 -pi %s,%d' &", host, pwd, prog, myhost, my_port); #ifdef DEBUG if (network_debugging) { printf("Command is <%s>\n", cmd); } #endif if (dont_start_remotes) { printf("Would have run command \"%s\"\n", cmd); } else if (system(cmd) != 0) { sr_fatal_error("Failed system()"); } _p_nodes++; } static void start_nodes(num, myhost) int num; char *myhost; { char argbuf[512]; char host_daemon_host[512]; int host_daemon_port; int nstarted; /* Construct the $ARGS$ argument */ sprintf(argbuf, " -pcn -n 1 -pi %s,%d", myhost, my_port); find_host_daemon(host_daemon_host, &host_daemon_port); if (host_daemon_host[0] == '\0') { nstarted = start_nodes_with_script(num, argbuf); } else { /* Ok. A host daemon is running. 
Use it to start the nodes */ nstarted = start_nodes_with_daemon(host_daemon_host, host_daemon_port, num, argbuf); #ifdef DEBUG if (network_debugging) printf("started %d nodes with daemon\n", nstarted); #endif } _p_nodes += nstarted; } static int start_nodes_with_script(num, args) int num; char *args; { char cmdbuf[1024]; sprintf(cmdbuf, "start-nodes %d %s", num, args); #ifdef DEBUG if (network_debugging) printf("cmdbuf: '%s'\n", cmdbuf); #endif if (dont_start_remotes) { printf("Would have run <%s>\n", cmdbuf); } else if (system(cmdbuf) != 0) { sr_fatal_error("failed system()"); } /* Here we don't know how many were started, so just assume it worked */ return num; } static int start_nodes_with_daemon(host, port, nnodes, args) char *host, *args; int port, nnodes; { int sock, n; char buf[1000]; char *bptr; char pwd[MAX_PATH_LENGTH+1]; char hcfile[MAX_PATH_LENGTH + 1]; if (getwd(pwd) == 0) sr_fatal_error_1("getwd failed: %s\n", pwd); #ifdef DEBUG if (network_debugging) printf("connecting to %s/%d\n", host, port); #endif sock = net_conn_to_listener(host, port); if (sock < 0) { sr_fatal_error_1("connect to host daemon failed: %s", SYSTEM_ERROR); return 0; } read_line_from_fd(sock, buf, sizeof(buf)); #ifdef DEBUG if (network_debugging) printf("got buf '%s'\n", buf); #endif sprintf(buf, "PCN startup arch=%s het=%lu listen_port=%d\n", PCN_ARCH, (unsigned long) start_heterogeneous, my_port); #ifdef DEBUG if (network_debugging) printf("sending buf '%s'\n", buf); #endif write_line_to_fd(sock, buf); /* * Read the ack */ read_line_from_fd(sock, buf, sizeof(buf)); #ifdef DEBUG if (network_debugging) printf("Got ack '%s'\n", buf); #endif if (hostconfig_file[0] != '\0') { if (hostconfig_file[0] != '/') sprintf(hcfile, "%s/%s", pwd, hostconfig_file); else strcpy(hcfile, hostconfig_file); if (access(hcfile, R_OK) != 0) { printf("hostconfig file %s not accessible\n", hcfile); hcfile[0] = '\0'; } else { sprintf(buf, "cmd: load-config %s\n", hcfile); write(sock, buf, strlen(buf)); while 
(1) { read_line_from_fd(sock, buf, sizeof(buf)); #ifdef DEBUG if (network_debugging) printf("Got line '%s'\n", buf); #endif if (buf[0] == '.' && buf[1] == '\0') break; } } } strcpy(buf, "cmd: enabled-hosts\n"); if (write(sock, buf, strlen(buf)) < 0) { perror("write"); exit(1); } read_line_from_fd(sock, buf, sizeof(buf)); #ifdef DEBUG if (network_debugging) printf("got buf '%s'\n", buf); #endif bptr = strchr(buf, ':'); bptr++; n = atoi(bptr); #ifdef DEBUG if (network_debugging) printf("got %d nodes enabled\n", n); #endif if (n == 0) { printf("No nodes enabled in host-control\n"); close(sock); return n; } /* Eat "." line */ read_line_from_fd(sock, buf, sizeof(buf)); /* If we've got a hostconfig file, then just execute a * startnodes command on the host. * Otherwise, do a startnodes_withdir using our current directory * as the directory and argv[0] as the pcn pathname */ if (hcfile[0] == '\0') { sprintf(buf, "cmd: startnodes_withdir %s %s %d %s\n", pwd, exe_from_argv, nnodes, args); } else { sprintf(buf, "cmd: startnodes %d %s\n", nnodes, args); } #ifdef DEBUG if (network_debugging) printf("starting nodes with '%s'\n", buf); #endif write(sock, buf, strlen(buf)); read_line_from_fd(sock, buf, sizeof(buf)); #ifdef DEBUG if (network_debugging) printf("got line '%s'\n", buf); #endif bptr = strchr(buf, ':'); if (bptr == NULL) { close(sock); sr_fatal_error("Startup with host-control failed\n"); } bptr++; while (isspace(*bptr)) bptr++; n = atoi(bptr); #ifdef DEBUG if (network_debugging) printf("Started %d nodes\n", n); #endif close(sock); return n; } static void write_line_to_fd(sock, str) int sock; char *str; { int len = strlen(str); if (write(sock, str, len) < 0) { perror("write"); } } static void read_line_from_fd(sock, buf, len) int sock, len; char *buf; { char *bptr; int n; bptr = buf; while (1) { n = read(sock, bptr, 1); if (n < 0) { perror("read"); exit(1); } if (*bptr == '\n') break; bptr++; if (bptr - buf >= len) break; } *bptr = 0; } static void 
find_host_daemon(host, port) char *host; int *port; { FILE *fp; char *home, *dir; char file[MAX_PATH_LENGTH+1]; char hostfile[MAX_PATH_LENGTH+1]; char buf[100]; DIR *dirp; #ifdef USE_DIRENT struct dirent *ent; #else #if defined(USE_DIRECT) struct direct *ent; #endif #endif hostfile[0] = '\0'; dir = getenv("PCN_CONTROL_DIR"); if (dir == NULL) { home = getenv("HOME"); if ((dir = (char *) malloc(MAX_PATH_LENGTH+1)) == (char *) NULL) _p_malloc_error(); sprintf(dir, "%s/.pcn_control/hosts", home); #ifdef DEBUG if (network_debugging) printf("got dir '%s'\n", dir); #endif } #if defined(USE_DIRECT) dirp = opendir(dir); if (dirp == NULL) { #ifdef DEBUG if (network_debugging) printf("couldn't open dir %s\n", dir); #endif *host = 0; return; } while ((ent = readdir(dirp)) != NULL) { if (ent->d_name[0] != '.') break; } closedir(dirp); if (ent) { strncpy(hostfile, ent->d_name, ent->d_namlen); hostfile[ent->d_namlen] = '\0'; } #else #if defined(USE_DIRENT) dirp = opendir(dir); if (dirp == NULL) { #ifdef DEBUG if (network_debugging) printf("couldn't open dir %s\n", dir); #endif *host = 0; return; } while ((ent = readdir(dirp)) != NULL) { if (ent->d_name[0] != '.') break; } closedir(dirp); if (ent) strcpy(hostfile, ent->d_name); #endif #endif if (hostfile[0] == '\0') { #ifdef DEBUG printf("didn't find a host file\n"); #endif *host = '\0'; return; } sprintf(file, "%s/%s", dir, hostfile); if ((fp = fopen(file, "r")) == NULL) { #ifdef DEBUG if (network_debugging) printf("couldn't open file '%s': %s\n", file, sys_errlist[errno]); #endif *host = 0; return; } if (fgets(buf, sizeof(buf), fp) == NULL) { #ifdef DEBUG if (network_debugging) printf("couldn't get a line from file\n"); #endif *host = 0; return; } fclose(fp); *port = atoi(buf); if (*port == 0) { #ifdef DEBUG if (network_debugging) printf("invalid line '%s'\n", buf); #endif *host = 0; return; } strcpy(host, ent->d_name); } static void start_child(num, rawcmd, host_name) int num; char *rawcmd; char *host_name; { char argbuf[512], 
cmdbuf[1024], buf1[512]; char *rawcmdptr, *cmdptr; bool_t found; /* Construct the $ARGS$ argument */ sprintf(argbuf, " -pcn -n %d -pi %s,%d", num, host_name, my_port); cmdptr = cmdbuf; /* Substitute $ARGS$ in the command with appropriate stuff */ for (rawcmdptr = rawcmd, cmdptr = cmdbuf, found = FALSE; *rawcmdptr != '\0'; ) { if (strncmp(rawcmdptr, "$ARGS$", 6) == 0) { *cmdptr = '\0'; strcat(cmdbuf, argbuf); strcat(cmdbuf, rawcmdptr + 6); found = TRUE; break; } else *cmdptr++ = *rawcmdptr++; } if (!found) { sprintf(buf1, "Bad startup command -- no $ARGS$"); sr_fatal_error(buf1); } strcat(cmdbuf, " &"); _p_nodes += num; /* Now system() it off */ #ifdef DEBUG if (ParDebug(9)) fprintf(_p_stdout, "Execing worker with: %s\n", cmdbuf); #endif if (dont_start_remotes) { printf("Would have run command \"%s\"\n", cmdbuf); } else if (system(cmdbuf) != 0) { sr_fatal_error("Failed system()"); } } /* * void sr_fatal_error(char *msg) * * Used by _p_sr_init_node() to deal with fatal errors during the * worker creation process. */ static void sr_fatal_error(msg) char *msg; { char *syserr = errno == 0 ? "" : sys_errlist[errno]; if (_p_my_id == -1) /* Node doesn't have an id yet */ { fprintf(stderr, "Fatal error: Node ???: Failed to create workers: %s (%s)\n", msg, syserr); } else { fprintf(stderr, "Fatal error: Node %lu: Failed to create workers: %s (%s)\n", (unsigned long) _p_my_id, msg, syserr); } exit(1); } /* sr_fatal_error() */ static void sr_fatal_error_1(msg, arg) char *msg; char *arg; { char buf[1024]; sprintf(buf, msg, arg); sr_fatal_error(buf); } /**************************************************************/ /* * Start up the host node. * * Create a port (my_port) and socket (listen_fd) to listen for * children's connections. * * Select on listen_fd and on any child connection. * * When each children connects, send message to each telling it * its node number and the ids of each of the other children. 
* */ static void start_host() { int i, rc; worker_ctl_msg_t msg; char buf[300]; #ifdef DEBUG if (network_debugging) fprintf(_p_stdout, "(%s) host accepting connections\n", ident()); #endif accept_connections(); #ifdef DEBUG if (network_debugging) fprintf(_p_stdout, "(%s) host accepted connections\n", ident()); #endif #ifdef DEBUG if (network_debugging) dump_proc_table(); #endif _p_host_id =_p_my_id = 0; for (i = 1; i < _p_nodes; i++) { #ifdef DEBUG if (network_debugging) fprintf(_p_stdout, "(%s) host sending proc table to %d\n", ident(), i); #endif send_proc_table(i); #ifdef DEBUG if (network_debugging) fprintf(_p_stdout, "(%s) host sent proc table to %d\n", ident(), i); #endif } /* Tell everyone to connect. * * We even tell node _p_nodes-1 to connect even though he doesn't have * anyone to connect to since it makes the code simpler. It provides * another place to check with the node to ensure it is still there, * though. */ for (i = 1; i < _p_nodes; i++) { #ifdef DEBUG if (network_debugging) fprintf(_p_stdout, "(%s) host telling %d to start connecting\n", ident(), i); #endif msg.type = WORKER_CTL_START_CONNECTING; blocking_write_sr_errchk(proc_table[i].fd, (char *) &msg, sizeof(msg), "start_host"); if ((rc = blocking_read(proc_table[i].fd, (char *) &msg, sizeof(msg))) < 0) { sprintf(buf, "start_host: read failed from node %d: %s", i, SYSTEM_ERROR); sr_fatal_error(buf); } else if (rc == 0) { sprintf(buf, "start_host: read failed from node %d: got eof", i); sr_fatal_error(buf); } #ifdef DEBUG if (network_debugging) fprintf(_p_stdout, "(%s) host found connections from %d done\n", ident(), i); #endif if (msg.type != WORKER_CTL_CONNECTIONS_MADE) { sprintf(buf, "start_host: wanted type %d, got %d\n", WORKER_CTL_CONNECTIONS_MADE, msg.type); sr_fatal_error(buf); } } /* Let's go! 
*/ for (i = 1; i < _p_nodes; i++) { #ifdef DEBUG if (network_debugging) fprintf(_p_stdout, "(%s) host tells worker %d go\n", ident(), i); #endif msg.type = WORKER_CTL_GO; blocking_write_sr_errchk(proc_table[i].fd, (char *) &msg, sizeof(msg), "start_host"); if ((rc = blocking_read(proc_table[i].fd, (char *) &msg, sizeof(msg))) < 0) { sprintf(buf, "start_host: read failed for go ack on node %d: %s", i, SYSTEM_ERROR); sr_fatal_error(buf); } else if (rc == 0) { sprintf(buf, "start_host: read failed for go ack on node %d: eof", i); sr_fatal_error(buf); } if (msg.type != WORKER_CTL_GO_ACK) { sprintf(buf, "start_host: didn't get ack, got %ld instead", (long) msg.type); sr_fatal_error(buf); } #ifdef DEBUG if (network_debugging) fprintf(_p_stdout, "(%s) host got GO_ACK from %d\n", ident(), i); #endif } } static void accept_connections() { fd_set read_fds; int n, fd, n_connected; worker_ctl_msg_t msg; /* Loop until all of the workers have connected back with the host */ for (n_connected = 1; n_connected < _p_nodes; /* n_connected is incremented in process_connection() */ ) { FD_ZERO(&read_fds); set_mask_for_all_workers(&read_fds); FD_SET(listen_fd, &read_fds); n = select(NUM_FDS, &read_fds, (fd_set *) NULL, (fd_set *) NULL, (struct timeval *) NULL); if (n == 0) continue; if (n < 0) { char buf[300]; if (errno == EINTR) continue; sprintf(buf, "Fatal error in select: %s\n", sys_errlist[errno]); sr_fatal_error(buf); } for (fd = 0; fd < NUM_FDS && n > 0; fd++) { if (FD_ISSET(fd, &read_fds)) { n--; if (fd == listen_fd) process_connection(&n_connected); else { blocking_read_sr_errchk(fd, (char *) &msg, sizeof(msg), "accept_connections"); sr_fatal_error("accept_connections: Didn't expect message from other worker"); } } } } } static void send_proc_table(node) int node; { int i, fd; worker_ctl_msg_t msg; fd = proc_table[node].fd; /* Don't need to send entries for node or host */ for (i = 1; i < _p_nodes; i++) { if (i == node) continue; msg.type = WORKER_CTL_PROCTBL_ENTRY; msg.i1 = i; 
msg.i2 = proc_table[i].port; strcpy(msg.buf, proc_table[i].host); blocking_write_sr_errchk(fd, (char *) &msg, sizeof(msg), "send_proc_table"); } msg.type = WORKER_CTL_PROCTBL_END; blocking_write_sr_errchk(fd, (char *) &msg, sizeof(msg), "send_proc_table"); } /* * Process a new connection on the host's port, * incrementing *n_conn with success */ static void process_connection(n_conn) int *n_conn; { int worker_fd; worker_ctl_msg_t msg; worker_fd = net_accept(listen_fd); if (worker_fd < 0) { sr_fatal_error("process_connection: accept failed"); } set_nonblocking(worker_fd); proc_table[*n_conn].fd = worker_fd; msg.type = WORKER_CTL_INIT; msg.i1 = *n_conn; msg.i2 = _p_nodes; #ifdef DEBUG if (network_debugging) fprintf(_p_stdout, "(%s) host sends CTL_INIT to %d\n", ident(), *n_conn); #endif blocking_write_sr_errchk(worker_fd, (char *) &msg, sizeof(msg), "process_connection"); blocking_read_sr_errchk(worker_fd, (char *) &msg, sizeof(msg), "process_connection"); if (msg.type == WORKER_CTL_EXEC_FAILED) { char buf[300]; errno = 0; sprintf(buf, "host-control says worker exec failed: %s", msg.buf); sr_fatal_error(buf); } else if (msg.type != WORKER_CTL_PORT) { char buf[300]; sprintf(buf, "incorrect type in receive: %ld, wanted %d", (long) msg.type, WORKER_CTL_PORT); sr_fatal_error(buf); } #ifdef DEBUG if (network_debugging) printf("Host got connection for node %d host %s port %d\n", *n_conn, msg.buf, msg.i1); #endif proc_table[*n_conn].port = msg.i1; strcpy(proc_table[*n_conn].host, msg.buf); (*n_conn)++; } /* * The worker connects back to the host's port, determined by parsing * the argument passed in parallel_info * */ static void start_worker() { char *s, host[256]; int port; int host_fd; worker_ctl_msg_t msg; if (parallel_info[0] == '\0') sr_fatal_error("Worker doesn't know his host"); if ((s = strchr(parallel_info, ',')) == (char *) NULL) { sr_fatal_error("Failed to parse -pi argument"); } *s = 0; strcpy(host, parallel_info); port = atoi(s + 1); host_fd = 
net_conn_to_listener(host, port); if (host_fd < 0) { sr_fatal_error("Error connecting to host"); } set_nonblocking(host_fd); blocking_read_sr_errchk(host_fd, (char *) &msg, sizeof(msg), "start_worker"); if (msg.type != WORKER_CTL_INIT) { sr_fatal_error("Incorrect message type from host"); } _p_my_id = msg.i1; _p_nodes = msg.i2; _p_host_id = 0; proc_table[_p_host_id].fd = host_fd; proc_table[_p_host_id].port = port; strcpy(proc_table[_p_host_id].host, host); #ifdef DEBUG if (network_debugging) fprintf(_p_stdout, "(%s) worker got CTL_INIT _p_nodes=%lu \n", ident(), (unsigned long) _p_nodes); #endif net_setup_anon_listener(3, &my_port, &listen_fd); if (gethostname(msg.buf, sizeof(msg.buf)) == -1) { sr_fatal_error("gethostname failed"); } msg.type = WORKER_CTL_PORT; msg.i1 = my_port; blocking_write_sr_errchk(host_fd, (char *) &msg, sizeof(msg), "start_worker"); #ifdef DEBUG if (network_debugging) fprintf(_p_stdout, "(%s) worker send port %d\n", ident(), my_port); #endif proc_table[_p_my_id].fd = listen_fd; proc_table[_p_my_id].port = my_port; strcpy(proc_table[_p_my_id].host, msg.buf); receive_proc_table(); #ifdef DEBUG if (ParDebug(8)) { fprintf(_p_stdout, "Worker %lu proc table:\n", (unsigned long) _p_my_id); dump_proc_table(); } #endif #ifdef DEBUG if (network_debugging) fprintf(_p_stdout, "(%s) worker awaiting connections \n", ident()); #endif await_connections(); #ifdef DEBUG if (network_debugging) fprintf(_p_stdout, "(%s) worker making connections \n", ident()); #endif make_connections(); #ifdef DEBUG if (network_debugging) fprintf(_p_stdout, "(%s) worker awaiting go \n", ident()); #endif await_go(); #ifdef DEBUG if (network_debugging) fprintf(_p_stdout, "(%s) worker got go \n", ident()); #endif } /* Wait for a go message from the host */ static void await_go() { worker_ctl_msg_t msg; blocking_read_sr_errchk(proc_table[_p_host_id].fd, (char *) &msg, sizeof(msg), "await_go"); if (msg.type != WORKER_CTL_GO) { sr_fatal_error("await_go: didn't get go"); } msg.type = 
WORKER_CTL_GO_ACK; blocking_write_sr_errchk(proc_table[_p_host_id].fd, (char *) &msg, sizeof(msg), "await_go"); } static void receive_proc_table() { bool_t done; worker_ctl_msg_t msg; int host_fd; /* Just hang in a receive from the host. No one else should be sending to us yet, so they can just wait. */ host_fd = proc_table[_p_host_id].fd; for (done = FALSE; !done; ) { blocking_read_sr_errchk(host_fd, (char *) &msg, sizeof(msg), "receive_proc_table"); switch (msg.type) { case WORKER_CTL_PROCTBL_ENTRY: proc_table[msg.i1].port = msg.i2; strcpy(proc_table[msg.i1].host, msg.buf); break; case WORKER_CTL_PROCTBL_END: done = TRUE; break; default: sr_fatal_error("Invalid type in receive_proc_table"); break; } } } /* Wait for * - other nodes to connect to me * - host to tell me to make connections * (in which case I just return and my caller calls make_connections) */ static void await_connections() { fd_set read_fds; bool_t done; int n, fd, node; for (done = FALSE; !done; ) { FD_ZERO(&read_fds); set_mask_for_all_workers(&read_fds); FD_SET(listen_fd, &read_fds); FD_SET(proc_table[_p_host_id].fd, &read_fds); n = select(NUM_FDS, &read_fds, (fd_set *) NULL, (fd_set *) NULL, (struct timeval *) NULL); if (n == 0) continue; if (n < 0) { char buf[300]; if (errno == EINTR) continue; sprintf(buf, "Fatal error in select: %s\n", sys_errlist[errno]); sr_fatal_error(buf); } for (fd = 0; fd < NUM_FDS && n > 0; fd++) { if (FD_ISSET(fd, &read_fds)) { n--; if (fd == listen_fd) process_connection_from_worker(); else { node = node_from_fd(fd); if (node == -1) { sr_fatal_error_1("await_connections: can't find node for fd %d\n", fd); } if (node == _p_host_id) done = process_mesg_from_host(); else process_mesg_from_node(node); } } } } } static void process_connection_from_worker() { int worker_fd; worker_ctl_msg_t msg; worker_fd = net_accept(listen_fd); if (worker_fd < 0) sr_fatal_error("process_connection_from_worker: accept failed"); set_nonblocking(worker_fd); msg.type = WORKER_CTL_CONN_TEST; 
msg.i1 = _p_my_id; blocking_write_sr_errchk(worker_fd, (char *) &msg, sizeof(msg), "process_connection_from_worker"); blocking_read_sr_errchk(worker_fd, (char *) &msg, sizeof(msg), "process_connection_from_worker"); if (msg.type != WORKER_CTL_CONN_ACK) sr_fatal_error("process_connection_from_worker: wrong response from worker"); if (msg.i2 != _p_my_id) { char buf[300]; sprintf(buf, "process_connection_from_worker: node mismatch: me=%lu i1=%d i2=%d\n", (unsigned long) _p_my_id, msg.i1, msg.i2); sr_fatal_error(buf); } proc_table[msg.i1].fd = worker_fd; } /* Called from await_connections when host sends message. * * Return TRUE if await_connections should return. */ static bool_t process_mesg_from_host() { bool_t rc = FALSE; worker_ctl_msg_t msg; char buf[300]; blocking_read_sr_errchk(proc_table[_p_host_id].fd, (char *) &msg, sizeof(msg), "process_mesg_from_host"); switch(msg.type) { case WORKER_CTL_START_CONNECTING: rc = TRUE; break; default: sprintf(buf, "process_mesg_from_host: don't know how to handle message type %ld", (long) msg.type); sr_fatal_error(buf); break; } return rc; } static void process_mesg_from_node(node) int node; { char buf[300]; int fd; worker_ctl_msg_t msg; fd = proc_table[node].fd; blocking_read_sr_errchk(fd, (char *) &msg, sizeof(msg), "process_mesg_from_node"); switch (msg.type) { default: sprintf(buf, "process_mesg_from_node: don't know how to handle message type %ld", (long) msg.type); sr_fatal_error(buf); break; } } static void make_connections() { int node; int conn_fd; worker_ctl_msg_t msg; char buf[300]; #ifdef DEBUG if (network_debugging) fprintf(_p_stdout, "(%s) making connections\n", ident()); #endif for (node = _p_my_id + 1; node < _p_nodes; node++) { conn_fd = net_conn_to_listener(proc_table[node].host, proc_table[node].port); if (conn_fd < 0) sr_fatal_error("make_connections: connect failed"); set_nonblocking(conn_fd); #ifdef DEBUG if (network_debugging) fprintf(_p_stdout, "(%s) connected to fd %d\n", ident(), conn_fd); #endif 
blocking_read_errchk(conn_fd, (char *) &msg, sizeof(msg), "make_connections"); if (msg.type != WORKER_CTL_CONN_TEST) { sprintf(buf, "make_connections: received mesg type %d, wanted %d", msg.type, WORKER_CTL_CONN_TEST); sr_fatal_error(buf); } if (node != msg.i1) { sprintf(buf, "make_connections: node mismatch: node=%d msg.i1=%d", node, msg.i1); sr_fatal_error(buf); } msg.type = WORKER_CTL_CONN_ACK; msg.i1 = _p_my_id; msg.i2 = node; #ifdef DEBUG if (network_debugging) fprintf(_p_stdout, "(%s) got node %d\n", ident(), node); #endif blocking_write_errchk(conn_fd, (char *) &msg, sizeof(msg), "make_connections"); proc_table[node].fd = conn_fd; } #ifdef DEBUG if (network_debugging) fprintf(_p_stdout, "(%s) done making connections, sending done\n", ident()); #endif msg.type = WORKER_CTL_CONNECTIONS_MADE; blocking_write_sr_errchk(proc_table[_p_host_id].fd, (char *) &msg, sizeof(msg), "make_connections"); #ifdef DEBUG if (network_debugging) fprintf(_p_stdout, "(%s) done making connections, sent\n", ident()); #endif } static void set_mask_for_all_workers(mask) fd_set *mask; { proc_table_entry *ent; int i, fd; if (destroying_nodes) return; for (i = 1; i < _p_nodes; i++) { ent = &(proc_table[i]); fd = ent->fd; if (fd != -1 && i != _p_my_id && !(ent->buf != NULL && ent->len == 0)) { FD_SET(fd, mask); } } } #ifdef DEBUG static void dump_proc_table() { int_t i; proc_table_entry *ent; printf("%lu: Proc table: _p_nodes=%lu\n", (unsigned long) ME, (unsigned long) _p_nodes); for (i = 0; i < _p_nodes; i++) { ent = &proc_table[i]; printf("%lu: entry %lu: port=%d fd=%d host=%s buf=%lx bufend=%lx len=%d\n", (unsigned long) ME, (unsigned long) i, ent->port, ent->fd, ent->host, (unsigned long) ent->buf, (unsigned long) ent->bufend, ent->len); } } #endif /* DEBUG */ static int node_from_fd(fd) int fd; { int i; for (i = 0;i < _p_nodes; i++) if (proc_table[i].fd == fd) return i; #ifdef DEBUG /* fprintf(_p_stdout, "(%s) node_from_fd(%d) failed\n", ident(), fd); dump_proc_table(); */ #endif 
return -1; } static void set_nonblocking(fd) int fd; { #ifndef hpux int flags; if ((flags = fcntl(fd, F_GETFL, 0)) < 0) { perror("fcntl F_GETFL 1"); exit(1); } flags |= FNDELAY; if (fcntl(fd, F_SETFL, flags) < 0) { perror("fcntl F_SETFL"); exit(1); } #else int one = 1; if (ioctl(fd, FIONBIO, &one) < 0) { perror("ioctl FIONBIO"); exit(1); } #endif #ifdef F_SETFD if (fcntl(fd, F_SETFD, 1) < 0) { perror("fcntl F_SETFD"); exit(1); } #endif } struct FourInts { int i1, i2, i3, i4, i5; }; static int blocking_read(fd, buf, size) int fd; char *buf; int size; { int n, n_left; char *msgptr; for (n_left = size, msgptr = buf; n_left > 0; ) { n = read(fd, msgptr, n_left); if (n == 0) { #ifdef DEBUG if (network_debugging) printf("(%s) read from %d got eof\n", ident(), node_from_fd(fd)); #endif return 0; } if (n < 0) { #ifdef DEBUGFOO if (network_debugging) fprintf(_p_stdout, "(%s) read from fd %d node %d returned error: %s\n", ident(), fd, node_from_fd(fd), SYSTEM_ERROR); #endif if (errno == EWOULDBLOCK) continue; else if (errno == EPIPE) return 0; else return n; } #ifdef DEBUG if (network_debugging) fprintf(_p_stdout, "(%s) read %d/%d bytes from node %d\n", ident(), n, n_left, node_from_fd(fd)); #endif n_left -= n; msgptr += n; } #ifdef DEBUG { struct FourInts *fi = (struct FourInts *) buf; if (network_debugging) fprintf(_p_stdout, "(%s) read got header %d %d %d %d %d\n", ident(), fi->i1, fi->i2, fi->i3, fi->i4, fi->i5); } #endif return size; } static int blocking_write(fd, buf, size) int fd; char *buf; int size; { int n, n_left; char *msgptr; #ifdef DEBUG { struct FourInts *fi = (struct FourInts *) buf; if (network_debugging) fprintf(_p_stdout, "(%s) write putting header %d %d %d %d %d\n", ident(), fi->i1, fi->i2, fi->i3, fi->i4, fi->i5); } #endif for (n_left = size, msgptr = buf; n_left > 0; ) { n = write(fd, msgptr, n_left); /* if (n == 0) { printf("write got eof\n"); return 0; } */ if (n < 0) { if (errno == EWOULDBLOCK) continue; else if (errno == EPIPE) return 0; else 
return n; } #ifdef DEBUG if (network_debugging) fprintf(_p_stdout, "(%s) wrote %d/%d bytes to node %d\n", ident(), n, n_left, node_from_fd(fd)); #endif n_left -= n; msgptr += n; } #ifdef DEBUG { struct FourInts *fi = (struct FourInts *) buf; if (network_debugging) fprintf(_p_stdout, "(%s) write got header %d %d %d %d %d\n", ident(), fi->i1, fi->i2, fi->i3, fi->i4, fi->i5); } #endif return size; } static void blocking_write_errchk(fd, buf, size, func) int fd; char *buf; int size; char *func; { int rc = blocking_write(fd, buf, size); static char msgbuf[1024]; if (rc < 0) { sprintf(msgbuf, "%s: write failed: %s", func, SYSTEM_ERROR); _p_fatal_error(msgbuf); } else if (rc == 0) { sprintf(msgbuf, "%s: eof on write\n", func); _p_fatal_error(msgbuf); } } static void blocking_read_errchk(fd, buf, size, func) int fd; char *buf; int size; char *func; { int rc = blocking_read(fd, buf, size); static char msgbuf[1024]; if (rc < 0) { sprintf(msgbuf, "%s: read failed: %s", func, SYSTEM_ERROR); _p_fatal_error(msgbuf); } else if (rc == 0) { sprintf(msgbuf, "%s: eof on read -- some other node probably aborted\n", func); _p_fatal_error(msgbuf); } } static void blocking_write_sr_errchk(fd, buf, size, func) int fd; char *buf; int size; char *func; { int rc = blocking_write(fd, buf, size); static char msgbuf[1024]; if (rc < 0) { sprintf(msgbuf, "%s: write failed: %s", func, SYSTEM_ERROR); sr_fatal_error(msgbuf); } else if (rc == 0) { sprintf(msgbuf, "%s: eof on write\n", func); sr_fatal_error(msgbuf); } } static void blocking_read_sr_errchk(fd, buf, size, func) int fd; char *buf; int size; char *func; { int rc = blocking_read(fd, buf, size); static char msgbuf[1024]; if (rc < 0) { sprintf(msgbuf, "%s: read failed: %s", func, SYSTEM_ERROR); sr_fatal_error(msgbuf); } else if (rc == 0) { sprintf(msgbuf, "%s: eof on read\n", func); sr_fatal_error(msgbuf); } } static void net_setup_anon_listener(backlog, port, skt) int backlog; int *port; int *skt; { int rc,sinlen; struct sockaddr_in sin; 
int optval = TRUE; *skt = socket(AF_INET, SOCK_STREAM, 0); error_check(*skt,"net_setup_anon_listener socket"); rc = setsockopt(*skt,IPPROTO_TCP,TCP_NODELAY,(char *)&optval,sizeof(optval)); if (rc < 0) { sr_fatal_error_1("setsockopt failed: %s", SYSTEM_ERROR); } sin.sin_family = AF_INET; sin.sin_addr.s_addr = INADDR_ANY; sin.sin_port = htons(0); sinlen = sizeof(sin); error_check(bind(*skt,(struct sockaddr *) &sin,sizeof(sin)), "net_setup_anon_listener bind"); error_check(listen(*skt, backlog), "net_setup_anon_listener listen"); getsockname(*skt, (struct sockaddr *) &sin, &sinlen); *port = ntohs(sin.sin_port); } /* Accept a connection on socket skt and return fd of new connection. */ static int net_accept(skt) int skt; { struct sockaddr_in from; int fromlen; int skt2; int gotit; fromlen = sizeof(from); gotit = 0; while (!gotit) { skt2 = accept(skt, (struct sockaddr *) &from, &fromlen); if (skt2 == -1) { if (errno == EINTR) continue; else error_check(skt2, "net_accept accept"); } else gotit = 1; } return(skt2); } static int net_conn_to_listener(hostname, port) char *hostname; int port; { int rc,s; struct sockaddr_in listener; struct hostent *hp; int optval = TRUE; hp = gethostbyname(hostname); if (hp == NULL) { sr_fatal_error_1("connect_to_listener: gethostbyname %s failed", hostname); } ZeroOutMemory((char *) &listener, sizeof(listener)); memcpy((char *) &listener.sin_addr, (char *) hp->h_addr, hp->h_length); listener.sin_family = hp->h_addrtype; listener.sin_port = htons(port); s = socket(AF_INET, SOCK_STREAM, 0); error_check(s, "net_conn_to_listener socket"); error_check(setsockopt(s,IPPROTO_TCP,TCP_NODELAY, (char *) &optval, sizeof(optval)), "net_conn_to_listener setsockopt"); rc = connect(s,(struct sockaddr *) &listener, sizeof(listener)); error_check(rc, "net_conn_to_listener connect"); return(s); } static void error_check(val, str) int val; char *str; { char buf[200]; if (val < 0) { sprintf(buf, "%s :%d: %s\n", str, val, sys_errlist[errno]); 
sr_fatal_error(buf); } } #ifdef DEBUG static char *ident() { static char buf[100]; static char host[128]; static int gotHost = 0; static int pid; if (!gotHost) { char *s; gethostname(host, sizeof(host)); if ((s = strchr(host, '.')) !=NULL) *s = 0; pid = getpid(); gotHost = 1; } sprintf(buf, "%lu-%s-%d", (unsigned long) _p_my_id, host, pid); return buf; } #endif /* DEBUG */ #ifdef STREAMS /* * void init_stream_table() * * Initialize the stream array by setting all the pointers to NULL. */ static void init_stream_table() { int_t i; for (i = 0; i < max_streams; i++) { stream_table[i].stream_array = (cell_t *) NULL; stream_table[i].sindex = i + 1; stream_table[i].enabled = FALSE; stream_table[i].mq_front = (msg_buf_header_t *) NULL; stream_table[i].mq_back = (msg_buf_header_t *) NULL; } first_free_stream_id = 0; stream_table[max_streams-1].sindex = -1; } /* * bool_t _p_alloc_stream(cell_t *stream_array, int_t sindex, int_t *id) * * Grab the first free table entry, and fill it in with the passed * stream pointer to the header of the stream array (stream_array) and the * index into the stream array (sindex). * * Return: TRUE if a stream was succesfully allocated, FALSE otherwise * *id = the stream id (and integer) */ bool_t _p_alloc_stream(stream_array, sindex, id) cell_t *stream_array; int_t sindex; int_t *id; { *id = first_free_stream_id; if (first_free_stream_id == -1) { return (FALSE); } else { stream_table[*id].stream_array = stream_array; stream_table[*id].sindex = sindex; first_free_stream_id = stream_table[first_free_stream_id].sindex; return (TRUE); } } /* _p_alloc_stream() */ /* * void _p_free_stream(int_t id) * * Free the stream with the passed stream id. 
*/ void _p_free_stream(id) int_t id; { if (id < 0 || id >= max_streams) _p_fatal_error("_p_free_stream(): Invalid stream id"); #ifdef DEBUG if (stream_table[id].stream_array == (cell_t *) NULL) { fprintf(_p_stdout, "(%lu,%lu) Warning: _p_free_stream(): Freeing a NULL stream: %ld\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, (long) id); } #endif /* DEBUG */ stream_table[id].stream_array = (cell_t *) NULL; stream_table[id].sindex = first_free_stream_id; first_free_stream_id = id; } /* _p_free_stream() */ /* * void _p_update_stream(int_t id, cell_t *stream_array) * * Update the stream_table entry for id so that it points to the * passed stream array (stream_array). This procedure is used by the * garbage collector to update the stream array location after it * moves the stream array on the heap during garbage collection. */ void _p_update_stream(id, stream_array) int_t id; cell_t *stream_array; { if (id < 0 || id >= max_streams) _p_fatal_error("_p_update_stream(): Invalid stream id"); #ifdef DEBUG if (stream_table[id].stream_array == (cell_t *) NULL) { fprintf(_p_stdout, "(%lu,%lu) Warning: _p_update_stream(): Updating a NULL stream: %ld\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, (long) id); } #endif /* DEBUG */ stream_table[id].stream_array = stream_array; } /* _p_update_stream() */ #ifdef DEBUG #define ValidateStreamData(ID) \ { \ if((ID) < 0 || (ID) >= max_streams) \ _p_fatal_error("_p_lookup_stream*(): Invalid stream id"); \ if(stream_table[(ID)].stream_array == (cell_t *) NULL) \ _p_fatal_error("_p_lookup_stream*(): Could not find stream"); \ if(((data_header_t *) (stream_table[(ID)].stream_array))->tag != STREAM_TAG) \ _p_fatal_error("_p_lookup_stream*(): Expected a STREAM_TAG"); \ } #else /* DEBUG */ #define ValidateStreamData() #endif /* DEBUG */ /* * stream_t *_p_lookup_stream(int_t id); * * Lookup the stream id in the stream_table. 
Return the location on the * heap of the stream_t data structure that correspondes to the appropriate * index within the stream array. * * NOTE: The return value will not point to a tagged structure. It * will point to the actual stream structure. */ stream_t *_p_lookup_stream(id) int_t id; { ValidateStreamData(id); return ((stream_t *) (stream_table[id].stream_array + stream_table[id].sindex) ); } /* _p_lookup_stream() */ /* * cell_t *_p_lookup_stream_header(int_t id); * * Lookup the stream id in the stream_table. Return the location on the * heap of the header for the stream array that has this stream in it. */ cell_t *_p_lookup_stream_header(id) int_t id; { ValidateStreamData(id); return (stream_table[id].stream_array); } /* _p_lookup_stream() */ /* * int_t _p_lookup_stream_index(int_t id); * * Lookup the stream id in the stream_table. Return the index of this * stream in its stream array. */ int_t _p_lookup_stream_index(id) int_t id; { ValidateStreamData(id); return (stream_table[id].sindex); } /* _p_lookup_stream() */ /* * bool_t _p_first_stream(int_t *id) * * This procedure is used in conjunction with _p_next_stream() to * iterate through all of the streams. They can be used as follows: * * bool_t cont_iter; * int_t id; * for (cont_iter = _p_first_stream(&id); * cont_iter; * cont_iter = _p_next_stream(&id) * ) * { * Do_Whatever(); * } * * Return: TRUE if there are any more streams, FALSE otherwise * *id = the first stream id */ bool_t _p_first_stream(id) int_t *id; { *id = -1; return (_p_next_stream(id)); } /* _p_first_stream() */ /* * bool_t _p_next_stream(int_t *id) * * This procedure is used in conjunction with _p_first_stream() to * iterate through all of the streams. It returns (in *id) the next * stream after *id. (i.e. It modifies the value of *id.) 
* * Return: TRUE if there are any more streams, FALSE otherwise * *id = the next stream id after the one passed in *id */ bool_t _p_next_stream(id) int_t *id; { if (*id < -1 || *id >= max_streams) _p_fatal_error("_p_next_stream(): Invalid stream id"); for ((*id)++; ((*id < max_streams) && (stream_table[*id].stream_array == (cell_t *) NULL)); (*id)++ ) ; if (*id >= max_streams) { *id = -1; return (FALSE); } else { return (TRUE); } } /* _p_next_stream() */ /* * void _p_enable_stream_recv(int_t id) * * Enable remote stream receives on the passed stream 'id'. * * Optional: If there is a pending remote stream message for this * stream, then this procedure can go ahead and deliver it. */ void _p_enable_stream_recv(id) int_t id; { ValidateStreamData(id); if (stream_table[id].mq_front != (msg_buf_header_t *) NULL) { /* A message is available on this stream, so deliver it */ msg_buf_header_t *msg; msg = stream_table[id].mq_front; stream_table[id].mq_front = msg->next; deliver_stream_msg(msg); FreeMsgBuf(msg); } else if (stream_table[id].closed) { /* * There are no messages left to be delivered, and the stream * has been closed by the sender. So close the receiving side. */ _p_close_stream(id); } else { stream_table[id].enabled = TRUE; } } /* _p_enable_stream_recv() */ /* * void _p_disable_stream_recv(int_t id) * * Disable remote stream receives on the passed stream 'id'. * * Optional: If there is a pending remote stream message for this * stream, then this procedure can go ahead and deliver it. */ void _p_disable_stream_recv(id) int_t id; { ValidateStreamData(id); stream_table[id].enabled = FALSE; } /* _p_disable_stream_recv() */ /* * void _p_send_close_stream(int_t node, int_t id) * * Close down stream <node,id>. This procedure is called from the * sending side of the stream. * * Care must be taken to ensure that the stream is not closed on the * receiving side until all messages on this stream have been delivered. 
* Thus, this stream_close message needs to remain ordered relative to * other sends on this stream. */ void _p_send_close_stream(node, id) int_t node; int_t id; { cell_t *msg_buf; #ifdef DEBUG if (ParDebug(5)) { fprintf(_p_stdout, "(%lu,%lu) _p_send_close_stream(): to node %lu, id %ld\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, (unsigned long) node, (long) id); fflush(_p_stdout); } #endif /* DEBUG */ msg_buf = _p_alloc_msg_buffer(1); *((int_t *) msg_buf) = id; _p_msg_send(msg_buf, node, 1, MSG_CLOSE_STREAM); #ifdef GAUGE _p_gauge_stats.ssmallmsgs++; #endif /* GAUGE */ } /* _p_send_close_stream() */ /* * void _p_stream_send(int_t id, int_t node, * cell_t *array, u_int_t offset, u_int_t size) * * Send a stream message it stream id, 'id', on node 'node'. The * message should come from the passed 'array', starting 'offset' * elements into the array, 'size' elements in size. */ void _p_stream_send(id, array, offset, size) int_t id; cell_t *array; u_int_t offset; u_int_t size; { /* * Allocate a message buffer of the appropriate size, set up the * the header, copy in the data, and deliver it. */ } /* * void handle_close_stream(msg_buf_header_t *msg) * * When _p_msg_receive() receives a MSG_CLOSE_STREAM message it * calls this procedure to handle it. This procedure should figure * out which stream id this message is for, mark the closed flag in * the stream_table, and deliver the close stream (call _p_close_stream()) * if there are no messages pending delivery. 
*/ static void handle_close_stream(msg) msg_buf_header_t *msg; { int id; id = *((int_t *) (msg->user_buf)); ValidateStreamData(id); #ifdef DEBUG if (ParDebug(5)) { fprintf(_p_stdout, "(%lu,%lu) handle_close_stream(): From node %lu, id %ld\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, (unsigned long) msg->node, (long) id); } #endif /* DEBUG */ stream_table[id].closed = TRUE; if (stream_table[id].mq_front == (msg_buf_header_t *) NULL) _p_close_stream(id); #ifdef GAUGE _p_gauge_stats.rsmallmsgs++; #endif /* GAUGE */ } /* handle_close_stream() */ /* * void handle_stream_receive(msg_buf_header_t *msg) * * When _p_msg_receive() receives a stream message it calls this procedure * to handle it. This procedure should figure out which stream id * this message is for, and do the following: * - If receives are enabled on this id, then deliver the message. * - If recevies are not enabled on this id, then queue the message up. */ static void handle_stream_receive(msg) msg_buf_header_t *msg; { } /* handle_stream_receive() */ /* * void deliver_stream_msg(msg_buf_header_t *msg) * * When a message is ready for delivery (there is a pending receive * and received data) then this procedure is called. It should find * the stream data structure on the heap and copy the contents of * the message buffer as appropriate. */ static void deliver_stream_msg(msg) msg_buf_header_t *msg; { } /* deliver_stream_receive() */ #endif /* STREAMS */
These are the contents of the former NiCE NeXT User Group NeXTSTEP/OpenStep software archive, currently hosted by Netfuture.ch.