This is sr_ce.c in view mode; [Download] [Up]
/*
 * PCN Abstract Machine Emulator
 * Authors:     Steve Tuecke and Ian Foster
 *              Argonne National Laboratory
 *
 * Please see the DISCLAIMER file in the top level directory of the
 * distribution regarding the provisions under which this software
 * is distributed.
 *
 * sr_ce.c  -  Send/Receive routines built on the Cosmic Environment
 *
 * #include "sr_doc.h"
 * See sr_doc.h for detailed documentation on Send/Receive modules.
 */

#include "pcn.h"
#include <cube/cubedef.h>

/* Process id passed to every cosmic_init()/spawn()/xsend() call below. */
#define PCN_PID 42

#ifndef USEHOST
#define USEHOST 1
#endif /* USEHOST */

/* Map a PCN node number onto the identifier handed to xsend(). */
#define pcnnode_to_cenode(N) ((N) == _p_host_id ? HOST : (N))

#define DEFAULT_MSG_BUFFER_SIZE 1020

/*
 * Every message starts with this header.  Callers of
 * _p_alloc_msg_buffer() only ever see the address of user_buf.
 */
typedef struct msg_buf_header_struct
{
    struct msg_buf_header_struct *next;  /* link for the deferred queue */
    int_t size;                          /* payload size, in cells */
    int_t type;                          /* MSG_* message type */
    int_t node;                          /* PCN id of the sender */
    cell_t user_buf[1];                  /* first cell of the payload */
} msg_buf_header_t;

/*
 * Number of cells in front of user_buf.
 * NOTE(review): this assumes each of the four header fields occupies
 * exactly one cell -- confirm against the definitions in pcn.h.
 */
#define HEADER_SIZE 4
#define MsgSize2Bytes(Size) (((Size) + HEADER_SIZE) * CELL_SIZE)
#define User2MsgBuf(Buf) ((msg_buf_header_t *) (((cell_t *) (Buf)) \
                                                - HEADER_SIZE))

/* Queue of messages received while waiting for a different type. */
static msg_buf_header_t *mq_front, *mq_back;

static bool_t run_uniprocessing;


/*
 * void _p_sr_get_argdesc(int argc, char **argv,
 *                        argdesc_t **argdescp, int *n_argdescp)
 *
 * On a PCN_HOST build, hand back the argument descriptors owned by
 * this module (currently only "uni") and reset run_uniprocessing to
 * FALSE.  On any other build the output parameters are left untouched.
 * argc and argv are not used.
 */
void _p_sr_get_argdesc(argc, argv, argdescp, n_argdescp)
int argc;
char **argv;
argdesc_t **argdescp;
int *n_argdescp;
{
#ifdef PCN_HOST
    static argdesc_t argdesc[] = {
        BOOL_ARG("uni", &run_uniprocessing,
                 "run as a uniprocessing system"),
    };

    run_uniprocessing = FALSE;

    *argdescp = argdesc;
    *n_argdescp = sizeof(argdesc) / sizeof(argdesc[0]);
#endif /* PCN_HOST */
} /* _p_sr_get_argdesc() */


/*
 * void sr_fatal_error(char *msg)
 *
 * Report a startup failure on stderr and through print(), then
 * ckill() and exit with status 1.
 */
static void sr_fatal_error(msg)
char *msg;
{
    if (_p_my_id == -1)         /* Node doesn't have an id yet */
    {
        fprintf(stderr,
                "Fatal error: Node ???: Failed to create workers: %s\n",
                msg);
    }
    else
    {
        fprintf(stderr,
                "Fatal error: Node %lu: Failed to create workers: %s\n",
                (unsigned long) _p_my_id, msg);
    }
    print("Fatal error: Failed to create workers: %s", msg);
    ckill(-1,-1,"");
    exit(1);
} /* sr_fatal_error() */


/*
 * void _p_sr_init_node()
 *
 * The host creates the nodes and does initialization.
 * The nodes do initialization.
 *
 * In both cases control is then handed to _p_init_node().
 */
void _p_sr_init_node()
{
#ifdef PCN_HOST
    if (run_uniprocessing)
    {
        /* Single node acting as its own host; no spawn() is done. */
        _p_nodes = 1;
        _p_host_id = 0;
        _p_usehost = TRUE;
        _p_my_id = 0;
        _p_host = TRUE;

        /* Finish node initialization and run emulator */
        _p_init_node();
        return;
    }

    cosmic_init(HOST, PCN_PID);

    if (mynode() == HOST)
    {
        if (spawn(_p_exe_file, -1, PCN_PID, "") != 0)
        {
            sr_fatal_error("Failed spawn()");
        }

        /* The host takes the PCN id one past the last node. */
        _p_host_id = nnodes();
        _p_nodes = _p_host_id + 1;
        _p_my_id = _p_host_id;
        _p_host = TRUE;

        if (_p_nodes > MAX_NODES)
        {
            char buf[256];
            sprintf(buf, "Too many nodes: MAX_NODES = %d", MAX_NODES);
            sr_fatal_error(buf);
        }
    }
    else
    {
        sr_fatal_error("This code is compiled as a CE host");
    }
#endif /* PCN_HOST */

#ifdef PCN_NODE
    _p_host_id = nnodes();
    _p_nodes = _p_host_id + 1;
    _p_my_id = mynode();
    _p_host = FALSE;
#endif /* PCN_NODE */

#if USEHOST == 1
    _p_usehost = TRUE;
#else
    _p_usehost = FALSE;
#endif /* USEHOST */

    /*
     * Do any other send/receive initialization stuff here...
     */
    _p_default_msg_buffer_size = DEFAULT_MSG_BUFFER_SIZE;
    mq_front = mq_back = (msg_buf_header_t *) NULL;

    /* Finish node initialization and run emulator */
    _p_init_node();
} /* _p_sr_init_node() */


/*
 * void _p_sr_node_initialized()
 *
 * Does nothing unless DONT_INCLUDE is defined, in which case it runs
 * a self test that passes one integer around the ring of nodes and
 * then exits.
 */
void _p_sr_node_initialized()
{
#ifdef DONT_INCLUDE
    int *msg;
    int i;

    /* Test by sending something around the ring */
    if (_p_host)
    {
        print("Host: Sending test");
        i = (_p_my_id+1) % _p_nodes;
        if ((msg = (int *) xmalloc(sizeof(int))) == (int *) NULL)
        {
            _p_fatal_error("Failed xmalloc()");
        }
        *msg = 42;
        print("Host: Sending test to node %lu", (unsigned long) i);
        xsend(msg, pcnnode_to_cenode(i), PCN_PID);
        print("Host: Sent test to node %lu", (unsigned long) i);
        msg = (int *) xrecvb();
        print("Host: Received test (%lu)", (unsigned long) *msg);
        xfree(msg);
    }
    else
    {
        msg = (int *) xrecvb();
        print("(%lu) Received test (%lu)",
              (unsigned long) _p_my_id, (unsigned long) *msg);
        i = (_p_my_id+1) % _p_nodes;
        print("(%lu) Sending test to node %lu",
              (unsigned long) _p_my_id, (unsigned long) i);
        xsend(msg, pcnnode_to_cenode(i), PCN_PID);
        print("(%lu) Sent test to node %lu",
              (unsigned long) _p_my_id, (unsigned long) i);
    }

    if (_p_host)
        ckill(-1,-1,"");
    print("Exiting");
    exit(0);
#endif /* DONT_INCLUDE */
} /* _p_sr_node_initialized() */


/*
 * void _p_destroy_nodes()
 *
 * On the host: ckill() and exit with status 0.
 * On a node: nothing.
 */
void _p_destroy_nodes()
{
    if (_p_host)
    {
        ckill(-1,-1,"");
        exit(0);
    }
} /* _p_destroy_nodes() */


/*
 * void _p_abort_nodes()
 *
 * On the host: ckill().
 * On a node: send an empty MSG_ABORT message to the host.  If the
 * message cannot be allocated, fall back to calling ckill() directly.
 */
void _p_abort_nodes()
{
    msg_buf_header_t *msg_buf;

    if (_p_host)
    {
        ckill(-1,-1,"");
        return;
    }

    /* The buffer must hold the complete header, not just four bytes. */
    msg_buf = (msg_buf_header_t *) xmalloc(MsgSize2Bytes(0));
    if (msg_buf == (msg_buf_header_t *) NULL)
    {
        ckill(-1,-1,"");
        return;                 /* never touch the NULL buffer */
    }

    msg_buf->size = 0;
    msg_buf->type = MSG_ABORT;
    msg_buf->node = _p_my_id;
    xsend(msg_buf, HOST, PCN_PID);
} /* _p_abort_nodes() */


/*
 * cell_t *_p_alloc_msg_buffer(int size)
 *
 * Allocate a message able to carry 'size' cells of payload and return
 * the address of its payload area.  The header sits immediately in
 * front of the returned pointer; _p_msg_send() recovers it with
 * User2MsgBuf().
 */
cell_t *_p_alloc_msg_buffer(size)
int size;       /* In cells */
{
    int n_bytes;
    msg_buf_header_t *msg_buf;

#ifdef DEBUG
    if (ParDebug(8))
    {
        fprintf(_p_stdout,
                "(%lu) _p_alloc_msg_buffer: Allocating %lu cells\n",
                (unsigned long) _p_my_id, (unsigned long) size);
        fflush(_p_stdout);
    }
#endif /* DEBUG */

    n_bytes = MsgSize2Bytes(size);
    msg_buf = (msg_buf_header_t *) xmalloc(n_bytes);
    if (msg_buf == (msg_buf_header_t *) NULL)
    {
        _p_fatal_error("Failed to allocate a message buffer");
    }

#ifdef DEBUG
    if (ParDebug(8))
    {
        fprintf(_p_stdout,
                "(%lu) _p_alloc_msg_buffer: Allocated %lu cells\n",
                (unsigned long) _p_my_id, (unsigned long) size);
        fflush(_p_stdout);
    }
#endif /* DEBUG */

    return (msg_buf->user_buf);
} /* _p_alloc_msg_buffer() */


/*
 * void _p_msg_send(cell_t *buf, int node, int size, int type)
 *
 * Send 'buf' (a pointer previously returned by _p_alloc_msg_buffer(),
 * or NULL for an empty message) to PCN node 'node'.  The header is
 * filled in with 'size', 'type' and this node's id before xsend().
 * The buffer is not freed here.
 */
void _p_msg_send(buf, node, size, type)
cell_t *buf;
int node;
int size;       /* in cells */
int type;
{
    msg_buf_header_t *send_buf;

#ifdef DEBUG
    if (ParDebug(8))
    {
        fprintf(_p_stdout,
                "(%lu) _p_msg_send: Sending to n %lu, s %lu, t %lu\n",
                (unsigned long) _p_my_id, (unsigned long) node,
                (unsigned long) size, (unsigned long) type);
        fflush(_p_stdout);
    }
#endif /* DEBUG */

#ifdef DEBUG
    if (node == _p_my_id)
        _p_fatal_error("Internal error: Cannot send message to myself!");
#endif /* DEBUG */

    /*
     * If buf == NULL, then we have an empty message.  So just allocate
     * enough space for the message headers.
     */
    if (buf == (cell_t *) NULL)
    {
        send_buf = (msg_buf_header_t *) xmalloc(MsgSize2Bytes(0));
        if (send_buf == (msg_buf_header_t *) NULL)
        {
            _p_fatal_error("Failed to allocate a message buffer");
        }
        size = 0;
    }
    else
    {
        send_buf = User2MsgBuf(buf);
    }

    send_buf->size = size;
    send_buf->type = type;
    send_buf->node = _p_my_id;
    xsend(send_buf, pcnnode_to_cenode(node), PCN_PID);

#ifdef DEBUG
    if (ParDebug(9))
    {
        fprintf(_p_stdout, "(%lu) _p_msg_send: Sent\n",
                (unsigned long) _p_my_id);
        fflush(_p_stdout);
    }
#endif /* DEBUG */
} /* _p_msg_send() */


/*
 * void mq_enqueue(msg_buf_header_t *msg_buf)
 *
 * Append a received message to the tail of the deferred queue.
 */
static void mq_enqueue(msg_buf)
msg_buf_header_t *msg_buf;
{
    msg_buf->next = (msg_buf_header_t *) NULL;
    if (mq_front == (msg_buf_header_t *) NULL)
    {
        mq_front = msg_buf;
    }
    else
    {
        mq_back->next = msg_buf;
    }
    mq_back = msg_buf;
} /* mq_enqueue() */


/*
 * bool_t _p_msg_receive(int *node, int *size, int *type, int rcv_type)
 *
 * Receive one message, report its sender, size (in cells) and type
 * through the output parameters, copy its payload to the location
 * _p_heap_ptr points at, and free the message.
 *
 * rcv_type selects the behavior:
 *   RCV_NOBLOCK  take a deferred message if one is queued, otherwise
 *                poll with xrecv(); return FALSE when that yields NULL.
 *   RCV_BLOCK    take a deferred message if one is queued, otherwise
 *                block in xrecvb().
 *   RCV_PARAMS   block until a MSG_PARAMS or MSG_EXIT arrives; queue
 *                everything else.
 *   RCV_COLLECT  block until a MSG_ABORT, MSG_EXIT, MSG_READ or
 *                MSG_CANCEL arrives; discard MSG_COLLECT, queue
 *                MSG_DEFINE and MSG_VALUE, treat anything else as fatal.
 *
 * Returns TRUE when a message was delivered.
 */
bool_t _p_msg_receive(node, size, type, rcv_type)
int *node;
int *size;
int *type;
int rcv_type;
{
    msg_buf_header_t *msg_buf = (msg_buf_header_t *) NULL;
    bool_t done = FALSE;

#ifdef DEBUG
    if (ParDebug(9))
    {
        fprintf(_p_stdout,
                "(%lu) _p_msg_receive: Receiving message, type %lu\n",
                (unsigned long) _p_my_id, (unsigned long) rcv_type);
        fflush(_p_stdout);
    }
#endif /* DEBUG */

    /* Take message off queue if appropriate */
    if (mq_front != (msg_buf_header_t *) NULL
        && (rcv_type == RCV_NOBLOCK || rcv_type == RCV_BLOCK))
    {
        /*
         * NOTE(review): messages taken from the queue bypass the
         * MSG_ABORT check applied to freshly received messages below,
         * and RCV_PARAMS can queue any type -- confirm this is intended.
         */
        msg_buf = mq_front;
        mq_front = mq_front->next;
    }
    /* else, get next message from system */
    else
    {
        switch (rcv_type)
        {
        case RCV_NOBLOCK:
            msg_buf = (msg_buf_header_t *) xrecv();
            if (msg_buf == (msg_buf_header_t *) NULL)
            {
#ifdef DEBUG
                if (ParDebug(9))
                {
                    fprintf(_p_stdout,
                            "(%lu) _p_msg_receive: Non-blocking return\n",
                            (unsigned long) _p_my_id);
                    fflush(_p_stdout);
                }
#endif /* DEBUG */
                return (FALSE);
            }
            break;

        case RCV_BLOCK:
            msg_buf = (msg_buf_header_t *) xrecvb();
            if (msg_buf == (msg_buf_header_t *) NULL)
            {
                _p_fatal_error("Failed on blocking receive");
            }
            break;

        case RCV_PARAMS:
            for (done = FALSE; !done; )
            {
                msg_buf = (msg_buf_header_t *) xrecvb();
                if (msg_buf == (msg_buf_header_t *) NULL)
                {
                    _p_fatal_error("Failed on blocking receive");
                }
                if (msg_buf->type == MSG_PARAMS
                    || msg_buf->type == MSG_EXIT)
                {
                    done = TRUE;
                }
                else
                {
                    /* Not what we are waiting for: keep it for later. */
                    mq_enqueue(msg_buf);
                }
            }
            break;

        case RCV_COLLECT:
            while (!done)
            {
                msg_buf = (msg_buf_header_t *) xrecvb();
                if (msg_buf == (msg_buf_header_t *) NULL)
                {
                    _p_fatal_error("Failed on blocking receive");
                }
                switch (msg_buf->type)
                {
                case MSG_ABORT:
                case MSG_EXIT:
                case MSG_READ:
                case MSG_CANCEL:
                    done = TRUE;
                    break;
                case MSG_COLLECT:
                    xfree(msg_buf);
                    break;
                case MSG_DEFINE:
                case MSG_VALUE:
                    mq_enqueue(msg_buf);
                    break;
                default:
                    _p_fatal_error("_p_msg_receive(): Illegal message type");
                }
            }
            break;

        default:
            /* Unknown receive mode: there is no message to look at. */
            _p_fatal_error("_p_msg_receive(): Illegal receive type");
            return (FALSE);
        }

        if (msg_buf->type == MSG_ABORT)
        {
            _p_fatal_error("Received an abort message");
        }
    }

    *size = msg_buf->size;
    *node = msg_buf->node;
    *type = msg_buf->type;

    /*
     * Now copy the message onto the heap.  If there is not enough room
     * for it, then garbage collect.
     * Assume that there will always be enough room for MSG_CANCEL message.
     */
    if (*type != MSG_CANCEL)
    {
        TryGCWithSize(*size);
    }
#ifdef PCN_ALIGN_DOUBLES
    if (DoubleAligned(_p_heap_ptr))
        _p_heap_ptr++;
#endif /* PCN_ALIGN_DOUBLES */
    memcpy(_p_heap_ptr, (char *) msg_buf->user_buf, *size * CELL_SIZE);

    xfree(msg_buf);

#ifdef DEBUG
    if (ParDebug(8))
    {
        fprintf(_p_stdout,
                "(%lu) _p_msg_receive: Received from n %lu, s %lu, t %lu\n",
                (unsigned long) _p_my_id, (unsigned long) *node,
                (unsigned long) *size, (unsigned long) *type);
        fflush(_p_stdout);
    }
#endif /* DEBUG */

    return (TRUE);
} /* _p_msg_receive() */
These are the contents of the former NiCE NeXT User Group NeXTSTEP/OpenStep software archive, currently hosted by Netfuture.ch.