This is sr_nx.c in view mode; [Download] [Up]
/*
* PCN Abstract Machine Emulator
* Authors: Steve Tuecke and Ian Foster
* Argonne National Laboratory
*
* Please see the DISCLAIMER file in the top level directory of the
* distribution regarding the provisions under which this software
* is distributed.
*
* sr_nx.c - Send/Receive routines built on the Intel nX node operating
* system. This supports the Intel iPSC/860 and
* the Intel Delta.
*
* #include "sr_doc.h"
* See sr_doc.h for detailed documentation on Send/Receive modules.
*/
#include "pcn.h"
/* These are included from gauge.h *
#ifdef delta
#include <mesh.h>
#else
#include <cube.h>
#endif
*/
#define DEFAULT_MSG_BUFFER_SIZE 1020
#define PCN_PID 0
typedef struct msg_buf_header_struct
{
struct msg_buf_header_struct * next;
int_t size;
int_t type;
int_t node;
cell_t user_buf[1];
} msg_buf_header_t;
#define HEADER_SIZE 4
#define MsgSize2Bytes(Size) (((Size) + HEADER_SIZE) * CELL_SIZE)
static msg_buf_header_t *mq_front, *mq_back;
/*
* void _p_sr_get_argdesc(argdesc_t **argdescp, int *n_argdesc)
*/
void _p_sr_get_argdesc(argc, argv, argdescp, n_argdescp)
int argc;
char **argv;
argdesc_t **argdescp;
int *n_argdescp;
{
*argdescp = (argdesc_t *) 0;
*n_argdescp = 0;
} /* _p_sr_get_argdesc() */
/*
* void sr_fatal_error(char *msg)
*/
static void sr_fatal_error(msg)
char *msg;
{
if (_p_my_id == -1) /* Node doesn't have an id yet */
{
fprintf(stderr,
"Fatal error: Node ???: Failed to create workers: %s\n",
msg);
}
else
{
fprintf(stderr,
"Fatal error: Node %lu: Failed to create workers: %s\n",
(unsigned long) _p_my_id, msg);
}
killcube(-1,-1);
exit(1);
} /* sr_fatal_error() */
/*
* void _p_sr_init_node()
*/
void _p_sr_init_node(argc, argv)
int argc;
char *argv[];
{
_p_nodes = numnodes();
_p_host_id = 0;
_p_my_id = mynode();
if (_p_my_id == _p_host_id)
_p_host = TRUE;
else
_p_host = FALSE;
if(_p_nodes > MAX_NODES)
{
if (_p_host)
{
char buf[256];
sprintf(buf, "Too many nodes: MAX_NODES = %d", MAX_NODES);
sr_fatal_error(buf);
}
else
{
exit(1);
}
}
/*
* Do any other send/receive initialization stuff here...
*/
_p_default_msg_buffer_size = DEFAULT_MSG_BUFFER_SIZE;
mq_front = mq_back = (msg_buf_header_t *) NULL;
/* Finish node initialization and run emulator */
_p_init_node(argc, argv);
} /* _p_sr_init_node() */
/*
* void _p_sr_node_initialized()
*/
void _p_sr_node_initialized()
{
#ifdef DONT_INCLUDE
int msg = 42;
int i;
/* Test by sending something around the ring */
if (_p_multiprocessing)
{
if (_p_host)
{
fprintf(_p_stdout, "Host: Sending test\n");
i = (_p_my_id + 1) % _p_nodes;
fprintf(_p_stdout, "Host: Sending test to node %lu\n",
(unsigned long) i);
csend(0, &msg, 4, i, PCN_PID);
fprintf(_p_stdout, "Host: Sent test to node %lu\n",
(unsigned long) i);
crecv(0, &msg, 4);
fprintf(_p_stdout, "Host: Received test (%lu)\n",
(unsigned long) msg);
}
else
{
crecv(0, &msg, 4);
fprintf(_p_stdout, "(%lu) Received test (%lu)\n",
(unsigned long) _p_my_id, (unsigned long) msg);
i = (_p_my_id + 1) % _p_nodes;
fprintf(_p_stdout, "(%lu) Sending test to node %lu\n",
(unsigned long) _p_my_id, (unsigned long) i);
csend(0, &msg, 4, i, PCN_PID);
fprintf(_p_stdout, "(%lu) Sent test to node %lu\n",
(unsigned long) _p_my_id, (unsigned long) i);
}
if (_p_host)
killcube(-1,-1);
fprintf(_p_stdout, "Exiting\n");
exit(0);
}
#endif DONT_INCLUDE
} /* _p_sr_node_initialized() */
/*
* void _p_destroy_nodes()
*/
void _p_destroy_nodes()
{
}
/*
* void _p_abort_nodes()
*/
void _p_abort_nodes()
{
if (_p_host)
{
killcube(-1,-1);
}
else
{
csend(MSG_ABORT, 0, 0, _p_host_id, PCN_PID);
}
} /* _p_abort_nodes() */
/*
* cell_t *_p_alloc_msg_buffer(int size)
*/
cell_t *_p_alloc_msg_buffer(size)
int size; /* In cells */
{
int i;
cell_t *msg_buf;
#ifdef DEBUG
if (ParDebug(8))
{
fprintf(_p_stdout,
"(%lu) _p_alloc_msg_buffer: Allocating %lu cells\n",
(unsigned long) _p_my_id, (unsigned long) size);
fflush(_p_stdout);
}
#endif /* DEBUG */
i = size * CELL_SIZE;
if ((msg_buf = (cell_t *) malloc(i)) == (cell_t *) NULL)
_p_fatal_error("Failed to allocate a message buffer");
#ifdef DEBUG
if (ParDebug(8))
{
fprintf(_p_stdout,
"(%lu) _p_alloc_msg_buffer: Allocated %lu cells\n",
(unsigned long) _p_my_id, (unsigned long) size);
fflush(_p_stdout);
}
#endif /* DEBUG */
return (msg_buf);
} /* _p_alloc_msg_buffer() */
/*
* void _p_msg_send(cell_t *buf, int node, int size, int type)
*/
void _p_msg_send(buf, node, size, type)
cell_t *buf;
int node;
int size; /* in cells */
int type;
{
#ifdef DEBUG
if (ParDebug(8))
{
fprintf(_p_stdout,
"(%lu) _p_msg_send: Sending to n %lu, s %lu, t %lu\n",
(unsigned long) _p_my_id, (unsigned long) node,
(unsigned long) size, (unsigned long) type);
fflush(_p_stdout);
}
#endif /* DEBUG */
#ifdef DEBUG
if (node == _p_my_id)
_p_fatal_error("Internal error: Cannot send message to myself!");
#endif /* DEBUG */
/*
* If buf == NULL, then we have en empty message. So just allocate
* enough space for the message headers.
*/
if (buf == (cell_t *) NULL)
size = 0;
csend(type, buf, size * CELL_SIZE, node, PCN_PID);
if (buf != (cell_t *) NULL)
free(buf);
#ifdef DEBUG
if (ParDebug(8))
{
fprintf(_p_stdout, "(%lu) _p_msg_send: Sent\n",
(unsigned long) _p_my_id);
fflush(_p_stdout);
}
#endif /* DEBUG */
} /* _p_send_message() */
/*
* bool_t _p_msg_receive(int *node, int *size, int *type, int rcv_type)
*/
bool_t _p_msg_receive(node, size, type, rcv_type)
int *node;
int *size;
int *type;
int rcv_type;
{
msg_buf_header_t *buf;
bool_t done = FALSE;
long in_size;
#ifdef DEBUG
if (ParDebug(9))
{
fprintf(_p_stdout,
"(%lu) _p_msg_receive: Receiving message, type %lu\n",
(unsigned long) _p_my_id, (unsigned long) rcv_type);
fflush(_p_stdout);
}
#endif /* DEBUG */
/* Take message off queue if appropriate */
if (mq_front != (msg_buf_header_t *) NULL
&& (rcv_type == RCV_NOBLOCK || rcv_type == RCV_BLOCK))
{
buf = mq_front;
mq_front = mq_front->next;
*size = buf->size;
*node = buf->node;
*type = buf->type;
/*
* Now copy the message onto the heap. If there is not enough room
* for it, then garbage collect.
* Assume that there will always be enough room for MSG_CANCEL message.
*/
if (*type != MSG_CANCEL)
{
TryGCWithSize(*size);
}
#ifdef PCN_ALIGN_DOUBLES
if (DoubleAligned(_p_heap_ptr))
_p_heap_ptr++;
#endif /* PCN_ALIGN_DOUBLES */
memcpy(_p_heap_ptr, buf->user_buf, *size * CELL_SIZE);
free(buf);
}
/* else, get next message from port */
else
{
switch (rcv_type)
{
case RCV_NOBLOCK:
if (!iprobe(-1))
{
#ifdef DEBUG
if (ParDebug(9))
{
fprintf(_p_stdout,
"(%lu) _p_msg_receive: Non-blocking return\n",
(unsigned long) _p_my_id);
fflush(_p_stdout);
}
#endif /* DEBUG */
return (FALSE);
}
goto RCV_THE_REST;
break;
case RCV_BLOCK:
cprobe(-1);
RCV_THE_REST:
*node = infonode();
in_size = infocount();
*size = in_size / CELL_SIZE;
*type = infotype();
if (*type != MSG_CANCEL)
{
TryGCWithSize(*size);
}
#ifdef PCN_ALIGN_DOUBLES
if (DoubleAligned(_p_heap_ptr))
_p_heap_ptr++;
#endif /* PCN_ALIGN_DOUBLES */
crecv(-1, _p_heap_ptr, in_size);
break;
case RCV_PARAMS:
case RCV_GAUGE:
while (!done)
{
cprobe(-1);
*node = infonode();
in_size = infocount();
*size = in_size / CELL_SIZE;
*type = infotype();
if ( (rcv_type == RCV_PARAMS && *type == MSG_PARAMS)
|| (rcv_type == RCV_GAUGE && *type == MSG_GAUGE)
|| *type == MSG_INITIATE_EXIT
|| *type == MSG_EXIT
|| *type == MSG_ABORT )
{
crecv(-1, _p_heap_ptr, in_size);
done = TRUE;
}
else
{
/* enqueue the message */
if((buf=(msg_buf_header_t *) malloc(MsgSize2Bytes(*size)))
== (msg_buf_header_t *) NULL)
_p_malloc_error();
buf->node = *node;
buf->size = *size;
buf->type = *type;
crecv(-1, buf->user_buf, in_size);
if (mq_front == (msg_buf_header_t *) NULL)
{
mq_front = mq_back = buf;
buf->next = (msg_buf_header_t *) NULL;
}
else
{
mq_back->next = buf;
mq_back = buf;
buf->next = (msg_buf_header_t *) NULL;
}
}
}
break;
case RCV_COLLECT:
while (!done)
{
cprobe(-1);
*node = infonode();
in_size = infocount();
*size = in_size / CELL_SIZE;
*type = infotype();
switch(*type)
{
case MSG_READ:
case MSG_CANCEL:
case MSG_INITIATE_EXIT:
case MSG_EXIT:
case MSG_ABORT:
crecv(-1, _p_heap_ptr, in_size);
done = TRUE;
break;
case MSG_COLLECT:
crecv(-1, _p_heap_ptr, in_size);
break;
default:
/* MSG_DEFINE || MSG_VALUE || MSG_GAUGE -- so enqueue it */
if((buf=(msg_buf_header_t *) malloc(MsgSize2Bytes(*size)))
== (msg_buf_header_t *) NULL)
_p_malloc_error();
buf->node = *node;
buf->size = *size;
buf->type = *type;
crecv(-1, buf->user_buf, in_size);
if (mq_front == (msg_buf_header_t *) NULL)
{
mq_front = mq_back = buf;
buf->next = (msg_buf_header_t *) NULL;
}
else
{
mq_back->next = buf;
mq_back = buf;
buf->next = (msg_buf_header_t *) NULL;
}
break;
}
}
break;
}
if (*type == MSG_ABORT)
_p_fatal_error("Received an abort message");
}
#ifdef DEBUG
if (ParDebug(8))
{
fprintf(_p_stdout,
"(%lu) _p_msg_receive: Received from n %lu, s %lu, t %lu\n",
(unsigned long) _p_my_id, (unsigned long) *node,
(unsigned long) *size, (unsigned long) *type);
fflush(_p_stdout);
}
#endif /* DEBUG */
return (TRUE);
} /* _p_msg_receive() */
These are the contents of the former NiCE NeXT User Group NeXTSTEP/OpenStep software archive, currently hosted by Netfuture.ch.