/* This is sr_shmem.c in view mode; [Download] [Up] */
/*
* PCN Abstract Machine Emulator
* Authors: Steve Tuecke and Ian Foster
* Argonne National Laboratory
*
* Please see the DISCLAIMER file in the top level directory of the
* distribution regarding the provisions under which this software
* is distributed.
*
* sr_shmem.c - Send/Receive routines that use shared memory, for:
* Sequent Symmetry
*
* #include "sr_doc.h"
* See sr_doc.h for detailed documentation on Send/Receive modules.
*/
#include "pcn.h"
#ifdef symmetry
/*
* We cannot include parallel/parallel.h because gcc does not
* understand the asm (inline assembler) statements.
*/
/*
#include <parallel/parallel.h>
*/
#include <signal.h>
/*
* These are the parts from parallel/parallel.h that we need...
*/
/* Lock word type, copied from parallel/parallel.h (which cannot be included). */
typedef unsigned char slock_t;
/* Lock states. */
#define L_UNLOCKED 0
#define L_LOCKED 1
/*
 * Result codes of a conditional lock attempt; sleep_lock() below compares
 * the return value of s_clock() against L_FAILED.
 */
#define L_FAILED 0
#define L_SUCCESS 1
/* Allocator used for every structure that the node processes share. */
extern char *shmalloc();
#define SH_MALLOC(N) shmalloc(N)
/* NOTE(review): shfree() has no declaration in this file. */
#define SH_FREE(B) shfree(B)
/* Portable names for the lock type and the lock operations. */
typedef slock_t LOCK_T;
#define LOCK_INIT(L) s_init_lock(L)
#define UNLOCK(L) s_unlock(L)
#ifdef SPIN_LOCKS
/* With SPIN_LOCKS defined, LOCK() maps straight onto s_lock(). */
#define LOCK(L) s_lock(L)
#else /* SPIN_LOCKS */
/*
 * Otherwise LOCK() goes through sleep_lock(), which sleeps between
 * attempts to take the lock.
 */
#define LOCK(L) sleep_lock(L)
/* Default value for lock_sleep_time, in microseconds. */
#define DEFAULT_LOCK_SLEEP_TIME 10000
#include <sys/time.h>
/*
 * Sleep time (in usec) between lock attempts.  Set to the default in
 * _p_sr_get_argdesc() and exposed there as the "lock_sleep" argument.
 */
static int lock_sleep_time;
/*
 * static void sleep_lock(LOCK_T *lp)
 *
 * Acquire the lock 'lp' without spinning: try a conditional lock and,
 * each time it fails, sleep for lock_sleep_time microseconds (using
 * select() as the timer) before trying again.  A failing select() is
 * reported through _p_fatal_error().
 */
static void sleep_lock(lp)
LOCK_T *lp;
{
    struct timeval pause;

    for (;;)
    {
        if (s_clock(lp) != L_FAILED)
            return;

        /* The timeout is refilled on every pass, before each select(). */
        pause.tv_usec = lock_sleep_time;
        pause.tv_sec = 0;
        if (select(0, 0, 0, 0, &pause) < 0)
        {
            _p_fatal_error("select failed\n");
        }
    }
} /* sleep_lock() */
#endif /* SPIN_LOCKS */
#endif /* symmetry */
/*
 * Header that precedes the user-visible part of every message buffer.
 * _p_alloc_msg_buffer() hands out a pointer to user_buf, and
 * User2MsgBuf() steps back from that pointer to the enclosing header.
 */
typedef struct msg_buf_header_struct
{
/* Link to the next message on the destination node's queue. */
struct msg_buf_header_struct * next;
/* Message size in cells, as passed to _p_msg_send(). */
int_t size;
/* Message type, as passed to _p_msg_send(). */
int_t type;
/* Id of the sending node (_p_msg_send() stores _p_my_id here). */
int_t node;
/* First cell of the user data; the buffer is allocated larger than this. */
cell_t user_buf[1];
} msg_buf_header_t;
/*
 * Number of cells in front of user_buf.
 * NOTE(review): this assumes that each of the four header fields above
 * occupies exactly one cell -- confirm against the definitions of
 * cell_t and int_t.
 */
#define HEADER_SIZE 4
/* Bytes needed for a message carrying 'Size' cells of user data. */
#define MsgSize2Bytes(Size) (((Size) + HEADER_SIZE) * CELL_SIZE)
/* Map a user buffer pointer back to the header it is embedded in. */
#define User2MsgBuf(Buf) ((msg_buf_header_t *) (((cell_t *) (Buf)) \
- HEADER_SIZE))
/*
 * The pair of locks behind the monitor macros (MENTER, MEXIT, MWAIT,
 * MSIGNAL) defined below.
 */
typedef struct {
/* Taken by MENTER and released by MEXIT. */
LOCK_T mon_lock;
/*
 * Taken by MWAIT and released by MSIGNAL.  _p_sr_init_node() locks it
 * once right after initializing it.
 */
LOCK_T mon_wait;
} monitor_t;
/* A node's incoming message queue together with the monitor guarding it. */
typedef struct {
/* First and last message of the singly linked queue. */
msg_buf_header_t *front, *back;
/* Count of queued messages; _p_msg_send() treats 0 as "queue is empty". */
int_t num;
/* Monitor protecting the fields above. */
monitor_t *m;
} sr_monitor_t;
/* This is what the symmetry used to use
shared sr_monitor_t *msg_queues[MAX_NODES];
*/
/* Table of queue pointers indexed by node id; allocated in _p_sr_init_node(). */
static sr_monitor_t **msg_queues;
/* This node's own queue, i.e. msg_queues[_p_my_id]. */
static sr_monitor_t *my_q;
/*
 * Monitor primitives.
 *
 * Each macro takes a pointer to a monitor_t and expands to exactly one
 * statement, so a use must be followed by its own ';' and is safe as the
 * body of an unbraced if/else or loop.  The argument is parenthesized,
 * so any pointer expression may be passed.
 *
 *   MENTER(M)   take the monitor lock
 *   MEXIT(M)    release the monitor lock
 *   MWAIT(M)    release the monitor lock, block on the wait lock, then
 *               take the monitor lock again (M is evaluated three times)
 *   MSIGNAL(M)  release the wait lock, letting one MWAIT proceed
 */
#define MENTER(M) \
    do { \
        LOCK(&((M)->mon_lock)); \
    } while (0)
#define MEXIT(M) \
    do { \
        UNLOCK(&((M)->mon_lock)); \
    } while (0)
#define MWAIT(M) \
    do { \
        UNLOCK(&((M)->mon_lock)); \
        LOCK(&((M)->mon_wait)); \
        LOCK(&((M)->mon_lock)); \
    } while (0)
#define MSIGNAL(M) \
    do { \
        UNLOCK(&((M)->mon_wait)); \
    } while (0)
/* Value stored in _p_default_msg_buffer_size by _p_sr_init_node(). */
#define DEFAULT_MSG_BUFFER_SIZE 1020
/*
 * void _p_sr_get_argdesc(int argc, char **argv,
 *                        argdesc_t **argdescp, int *n_argdescp)
 *
 * Report the command line arguments owned by this send/receive module.
 * On return *argdescp points at a static table of descriptors and
 * *n_argdescp holds the number of entries in it.  The table binds "n"
 * to _p_nodes and, unless SPIN_LOCKS is defined, "lock_sleep" to
 * lock_sleep_time, which is reset to its default here.
 * argc and argv are not used.
 */
void _p_sr_get_argdesc(argc, argv, argdescp, n_argdescp)
int argc;
char **argv;
argdesc_t **argdescp;
int *n_argdescp;
{
    static argdesc_t sr_args[] = {
        INTEGER_ARG("n", &_p_nodes, "number of nodes"),
#ifndef SPIN_LOCKS
        INTEGER_ARG("lock_sleep", &lock_sleep_time, "sleep time (in usec) between lock attempts"),
#endif
    };

#ifndef SPIN_LOCKS
    lock_sleep_time = DEFAULT_LOCK_SLEEP_TIME;
#endif

    *n_argdescp = sizeof(sr_args) / sizeof(sr_args[0]);
    *argdescp = sr_args;
} /* _p_sr_get_argdesc() */
/*
 * static void sr_fatal_error(char *msg)
 *
 * Print a "Failed to create workers" diagnostic containing 'msg' on
 * stderr, send SIGKILL to the whole process group and exit with
 * status 1.  The node id is printed as "???" while _p_my_id is still -1.
 */
static void sr_fatal_error(msg)
char *msg;
{
    if (_p_my_id != -1)
    {
        fprintf(stderr,
                "Fatal error: Node %d: Failed to create workers: %s\n",
                _p_my_id, msg);
    }
    else
    {
        /* Node doesn't have an id yet */
        fprintf(stderr,
                "Fatal error: Node ???: Failed to create workers: %s\n",
                msg);
    }

    /* pid 0: signal every process in our process group */
    kill(0, SIGKILL);
    exit(1);
} /* sr_fatal_error() */
/*
 * void _p_sr_init_node(int argc, char *argv[])
 *
 * Start-up entry point of the shared memory send/receive module.
 *
 * The calling process becomes the host, node 0.  When more than one
 * node was requested, it allocates one message queue (with its monitor)
 * per node in shared memory and forks the worker processes, which get
 * the ids 1 .. _p_nodes-1.  Every process, host and workers alike, then
 * picks its own queue and calls _p_init_node(argc, argv).
 *
 * Any allocation or fork failure ends in sr_fatal_error().
 */
void _p_sr_init_node(argc, argv)
int argc;
char *argv[];
{
    sr_monitor_t *q;
    int_t i;
    int pid;

    _p_my_id = _p_host_id = 0;
    _p_host = TRUE;

    if (_p_nodes > 1)
    {
        /*
         * Allocate shared memory for the array of monitor pointers.
         */
        msg_queues = (sr_monitor_t **) SH_MALLOC(sizeof(sr_monitor_t *)
                                                 * _p_nodes);
        if (msg_queues == (sr_monitor_t **) NULL)
        {
            sr_fatal_error("Failed to malloc shared memory for monitor table");
        }

        /*
         * Allocate shared memory for and initialize each node's monitor.
         */
        for (i = 0; i < _p_nodes; i++)
        {
            if ((q = (sr_monitor_t *) SH_MALLOC(sizeof(sr_monitor_t)))
                == (sr_monitor_t *) NULL)
            {
                sr_fatal_error("Failed to initialize monitor");
            }
            msg_queues[i] = q;
            q->front = q->back = (msg_buf_header_t *) NULL;
            q->num = 0;
            if ((q->m = (monitor_t *) SH_MALLOC(sizeof(monitor_t)))
                == (monitor_t *) NULL)
            {
                sr_fatal_error("Failed to initialize monitor");
            }
            LOCK_INIT(&(q->m->mon_lock));
            LOCK_INIT(&(q->m->mon_wait));
            /*
             * The wait lock starts out held, so that the first MWAIT
             * blocks until some sender has done an MSIGNAL.
             */
            LOCK(&(q->m->mon_wait));
        }

        /*
         * Fork the workers.  The host already is node 0, so only
         * _p_nodes-1 children are needed.  The ids must stay below
         * _p_nodes because msg_queues[] has exactly _p_nodes entries.
         */
        for (i = 1; i < _p_nodes; i++)
        {
#ifdef DEBUG
            if (ParDebug(9))
                printf("Forking off a worker node\n");
#endif /* DEBUG */
            if ((pid = fork()) < 0)     /* error */
            {
                sr_fatal_error("Failed fork()");
            }
            else if (pid == 0)          /* child */
            {
                _p_host = FALSE;
                _p_my_id = i;
                break;                  /* workers do not fork further */
            }
        }

        my_q = msg_queues[_p_my_id];

        /*
         * Do any other send/receive initialization stuff here...
         */
        _p_default_msg_buffer_size = DEFAULT_MSG_BUFFER_SIZE;
    }

    /* Finish node initialization and run emulator */
    _p_init_node(argc, argv);
} /* _p_sr_init_node() */
/*
 * void _p_sr_node_initialized()
 *
 * The shared memory module has no work to do here.
 */
void _p_sr_node_initialized()
{
    /* Intentionally empty. */
    return;
} /* _p_sr_node_initialized() */
/*
 * void _p_destroy_nodes()
 *
 * The shared memory module has no work to do here.
 */
void _p_destroy_nodes()
{
    /* Intentionally empty. */
    return;
} /* _p_destroy_nodes() */
/*
 * void _p_abort_nodes()
 *
 * Abort the computation by sending SIGINT to every process in the
 * caller's process group (that is what a pid of 0 means to kill()).
 * The result of kill() is ignored.  A SIGKILL variant used to be kept
 * here, disabled, as an alternative.
 */
void _p_abort_nodes()
{
    (void) kill(0, SIGINT);
} /* _p_abort_nodes() */
/*
 * cell_t *_p_alloc_msg_buffer(int size)
 *
 * Allocate, in shared memory, a message buffer with room for 'size'
 * cells of user data plus the message header.  The return value points
 * at the user data (the header sits directly in front of it).
 * An allocation failure is reported through _p_fatal_error().
 */
cell_t *_p_alloc_msg_buffer(size)
int size;       /* In cells */
{
    msg_buf_header_t *hdr;
    int n_bytes;

#ifdef DEBUG
    if (ParDebug(8))
    {
        fprintf(_p_stdout,
                "(%lu,%lu) _p_alloc_msg_buffer: Allocating %lu cells\n",
                (unsigned long) _p_my_id, (unsigned long) _p_reduction,
                (unsigned long) size);
        fflush(_p_stdout);
    }
#endif /* DEBUG */

    /* Header cells plus user cells, expressed in bytes */
    n_bytes = MsgSize2Bytes(size);
    hdr = (msg_buf_header_t *) SH_MALLOC(n_bytes);
    if (hdr == (msg_buf_header_t *) NULL)
    {
        _p_fatal_error("Failed to allocate a message buffer");
    }

#ifdef DEBUG
    if (ParDebug(8))
    {
        fprintf(_p_stdout,
                "(%lu,%lu) _p_alloc_msg_buffer: Allocated %lu cells\n",
                (unsigned long) _p_my_id, (unsigned long) _p_reduction,
                (unsigned long) size);
        fflush(_p_stdout);
    }
#endif /* DEBUG */

    return (hdr->user_buf);
} /* _p_alloc_msg_buffer() */
/*
 * void _p_msg_send(cell_t *buf, int node, int size, int type)
 *
 * Append a message to the queue of node 'node' and signal that node's
 * monitor.
 *
 * 'buf' is a user buffer whose message header sits directly in front of
 * it (the kind of pointer _p_alloc_msg_buffer() returns), or NULL for
 * an empty message.  In the NULL case a header-only buffer is allocated
 * here and the size is forced to 0.  'size' is in cells.  The header
 * records size, type and the id of this node as the sender.
 */
void _p_msg_send(buf, node, size, type)
cell_t *buf;
int node;
int size;       /* in cells */
int type;
{
    sr_monitor_t *dest_q = msg_queues[node];
    msg_buf_header_t *hdr;

#ifdef DEBUG
    if (ParDebug(8))
    {
        fprintf(_p_stdout,
                "(%lu,%lu) _p_msg_send: Sending to n %lu, s %lu, t %lu\n",
                (unsigned long) _p_my_id, (unsigned long) _p_reduction,
                (unsigned long) node, (unsigned long) size,
                (unsigned long) type);
        fflush(_p_stdout);
    }
    if (node == _p_my_id)
        _p_fatal_error("Internal error: Cannot send message to myself!");
#endif /* DEBUG */

    if (buf != (cell_t *) NULL)
    {
        /* The header lives just in front of the user's cells */
        hdr = User2MsgBuf(buf);
    }
    else
    {
        /*
         * An empty message: allocate just enough space for the
         * message header.
         */
        hdr = (msg_buf_header_t *) SH_MALLOC(MsgSize2Bytes(0));
        if (hdr == (msg_buf_header_t *) NULL)
        {
            _p_fatal_error("Failed to allocate a message buffer");
        }
        size = 0;
    }

    hdr->size = size;
    hdr->type = type;
    hdr->node = _p_my_id;

    /* Link the message in at the tail of the destination queue */
    MENTER(dest_q->m);
    if (dest_q->num == 0)
    {
        dest_q->front = hdr;
    }
    else
    {
        dest_q->back->next = hdr;
    }
    hdr->next = (msg_buf_header_t *) NULL;
    dest_q->back = hdr;
    dest_q->num++;
    MSIGNAL(dest_q->m);
    MEXIT(dest_q->m);

#ifdef DEBUG
    if (ParDebug(9))
    {
        fprintf(_p_stdout, "(%lu,%lu) _p_msg_send: Sent\n",
                (unsigned long) _p_my_id, (unsigned long) _p_reduction);
        fflush(_p_stdout);
    }
#endif /* DEBUG */
} /* _p_msg_send() */
/*
* bool_t _p_msg_receive(int *node, int *size, int *type, int rcv_type)
*/
bool_t _p_msg_receive(node, size, type, rcv_type)
int *node;
int *size;
int *type;
int rcv_type;
{
msg_buf_header_t *msg_buf, *last_buf, *tmp_msg_buf;
bool_t done = FALSE;
int_t last_num;
#ifdef DEBUG
if (ParDebug(9))
{
fprintf(_p_stdout,
"(%lu,%lu) _p_msg_receive: Receiving message, type %lu, num=%lu\n",
(unsigned long) _p_my_id, (unsigned long) _p_reduction,
(unsigned long) rcv_type, (unsigned long) my_q->num);
fflush(_p_stdout);
}
#endif /* DEBUG */
MENTER(my_q->m);
switch (rcv_type)
{
case RCV_NOBLOCK:
if (my_q->num == 0)
{
#ifdef DEBUG
if (ParDebug(9))
{
fprintf(_p_stdout,
"(%lu,%lu) _p_msg_receive: Non-blocking return\n",
(unsigned long) _p_my_id,
(unsigned long) _p_reduction);
fflush(_p_stdout);
}
#endif /* DEBUG */
MEXIT(my_q->m);
return (FALSE);
}
msg_buf = my_q->front;
my_q->front = msg_buf->next;
break;
case RCV_BLOCK:
while(my_q->num == 0)
MWAIT(my_q->m);
msg_buf = my_q->front;
my_q->front = msg_buf->next;
break;
case RCV_PARAMS:
case RCV_GAUGE:
last_num = 0;
last_buf = (msg_buf_header_t *) NULL;
while (!done)
{
while(my_q->num == last_num)
MWAIT(my_q->m);
last_num = my_q->num;
if (last_buf == (msg_buf_header_t *) NULL)
msg_buf = my_q->front;
else
msg_buf = last_buf->next;
while (msg_buf != (msg_buf_header_t *) NULL)
{
if ( (rcv_type == RCV_PARAMS && msg_buf->type == MSG_PARAMS)
|| (rcv_type == RCV_GAUGE && msg_buf->type == MSG_GAUGE)
|| msg_buf->type == MSG_INITIATE_EXIT
|| msg_buf->type == MSG_EXIT )
{
if (last_buf == (msg_buf_header_t *) NULL)
my_q->front = msg_buf->next;
else
last_buf->next = msg_buf->next;
if (my_q->back == msg_buf) /* reset queue back */
my_q->back = last_buf;
done = TRUE;
break;
}
else
{
last_buf = msg_buf;
msg_buf = msg_buf->next;
}
}
}
break;
case RCV_COLLECT:
last_num = 0;
last_buf = (msg_buf_header_t *) NULL;
while (!done)
{
while(my_q->num == last_num)
MWAIT(my_q->m);
last_num = my_q->num;
if (last_buf == (msg_buf_header_t *) NULL)
msg_buf = my_q->front;
else
msg_buf = last_buf->next;
while (msg_buf != (msg_buf_header_t *) NULL)
{
if ( msg_buf->type == MSG_READ
|| msg_buf->type == MSG_CANCEL
|| msg_buf->type == MSG_INITIATE_EXIT
|| msg_buf->type == MSG_EXIT)
{
/* Return these types of messages */
if (last_buf == (msg_buf_header_t *) NULL)
my_q->front = msg_buf->next;
else
last_buf->next = msg_buf->next;
if (my_q->back == msg_buf) /* reset queue back */
my_q->back = last_buf;
done = TRUE;
break;
}
else if (msg_buf->type == MSG_COLLECT)
{
/* Remove these messages from the queue */
if (last_buf == (msg_buf_header_t *) NULL)
my_q->front = msg_buf->next;
else
last_buf->next = msg_buf->next;
if (my_q->back == msg_buf) /* reset queue back */
my_q->back = last_buf;
tmp_msg_buf = msg_buf->next;
SH_FREE(msg_buf);
msg_buf = tmp_msg_buf;
}
else
{
/*
* MSG_DEFINE || MSG_VALUE || MSG_GAUGE -- so leave
* it on the queue an keep searching
*/
last_buf = msg_buf;
msg_buf = msg_buf->next;
}
}
}
break;
}
my_q->num--;
MEXIT(my_q->m);
*size = msg_buf->size;
*node = msg_buf->node;
*type = msg_buf->type;
/*
* Now copy the message onto the heap. If there is not enough room
* for it, then garbage collect.
* Assume that there will always be enough room for MSG_CANCEL message.
*/
if (*type != MSG_CANCEL)
{
TryGCWithSize(*size);
}
#ifdef PCN_ALIGN_DOUBLES
if (DoubleAligned(_p_heap_ptr))
_p_heap_ptr++;
#endif /* PCN_ALIGN_DOUBLES */
memcpy(_p_heap_ptr, (char *) msg_buf->user_buf, *size * CELL_SIZE);
SH_FREE(msg_buf);
#ifdef DEBUG
if (ParDebug(8))
{
fprintf(_p_stdout,
"(%lu,%lu) _p_msg_receive: Received from n %lu, s %lu, t %lu\n",
(unsigned long) _p_my_id, (unsigned long) _p_reduction,
(unsigned long) *node, (unsigned long) *size,
(unsigned long) *type);
fflush(_p_stdout);
}
#endif /* DEBUG */
return (TRUE);
} /* _p_msg_receive() */
/* These are the contents of the former NiCE NeXT User Group NeXTSTEP/OpenStep software archive, currently hosted by Netfuture.ch. */