This is sr_shmem.c in view mode; [Download] [Up]
/* * PCN Abstract Machine Emulator * Authors: Steve Tuecke and Ian Foster * Argonne National Laboratory * * Please see the DISCLAIMER file in the top level directory of the * distribution regarding the provisions under which this software * is distributed. * * sr_shmem.c - Send/Receive routines that use shared memory, for: * Sequent Symmetry * * #include "sr_doc.h" * See sr_doc.h for detailed documentation on Send/Receive modules. */ #include "pcn.h" #ifdef symmetry /* * We cannot include parallel/parallel.h because gcc does not * understand the asm (inline assembler) statements. */ /* #include <parallel/parallel.h> */ #include <signal.h> /* * These are the parts from parallel/parallel.h that we need... */ typedef unsigned char slock_t; #define L_UNLOCKED 0 #define L_LOCKED 1 #define L_FAILED 0 #define L_SUCCESS 1 extern char *shmalloc(); #define SH_MALLOC(N) shmalloc(N) #define SH_FREE(B) shfree(B) typedef slock_t LOCK_T; #define LOCK_INIT(L) s_init_lock(L) #define UNLOCK(L) s_unlock(L) #ifdef SPIN_LOCKS #define LOCK(L) s_lock(L) #else /* SPIN_LOCKS */ #define LOCK(L) sleep_lock(L) #define DEFAULT_LOCK_SLEEP_TIME 10000 #include <sys/time.h> static int lock_sleep_time; static void sleep_lock(lp) LOCK_T *lp; { struct timeval timeout; while (s_clock(lp) == L_FAILED) { timeout.tv_sec = 0; timeout.tv_usec = lock_sleep_time; if (select(0,0,0,0,&timeout) < 0) _p_fatal_error("select failed\n"); } } /* sleep_lock() */ #endif /* SPIN_LOCKS */ #endif /* symmetry */ typedef struct msg_buf_header_struct { struct msg_buf_header_struct * next; int_t size; int_t type; int_t node; cell_t user_buf[1]; } msg_buf_header_t; #define HEADER_SIZE 4 #define MsgSize2Bytes(Size) (((Size) + HEADER_SIZE) * CELL_SIZE) #define User2MsgBuf(Buf) ((msg_buf_header_t *) (((cell_t *) (Buf)) \ - HEADER_SIZE)) typedef struct { LOCK_T mon_lock; LOCK_T mon_wait; } monitor_t; typedef struct { msg_buf_header_t *front, *back; int_t num; monitor_t *m; } sr_monitor_t; /* This what the symmetry used to use shared sr_monitor_t *msg_queues[MAX_NODES]; */ static sr_monitor_t **msg_queues; static sr_monitor_t *my_q; /* Monitor primitives */ #define MENTER(M) LOCK(&(M->mon_lock)); #define MEXIT(M) UNLOCK(&(M->mon_lock)); #define MWAIT(M) \ { \ UNLOCK(&(M->mon_lock)); \ LOCK(&(M->mon_wait)); \ LOCK(&(M->mon_lock)); \ } #define MSIGNAL(M) UNLOCK(&(M->mon_wait)); #define DEFAULT_MSG_BUFFER_SIZE 1020 /* * void _p_sr_get_argdesc(argdesc_t **argdescp, int *n_argdesc) */ void _p_sr_get_argdesc(argc, argv, argdescp, n_argdescp) int argc; char **argv; argdesc_t **argdescp; int *n_argdescp; { static argdesc_t argdesc[] = { INTEGER_ARG("n", &_p_nodes, "number of nodes"), #ifndef SPIN_LOCKS INTEGER_ARG("lock_sleep", &lock_sleep_time, "sleep time (in usec) between lock attempts"), #endif }; #ifndef SPIN_LOCKS lock_sleep_time = DEFAULT_LOCK_SLEEP_TIME; #endif *argdescp = argdesc; *n_argdescp = sizeof(argdesc) / sizeof(argdesc[0]); } /* _p_sr_get_argdesc() */ /* * void sr_fatal_error(char *msg) */ static void sr_fatal_error(msg) char *msg; { if (_p_my_id == -1) /* Node doesn't have an id yet */ { fprintf(stderr, "Fatal error: Node ???: Failed to create workers: %s\n", msg); } else { fprintf(stderr, "Fatal error: Node %d: Failed to create workers: %s\n", _p_my_id, msg); } kill(0, SIGKILL); /* Kill the entire process group */ exit(1); } /* sr_fatal_error() */ /* * void _p_sr_init_node() */ void _p_sr_init_node(argc, argv) int argc; char *argv[]; { sr_monitor_t *q; int_t i; int pid; _p_my_id =_p_host_id = 0; _p_host = TRUE; if (_p_nodes > 1) { /* * Allocated shared memory for the array of monitor pointers. */ msg_queues = (sr_monitor_t **) SH_MALLOC(sizeof(sr_monitor_t *) * _p_nodes); if (msg_queues == (sr_monitor_t **) NULL) { sr_fatal_error("Failed to malloc shared memory for monitor table"); } /* * Allocated shared memory for and initialize each node's monitor */ for (i = 0; i < _p_nodes; i++) { if ((q = (sr_monitor_t *) SH_MALLOC(sizeof(sr_monitor_t))) == (sr_monitor_t *) NULL) { sr_fatal_error("Failed to initialize monitor"); } msg_queues[i] = q; q->front = q->back = (msg_buf_header_t *) NULL; q->num = 0; if ((q->m = (monitor_t *) SH_MALLOC(sizeof(monitor_t))) == (monitor_t *) NULL) { sr_fatal_error("Failed to initialize monitor"); } LOCK_INIT(&(q->m->mon_lock)); LOCK_INIT(&(q->m->mon_wait)); LOCK(&(q->m->mon_wait)); } for (i = 1; i <= _p_nodes; i++) { #ifdef DEBUG if (ParDebug(9)) printf("Forking off a worker node\n"); #endif DEBUG if ((pid = fork()) < 0) /* error */ { sr_fatal_error("Failed fork()"); } else if (pid == 0) /* child */ { _p_host = FALSE; _p_my_id = i; break; } } my_q = msg_queues[_p_my_id]; /* * Do any other send/receive initialization stuff here... */ _p_default_msg_buffer_size = DEFAULT_MSG_BUFFER_SIZE; } /* Finish node initialization and run emulator */ _p_init_node(argc, argv); } /* _p_sr_init_node() */ /* * void _p_sr_node_initialized() */ void _p_sr_node_initialized() { } /* _p_sr_node_initialized() */ /* * void _p_destroy_nodes() */ void _p_destroy_nodes() { } /* * void _p_abort_nodes() */ void _p_abort_nodes() { kill(0, SIGINT); /* Kill the entire process group */ /* kill(0, SIGKILL); */ } /* _p_abort_nodes() */ /* * cell_t *_p_alloc_msg_buffer(int size) */ cell_t *_p_alloc_msg_buffer(size) int size; /* In cells */ { int i; msg_buf_header_t *msg_buf; #ifdef DEBUG if (ParDebug(8)) { fprintf(_p_stdout, "(%lu,%lu) _p_alloc_msg_buffer: Allocating %lu cells\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, (unsigned long) size); fflush(_p_stdout); } #endif /* DEBUG */ i = MsgSize2Bytes(size); if ((msg_buf = (msg_buf_header_t *) SH_MALLOC(i)) == (msg_buf_header_t *) NULL) { _p_fatal_error("Failed to allocate a message buffer"); } #ifdef DEBUG if (ParDebug(8)) { fprintf(_p_stdout, "(%lu,%lu) _p_alloc_msg_buffer: Allocated %lu cells\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, (unsigned long) size); fflush(_p_stdout); } #endif /* DEBUG */ return (msg_buf->user_buf); } /* _p_alloc_msg_buffer() */ /* * void _p_msg_send(cell_t *buf, int node, int size, int type) */ void _p_msg_send(buf, node, size, type) cell_t *buf; int node; int size; /* in cells */ int type; { sr_monitor_t *q = msg_queues[node]; msg_buf_header_t *msg_buf; #ifdef DEBUG if (ParDebug(8)) { fprintf(_p_stdout, "(%lu,%lu) _p_msg_send: Sending to n %lu, s %lu, t %lu\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, (unsigned long) node, (unsigned long) size, (unsigned long) type); fflush(_p_stdout); } #endif /* DEBUG */ #ifdef DEBUG if (node == _p_my_id) _p_fatal_error("Internal error: Cannot send message to myself!"); #endif /* DEBUG */ /* * If buf == NULL, then we have en empty message. So just allocate * enough space for the message headers. */ if (buf == (cell_t *) NULL) { if ((msg_buf = (msg_buf_header_t *) SH_MALLOC(MsgSize2Bytes(0))) == (msg_buf_header_t *) NULL) { _p_fatal_error("Failed to allocate a message buffer"); } size = 0; } else { msg_buf = User2MsgBuf(buf); } msg_buf->size = size; msg_buf->type = type; msg_buf->node = _p_my_id; MENTER(q->m); if (q->num == 0) { q->front = q->back = msg_buf; msg_buf->next = (msg_buf_header_t *) NULL; } else { q->back->next = msg_buf; msg_buf->next = (msg_buf_header_t *) NULL; q->back = msg_buf; } q->num++; MSIGNAL(q->m); MEXIT(q->m); #ifdef DEBUG if (ParDebug(9)) { fprintf(_p_stdout, "(%lu,%lu) _p_msg_send: Sent\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction); fflush(_p_stdout); } #endif /* DEBUG */ } /* _p_msg_send() */ /* * bool_t _p_msg_receive(int *node, int *size, int *type, int rcv_type) */ bool_t _p_msg_receive(node, size, type, rcv_type) int *node; int *size; int *type; int rcv_type; { msg_buf_header_t *msg_buf, *last_buf, *tmp_msg_buf; bool_t done = FALSE; int_t last_num; #ifdef DEBUG if (ParDebug(9)) { fprintf(_p_stdout, "(%lu,%lu) _p_msg_receive: Receiving message, type %lu, num=%lu\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, (unsigned long) rcv_type, (unsigned long) my_q->num); fflush(_p_stdout); } #endif /* DEBUG */ MENTER(my_q->m); switch (rcv_type) { case RCV_NOBLOCK: if (my_q->num == 0) { #ifdef DEBUG if (ParDebug(9)) { fprintf(_p_stdout, "(%lu,%lu) _p_msg_receive: Non-blocking return\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction); fflush(_p_stdout); } #endif /* DEBUG */ MEXIT(my_q->m); return (FALSE); } msg_buf = my_q->front; my_q->front = msg_buf->next; break; case RCV_BLOCK: while(my_q->num == 0) MWAIT(my_q->m); msg_buf = my_q->front; my_q->front = msg_buf->next; break; case RCV_PARAMS: case RCV_GAUGE: last_num = 0; last_buf = (msg_buf_header_t *) NULL; while (!done) { while(my_q->num == last_num) MWAIT(my_q->m); last_num = my_q->num; if (last_buf == (msg_buf_header_t *) NULL) msg_buf = my_q->front; else msg_buf = last_buf->next; while (msg_buf != (msg_buf_header_t *) NULL) { if ( (rcv_type == RCV_PARAMS && msg_buf->type == MSG_PARAMS) || (rcv_type == RCV_GAUGE && msg_buf->type == MSG_GAUGE) || msg_buf->type == MSG_INITIATE_EXIT || msg_buf->type == MSG_EXIT ) { if (last_buf == (msg_buf_header_t *) NULL) my_q->front = msg_buf->next; else last_buf->next = msg_buf->next; if (my_q->back == msg_buf) /* reset queue back */ my_q->back = last_buf; done = TRUE; break; } else { last_buf = msg_buf; msg_buf = msg_buf->next; } } } break; case RCV_COLLECT: last_num = 0; last_buf = (msg_buf_header_t *) NULL; while (!done) { while(my_q->num == last_num) MWAIT(my_q->m); last_num = my_q->num; if (last_buf == (msg_buf_header_t *) NULL) msg_buf = my_q->front; else msg_buf = last_buf->next; while (msg_buf != (msg_buf_header_t *) NULL) { if ( msg_buf->type == MSG_READ || msg_buf->type == MSG_CANCEL || msg_buf->type == MSG_INITIATE_EXIT || msg_buf->type == MSG_EXIT) { /* Return these types of messages */ if (last_buf == (msg_buf_header_t *) NULL) my_q->front = msg_buf->next; else last_buf->next = msg_buf->next; if (my_q->back == msg_buf) /* reset queue back */ my_q->back = last_buf; done = TRUE; break; } else if (msg_buf->type == MSG_COLLECT) { /* Remove these messages from the queue */ if (last_buf == (msg_buf_header_t *) NULL) my_q->front = msg_buf->next; else last_buf->next = msg_buf->next; if (my_q->back == msg_buf) /* reset queue back */ my_q->back = last_buf; tmp_msg_buf = msg_buf->next; SH_FREE(msg_buf); msg_buf = tmp_msg_buf; } else { /* * MSG_DEFINE || MSG_VALUE || MSG_GAUGE -- so leave * it on the queue an keep searching */ last_buf = msg_buf; msg_buf = msg_buf->next; } } } break; } my_q->num--; MEXIT(my_q->m); *size = msg_buf->size; *node = msg_buf->node; *type = msg_buf->type; /* * Now copy the message onto the heap. If there is not enough room * for it, then garbage collect. * Assume that there will always be enough room for MSG_CANCEL message. */ if (*type != MSG_CANCEL) { TryGCWithSize(*size); } #ifdef PCN_ALIGN_DOUBLES if (DoubleAligned(_p_heap_ptr)) _p_heap_ptr++; #endif /* PCN_ALIGN_DOUBLES */ memcpy(_p_heap_ptr, (char *) msg_buf->user_buf, *size * CELL_SIZE); SH_FREE(msg_buf); #ifdef DEBUG if (ParDebug(8)) { fprintf(_p_stdout, "(%lu,%lu) _p_msg_receive: Received from n %lu, s %lu, t %lu\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, (unsigned long) *node, (unsigned long) *size, (unsigned long) *type); fflush(_p_stdout); } #endif /* DEBUG */ return (TRUE); } /* _p_msg_receive() */
These are the contents of the former NiCE NeXT User Group NeXTSTEP/OpenStep software archive, currently hosted by Netfuture.ch.