This is sr_nx.c in view mode; [Download] [Up]
/*
 * PCN Abstract Machine Emulator
 * Authors:     Steve Tuecke and Ian Foster
 *              Argonne National Laboratory
 *
 * Please see the DISCLAIMER file in the top level directory of the
 * distribution regarding the provisions under which this software
 * is distributed.
 *
 * sr_nx.c  -  Send/Receive routines built on the Intel nX node operating
 *             system.  This supports the Intel iPSC/860 and
 *             the Intel Delta.
 *
 * #include "sr_doc.h"
 * See sr_doc.h for detailed documentation on Send/Receive modules.
 */

#include "pcn.h"

/* These are included from gauge.h
 *
 * #ifdef delta
 * #include <mesh.h>
 * #else
 * #include <cube.h>
 * #endif
 */

/* Value stored into _p_default_msg_buffer_size at node initialization. */
#define DEFAULT_MSG_BUFFER_SIZE 1020

/* Process id argument passed to every csend() in this module. */
#define PCN_PID 0

/*
 * Header of a message that was received while waiting for some other
 * message type, and is therefore parked on a local queue until
 * _p_msg_receive() is called with RCV_BLOCK or RCV_NOBLOCK.
 * The message payload is stored starting at user_buf.
 */
typedef struct msg_buf_header_struct
{
    struct msg_buf_header_struct *	next;	/* next queued message, or NULL */
    int_t				size;	/* payload size, in cells */
    int_t				type;	/* message type */
    int_t				node;	/* sending node */
    cell_t				user_buf[1]; /* start of payload */
} msg_buf_header_t;

/*
 * Number of header fields in front of user_buf (next, size, type, node).
 * NOTE(review): MsgSize2Bytes() assumes each of those fields occupies
 * exactly CELL_SIZE bytes -- confirm against the definitions in pcn.h.
 */
#define HEADER_SIZE 4
#define MsgSize2Bytes(Size) (((Size) + HEADER_SIZE) * CELL_SIZE)

/* Front and back of the queue of deferred messages (both NULL when empty). */
static msg_buf_header_t *mq_front, *mq_back;


/*
 * void _p_sr_get_argdesc(int argc, char **argv,
 *			  argdesc_t **argdescp, int *n_argdescp)
 *
 * Report the command line argument descriptors for this send/receive
 * module.  This module has none, so *argdescp is set to a null pointer
 * and *n_argdescp to 0.  argc and argv are not used.
 */
void _p_sr_get_argdesc(argc, argv, argdescp, n_argdescp)
int argc;
char **argv;
argdesc_t **argdescp;
int *n_argdescp;
{
    *argdescp = (argdesc_t *) 0;
    *n_argdescp = 0;
} /* _p_sr_get_argdesc() */


/*
 * void sr_fatal_error(char *msg)
 *
 * Print a fatal error message (tagged with this node's id if it has one)
 * to stderr, call killcube(-1,-1), and exit with status 1.
 */
static void sr_fatal_error(msg)
char *msg;
{
    if (_p_my_id == -1)		/* Node doesn't have an id yet */
    {
	fprintf(stderr,
		"Fatal error: Node ???: Failed to create workers: %s\n",
		msg);
    }
    else
    {
	fprintf(stderr,
		"Fatal error: Node %lu: Failed to create workers: %s\n",
		(unsigned long) _p_my_id, msg);
    }
    killcube(-1,-1);
    exit(1);
} /* sr_fatal_error() */


/*
 * void _p_sr_init_node(int argc, char *argv[])
 *
 * Initialize the node identity globals (_p_nodes, _p_host_id, _p_my_id,
 * _p_host), check the node count against MAX_NODES, initialize this
 * module's state, and then hand control to _p_init_node().
 *
 * If there are more than MAX_NODES nodes, the host reports a fatal error
 * and every other node simply exits with status 1.
 */
void _p_sr_init_node(argc, argv)
int argc;
char *argv[];
{
    _p_nodes = numnodes();
    _p_host_id = 0;
    _p_my_id = mynode();

    if (_p_my_id == _p_host_id)
	_p_host = TRUE;
    else
	_p_host = FALSE;

    if (_p_nodes > MAX_NODES)
    {
	if (_p_host)
	{
	    char buf[256];
	    sprintf(buf, "Too many nodes: MAX_NODES = %d", MAX_NODES);
	    sr_fatal_error(buf);
	}
	else
	{
	    exit(1);
	}
    }

    /*
     * Do any other send/receive initialization stuff here...
     */
    _p_default_msg_buffer_size = DEFAULT_MSG_BUFFER_SIZE;
    mq_front = mq_back = (msg_buf_header_t *) NULL;

    /* Finish node initialization and run emulator */
    _p_init_node(argc, argv);

} /* _p_sr_init_node() */


/*
 * void _p_sr_node_initialized()
 *
 * Hook with no effect in normal builds.  When DONT_INCLUDE is defined it
 * instead passes a test integer around the ring of nodes and exits.
 */
void _p_sr_node_initialized()
{
#ifdef DONT_INCLUDE
    int msg = 42;
    int i;

    /* Test by sending something around the ring */
    if (_p_multiprocessing)
    {
	if (_p_host)
	{
	    fprintf(_p_stdout, "Host: Sending test\n");
	    i = (_p_my_id + 1) % _p_nodes;
	    fprintf(_p_stdout, "Host: Sending test to node %lu\n",
		    (unsigned long) i);
	    csend(0, &msg, 4, i, PCN_PID);
	    fprintf(_p_stdout, "Host: Sent test to node %lu\n",
		    (unsigned long) i);
	    crecv(0, &msg, 4);
	    fprintf(_p_stdout, "Host: Received test (%lu)\n",
		    (unsigned long) msg);
	}
	else
	{
	    crecv(0, &msg, 4);
	    fprintf(_p_stdout, "(%lu) Received test (%lu)\n",
		    (unsigned long) _p_my_id, (unsigned long) msg);
	    i = (_p_my_id + 1) % _p_nodes;
	    fprintf(_p_stdout, "(%lu) Sending test to node %lu\n",
		    (unsigned long) _p_my_id, (unsigned long) i);
	    csend(0, &msg, 4, i, PCN_PID);
	    fprintf(_p_stdout, "(%lu) Sent test to node %lu\n",
		    (unsigned long) _p_my_id, (unsigned long) i);
	}

	if (_p_host)
	    killcube(-1,-1);
	fprintf(_p_stdout, "Exiting\n");
	exit(0);
    }
#endif /* DONT_INCLUDE */
} /* _p_sr_node_initialized() */


/*
 * void _p_destroy_nodes()
 *
 * Nothing to do in this send/receive module.
 */
void _p_destroy_nodes()
{
} /* _p_destroy_nodes() */


/*
 * void _p_abort_nodes()
 *
 * On the host, call killcube(-1,-1).  On any other node, send an empty
 * MSG_ABORT message to the host.
 */
void _p_abort_nodes()
{
    if (_p_host)
    {
	killcube(-1,-1);
    }
    else
    {
	csend(MSG_ABORT, 0, 0, _p_host_id, PCN_PID);
    }
} /* _p_abort_nodes() */


/*
 * cell_t *_p_alloc_msg_buffer(int size)
 *
 * Allocate a message buffer of 'size' cells with malloc() and return it.
 * Calls _p_fatal_error() if the allocation fails.  A buffer handed to
 * _p_msg_send() is freed there.
 */
cell_t *_p_alloc_msg_buffer(size)
int size;	/* In cells */
{
    int i;
    cell_t *msg_buf;

#ifdef DEBUG
    if (ParDebug(8))
    {
	fprintf(_p_stdout,
		"(%lu) _p_alloc_msg_buffer: Allocating %lu cells\n",
		(unsigned long) _p_my_id, (unsigned long) size);
	fflush(_p_stdout);
    }
#endif /* DEBUG */

    i = size * CELL_SIZE;	/* size in bytes */
    if ((msg_buf = (cell_t *) malloc(i)) == (cell_t *) NULL)
	_p_fatal_error("Failed to allocate a message buffer");

#ifdef DEBUG
    if (ParDebug(8))
    {
	fprintf(_p_stdout,
		"(%lu) _p_alloc_msg_buffer: Allocated %lu cells\n",
		(unsigned long) _p_my_id, (unsigned long) size);
	fflush(_p_stdout);
    }
#endif /* DEBUG */

    return (msg_buf);
} /* _p_alloc_msg_buffer() */


/*
 * void _p_msg_send(cell_t *buf, int node, int size, int type)
 *
 * Send the 'size' cells in 'buf' to 'node' as a message of the given
 * 'type', then free 'buf'.  A null 'buf' means an empty message: it is
 * sent with length 0 regardless of 'size'.
 *
 * In DEBUG builds, sending to the local node is a fatal error.
 */
void _p_msg_send(buf, node, size, type)
cell_t *buf;
int node;
int size;	/* in cells */
int type;
{
#ifdef DEBUG
    if (ParDebug(8))
    {
	fprintf(_p_stdout,
		"(%lu) _p_msg_send: Sending to n %lu, s %lu, t %lu\n",
		(unsigned long) _p_my_id, (unsigned long) node,
		(unsigned long) size, (unsigned long) type);
	fflush(_p_stdout);
    }
#endif /* DEBUG */

#ifdef DEBUG
    if (node == _p_my_id)
	_p_fatal_error("Internal error: Cannot send message to myself!");
#endif /* DEBUG */

    /*
     * If buf == NULL, then we have an empty message.  So send
     * a message with no body.
     */
    if (buf == (cell_t *) NULL)
	size = 0;

    csend(type, buf, size * CELL_SIZE, node, PCN_PID);

    if (buf != (cell_t *) NULL)
	free(buf);

#ifdef DEBUG
    if (ParDebug(8))
    {
	fprintf(_p_stdout, "(%lu) _p_msg_send: Sent\n",
		(unsigned long) _p_my_id);
	fflush(_p_stdout);
    }
#endif /* DEBUG */
} /* _p_msg_send() */


/*
 * void sr_enqueue_msg(int node, int size, int type, long in_size)
 *
 * Receive the pending message (already probed by the caller; 'in_size'
 * is its length in bytes and 'size' its length in cells) into a freshly
 * malloc'ed buffer and append it to the back of the deferred message
 * queue.  Calls _p_malloc_error() if the buffer cannot be allocated.
 */
static void sr_enqueue_msg(node, size, type, in_size)
int node;
int size;	/* in cells */
int type;
long in_size;	/* in bytes */
{
    msg_buf_header_t *buf;

    if ((buf = (msg_buf_header_t *) malloc(MsgSize2Bytes(size)))
	== (msg_buf_header_t *) NULL)
	_p_malloc_error();

    buf->node = node;
    buf->size = size;
    buf->type = type;
    crecv(-1, buf->user_buf, in_size);

    buf->next = (msg_buf_header_t *) NULL;
    if (mq_front == (msg_buf_header_t *) NULL)
    {
	mq_front = mq_back = buf;
    }
    else
    {
	mq_back->next = buf;
	mq_back = buf;
    }
} /* sr_enqueue_msg() */


/*
 * bool_t _p_msg_receive(int *node, int *size, int *type, int rcv_type)
 *
 * Receive one message into the memory at _p_heap_ptr, storing its sender,
 * its size in cells, and its type through 'node', 'size' and 'type'.
 *
 * rcv_type selects what is accepted:
 *   RCV_NOBLOCK  Any message.  Returns FALSE immediately if the deferred
 *		  queue is empty and iprobe() reports nothing pending.
 *   RCV_BLOCK	  Any message; waits for one.
 *   RCV_PARAMS	  Waits for MSG_PARAMS, MSG_INITIATE_EXIT, MSG_EXIT or
 *		  MSG_ABORT; every other message is put on the deferred
 *		  queue.
 *   RCV_GAUGE	  As RCV_PARAMS, but waiting for MSG_GAUGE.
 *   RCV_COLLECT  Waits for MSG_READ, MSG_CANCEL, MSG_INITIATE_EXIT,
 *		  MSG_EXIT or MSG_ABORT.  MSG_COLLECT messages are received
 *		  and otherwise ignored; every other message is put on the
 *		  deferred queue.
 *
 * For RCV_BLOCK and RCV_NOBLOCK, messages on the deferred queue are
 * delivered (oldest first) before anything new is taken from the network.
 *
 * A MSG_ABORT taken from the network results in _p_fatal_error().
 *
 * Returns TRUE when a message has been delivered.
 */
bool_t _p_msg_receive(node, size, type, rcv_type)
int *node;
int *size;
int *type;
int rcv_type;
{
    msg_buf_header_t *buf;
    bool_t done = FALSE;
    long in_size;		/* incoming message length, in bytes */

#ifdef DEBUG
    if (ParDebug(9))
    {
	fprintf(_p_stdout,
		"(%lu) _p_msg_receive: Receiving message, type %lu\n",
		(unsigned long) _p_my_id, (unsigned long) rcv_type);
	fflush(_p_stdout);
    }
#endif /* DEBUG */

    /* Take message off queue if appropriate */
    if (mq_front != (msg_buf_header_t *) NULL
	&& (rcv_type == RCV_NOBLOCK || rcv_type == RCV_BLOCK))
    {
	buf = mq_front;
	mq_front = mq_front->next;
	if (mq_front == (msg_buf_header_t *) NULL)
	{
	    /* Queue is now empty; don't leave mq_back dangling. */
	    mq_back = (msg_buf_header_t *) NULL;
	}

	*size = buf->size;
	*node = buf->node;
	*type = buf->type;

	/*
	 * Now copy the message onto the heap.  If there is not enough room
	 * for it, then garbage collect.
	 * Assume that there will always be enough room for MSG_CANCEL message.
	 */
	if (*type != MSG_CANCEL)
	{
	    TryGCWithSize(*size);
	}
#ifdef PCN_ALIGN_DOUBLES
	if (DoubleAligned(_p_heap_ptr))
	    _p_heap_ptr++;
#endif /* PCN_ALIGN_DOUBLES */
	memcpy(_p_heap_ptr, buf->user_buf, *size * CELL_SIZE);
	free(buf);
    }

    /* else, get next message from port */
    else
    {
	switch (rcv_type)
	{
	case RCV_NOBLOCK:
	    if (!iprobe(-1))
	    {
#ifdef DEBUG
		if (ParDebug(9))
		{
		    fprintf(_p_stdout,
			    "(%lu) _p_msg_receive: Non-blocking return\n",
			    (unsigned long) _p_my_id);
		    fflush(_p_stdout);
		}
#endif /* DEBUG */
		return (FALSE);
	    }
	    /* A message is pending: receive it without probing again. */
	    goto RCV_THE_REST;

	case RCV_BLOCK:
	    cprobe(-1);

	RCV_THE_REST:
	    *node = infonode();
	    in_size = infocount();
	    *size = in_size / CELL_SIZE;
	    *type = infotype();
	    if (*type != MSG_CANCEL)
	    {
		TryGCWithSize(*size);
	    }
#ifdef PCN_ALIGN_DOUBLES
	    if (DoubleAligned(_p_heap_ptr))
		_p_heap_ptr++;
#endif /* PCN_ALIGN_DOUBLES */
	    crecv(-1, _p_heap_ptr, in_size);
	    break;

	case RCV_PARAMS:
	case RCV_GAUGE:
	    while (!done)
	    {
		cprobe(-1);
		*node = infonode();
		in_size = infocount();
		*size = in_size / CELL_SIZE;
		*type = infotype();
		if (   (rcv_type == RCV_PARAMS && *type == MSG_PARAMS)
		    || (rcv_type == RCV_GAUGE && *type == MSG_GAUGE)
		    || *type == MSG_INITIATE_EXIT
		    || *type == MSG_EXIT
		    || *type == MSG_ABORT )
		{
		    crecv(-1, _p_heap_ptr, in_size);
		    done = TRUE;
		}
		else
		{
		    /* Not what we are waiting for -- enqueue the message */
		    sr_enqueue_msg(*node, *size, *type, in_size);
		}
	    }
	    break;

	case RCV_COLLECT:
	    while (!done)
	    {
		cprobe(-1);
		*node = infonode();
		in_size = infocount();
		*size = in_size / CELL_SIZE;
		*type = infotype();
		switch (*type)
		{
		case MSG_READ:
		case MSG_CANCEL:
		case MSG_INITIATE_EXIT:
		case MSG_EXIT:
		case MSG_ABORT:
		    crecv(-1, _p_heap_ptr, in_size);
		    done = TRUE;
		    break;

		case MSG_COLLECT:
		    /* Receive it, but keep waiting */
		    crecv(-1, _p_heap_ptr, in_size);
		    break;

		default:
		    /* MSG_DEFINE || MSG_VALUE || MSG_GAUGE -- so enqueue it */
		    sr_enqueue_msg(*node, *size, *type, in_size);
		    break;
		}
	    }
	    break;
	}

	if (*type == MSG_ABORT)
	    _p_fatal_error("Received an abort message");
    }

#ifdef DEBUG
    if (ParDebug(8))
    {
	fprintf(_p_stdout,
		"(%lu) _p_msg_receive: Received from n %lu, s %lu, t %lu\n",
		(unsigned long) _p_my_id, (unsigned long) *node,
		(unsigned long) *size, (unsigned long) *type);
	fflush(_p_stdout);
    }
#endif /* DEBUG */

    return (TRUE);
} /* _p_msg_receive() */
These are the contents of the former NiCE NeXT User Group NeXTSTEP/OpenStep software archive, currently hosted by Netfuture.ch.