This is sr_ce.c in view mode; [Download] [Up]
/*
* PCN Abstract Machine Emulator
* Authors: Steve Tuecke and Ian Foster
* Argonne National Laboratory
*
* Please see the DISCLAIMER file in the top level directory of the
* distribution regarding the provisions under which this software
* is distributed.
*
* sr_ce.c - Send/Receive routines built on the Cosmic Environment
*
* #include "sr_doc.h"
* See sr_doc.h for detailed documentation on Send/Receive modules.
*/
#include "pcn.h"
#include <cube/cubedef.h>
/*
 * Process id handed to the Cosmic Environment calls made in this file
 * (cosmic_init(), spawn() and xsend()).
 */
#define PCN_PID 42
/*
 * USEHOST selects the value _p_sr_init_node() stores in _p_usehost on the
 * multi-node path: 1 gives TRUE, any other value gives FALSE.  It defaults
 * to 1 unless it was already defined before this point.
 */
#ifndef USEHOST
#define USEHOST 1
#endif /* USEHOST */
/*
 * Map a PCN node number onto a Cosmic Environment node number: the PCN
 * host id (_p_host_id) maps to HOST, every other number maps to itself.
 *
 * The parameter and the whole expansion are parenthesized so that any
 * expression may be passed as N and the result may be used inside a larger
 * expression.  N is still evaluated twice when it is not the host id, so
 * do not pass an argument with side effects.
 */
#define pcnnode_to_cenode(N) ((N) == _p_host_id ? HOST : (N))
/*
 * Value _p_sr_init_node() stores in _p_default_msg_buffer_size.
 * NOTE(review): presumably a size in cells -- confirm the unit against
 * the users of _p_default_msg_buffer_size.
 */
#define DEFAULT_MSG_BUFFER_SIZE 1020
/*
 * Header that sits in front of every message buffer handled by this
 * module.  Callers are only given a pointer to user_buf; the size, type
 * and node fields are filled in just before a buffer is passed to xsend()
 * and are read back by _p_msg_receive().
 */
typedef struct msg_buf_header_struct
{
struct msg_buf_header_struct * next; /* link used by the local queue of set-aside messages */
int_t size; /* payload length, in cells */
int_t type; /* message type (a MSG_* value) */
int_t node; /* PCN id of the node that sent the message */
cell_t user_buf[1]; /* first cell of the payload; the payload extends past the struct */
} msg_buf_header_t;
/*
 * Number of cells the header is taken to occupy in front of user_buf.
 * NOTE(review): this relies on each of the four header fields being
 * exactly one cell wide -- confirm for every target platform.
 */
#define HEADER_SIZE 4
/* Bytes needed for a message carrying Size payload cells, header included. */
#define MsgSize2Bytes(Size) (((Size) + HEADER_SIZE) * CELL_SIZE)
/* Step back from a user_buf pointer to the message header that contains it. */
#define User2MsgBuf(Buf) ((msg_buf_header_t *) (((cell_t *) (Buf)) \
- HEADER_SIZE))
/*
 * Front and back of the singly linked queue of messages that
 * _p_msg_receive() took from the system but set aside for a later call.
 * An empty queue is represented by mq_front == NULL.
 */
static msg_buf_header_t *mq_front, *mq_back;
/*
 * Bound to the "uni" command line flag registered by _p_sr_get_argdesc()
 * in host builds.  When TRUE, _p_sr_init_node() configures a single node
 * and makes no Cosmic Environment calls.
 */
static bool_t run_uniprocessing;
/*
* void _p_sr_get_argdesc(argdesc_t **argdescp, int *n_argdesc)
*/
void _p_sr_get_argdesc(argc, argv, argdescp, n_argdescp)
int argc;
char **argv;
argdesc_t **argdescp;
int *n_argdescp;
{
#ifdef PCN_HOST
static argdesc_t argdesc[] = {
BOOL_ARG("uni", &run_uniprocessing, "run as a uniprocessing system"),
};
run_uniprocessing = FALSE;
*argdescp = argdesc;
*n_argdescp = sizeof(argdesc) / sizeof(argdesc[0]);
#endif /* PCN_HOST */
} /* _p_sr_get_argdesc() */
/*
 * void sr_fatal_error(char *msg)
 *
 * Report a fatal start-up error and terminate.  The message is written to
 * stderr (tagged with this node's id once one has been assigned, i.e. when
 * _p_my_id is not -1) and passed to print(); then ckill() is called and
 * the process exits with status 1.
 */
static void sr_fatal_error(msg)
char *msg;
{
    if (_p_my_id != -1)
    {
        fprintf(stderr,
                "Fatal error: Node %lu: Failed to create workers: %s\n",
                (unsigned long) _p_my_id, msg);
    }
    else
    {
        /* No id has been assigned to this node yet. */
        fprintf(stderr,
                "Fatal error: Node ???: Failed to create workers: %s\n",
                msg);
    }

    print("Fatal error: Failed to create workers: %s", msg);
    ckill(-1, -1, "");
    exit(1);
} /* sr_fatal_error() */
/*
 * void _p_sr_init_node()
 *
 * Send/receive level initialization; finishes by calling _p_init_node().
 *
 * Host builds (PCN_HOST):
 *   - With the "uni" flag set, configure a single node (id 0, acting as
 *     host) and call _p_init_node() without touching the Cosmic
 *     Environment.
 *   - Otherwise call cosmic_init(), spawn _p_exe_file and take PCN id
 *     nnodes(), so the host's id is one past the last node.  It is a
 *     fatal error if the spawn fails, if the total exceeds MAX_NODES, or
 *     if this process is not running as the CE host.
 *
 * Node builds (PCN_NODE): derive the host id and node count from nnodes()
 * and take mynode() as the local PCN id.
 *
 * Both multi-node paths then set _p_usehost from USEHOST, install the
 * default message buffer size and empty the set-aside message queue.
 */
void _p_sr_init_node()
{
#ifdef PCN_HOST
    if (run_uniprocessing)
    {
        _p_nodes = 1;
        _p_host_id = 0;
        _p_usehost = TRUE;
        _p_my_id = 0;
        _p_host = TRUE;

        /* Finish node initialization and run emulator */
        _p_init_node();
        return;
    }

    cosmic_init(HOST, PCN_PID);
    if (mynode() == HOST)
    {
        if (spawn(_p_exe_file, -1, PCN_PID, "") != 0)
        {
            sr_fatal_error("Failed spawn()");
        }

        /* The host is numbered after all of the nodes. */
        _p_host_id = nnodes();
        _p_nodes = _p_host_id + 1;
        _p_my_id = _p_host_id;
        _p_host = TRUE;

        if (_p_nodes > MAX_NODES)
        {
            char buf[256];

            sprintf(buf, "Too many nodes: MAX_NODES = %d", MAX_NODES);
            sr_fatal_error(buf);
        }
    }
    else
    {
        sr_fatal_error("This code is compiled as a CE host");
    }
#endif /* PCN_HOST */

#ifdef PCN_NODE
    _p_host_id = nnodes();
    _p_nodes = _p_host_id + 1;
    _p_my_id = mynode();
    _p_host = FALSE;
#endif /* PCN_NODE */

#if USEHOST == 1
    _p_usehost = TRUE;
#else
    _p_usehost = FALSE;
#endif /* USEHOST */

    /*
     * Do any other send/receive initialization stuff here...
     */
    _p_default_msg_buffer_size = DEFAULT_MSG_BUFFER_SIZE;
    mq_front = mq_back = (msg_buf_header_t *) NULL;

    /* Finish node initialization and run emulator */
    _p_init_node();
} /* _p_sr_init_node() */
/*
 * void _p_sr_node_initialized()
 *
 * Hook with no effect in normal builds: the whole body is compiled only
 * when DONT_INCLUDE is defined.
 *
 * The disabled body is a self test that passes one integer message around
 * the ring of nodes, starting and ending at the host, and then exits.
 *
 * NOTE(review): the disabled code passes int values to "%lu" conversions
 * of print(); confirm the argument conventions of print() before enabling.
 */
void _p_sr_node_initialized()
{
#ifdef DONT_INCLUDE
    int *msg;
    int i;

    /* Test by sending something around the ring */
    if (_p_host)
    {
        print("Host: Sending test");
        i = (_p_my_id+1) % _p_nodes;
        if ((msg = (int *) xmalloc(sizeof(int))) == (int *) NULL)
        {
            _p_fatal_error("Failed xmalloc()");
        }
        *msg = 42;
        print("Host: Sending test to node %lu", i);
        xsend(msg, pcnnode_to_cenode(i), PCN_PID);
        print("Host: Sent test to node %lu", i);
        msg = (int *) xrecvb();
        print("Host: Received test (%lu)", *msg);
        xfree(msg);
    }
    else
    {
        /* Forward the received buffer unchanged to the next node. */
        msg = (int *) xrecvb();
        print("(%lu) Received test (%lu)", _p_my_id, *msg);
        i = (_p_my_id+1) % _p_nodes;
        print("(%lu) Sending test to node %lu", _p_my_id, i);
        xsend(msg, pcnnode_to_cenode(i), PCN_PID);
        print("(%lu) Sent test to node %lu", _p_my_id, i);
    }

    if (_p_host)
        ckill(-1,-1,"");
    print("Exiting");
    exit(0);
#endif /* DONT_INCLUDE */
} /* _p_sr_node_initialized() */
/*
 * void _p_destroy_nodes()
 *
 * On the host: call ckill() and exit with status 0.
 * On any other node: do nothing and return to the caller.
 */
void _p_destroy_nodes()
{
    if (!_p_host)
    {
        return;
    }

    ckill(-1, -1, "");
    exit(0);
} /* _p_destroy_nodes() */
/*
 * void _p_abort_nodes()
 *
 * Abort the computation.
 *
 * The host calls ckill() directly.  Any other node sends an empty
 * MSG_ABORT message to the host; _p_msg_receive() raises a fatal error
 * when it takes such a message from the system.
 *
 * The abort message is allocated with room for the complete message
 * header (MsgSize2Bytes(0)), the same size _p_msg_send() uses for an
 * empty message, because the size, type and node fields are all written
 * below.  If the allocation fails the node calls ckill() itself and
 * returns without touching the NULL buffer.
 */
void _p_abort_nodes()
{
    msg_buf_header_t *msg_buf;

    if (_p_host)
    {
        ckill(-1, -1, "");
        return;
    }

    msg_buf = (msg_buf_header_t *) xmalloc(MsgSize2Bytes(0));
    if (msg_buf == (msg_buf_header_t *) NULL)
    {
        /* No buffer for the abort message: kill directly instead. */
        ckill(-1, -1, "");
        return;
    }

    msg_buf->size = 0;
    msg_buf->type = MSG_ABORT;
    msg_buf->node = _p_my_id;
    xsend(msg_buf, HOST, PCN_PID);
} /* _p_abort_nodes() */
/*
 * cell_t *_p_alloc_msg_buffer(int size)
 *
 * Allocate a message buffer with room for 'size' payload cells plus the
 * message header, and return a pointer to the payload area (user_buf),
 * not to the header.  _p_msg_send() recovers the header from this pointer
 * with User2MsgBuf().
 *
 * A failed allocation is reported through _p_fatal_error().
 */
cell_t *_p_alloc_msg_buffer(size)
int size;			/* In cells */
{
    int n_bytes;
    msg_buf_header_t *hdr;

#ifdef DEBUG
    if (ParDebug(8))
    {
        fprintf(_p_stdout,
                "(%lu) _p_alloc_msg_buffer: Allocating %lu cells\n",
                (unsigned long) _p_my_id, (unsigned long) size);
        fflush(_p_stdout);
    }
#endif /* DEBUG */

    /* Payload plus header, expressed in bytes. */
    n_bytes = MsgSize2Bytes(size);
    hdr = (msg_buf_header_t *) xmalloc(n_bytes);
    if (hdr == (msg_buf_header_t *) NULL)
    {
        _p_fatal_error("Failed to allocate a message buffer");
    }

#ifdef DEBUG
    if (ParDebug(8))
    {
        fprintf(_p_stdout,
                "(%lu) _p_alloc_msg_buffer: Allocated %lu cells\n",
                (unsigned long) _p_my_id, (unsigned long) size);
        fflush(_p_stdout);
    }
#endif /* DEBUG */

    return (&hdr->user_buf[0]);
} /* _p_alloc_msg_buffer() */
/*
 * void _p_msg_send(cell_t *buf, int node, int size, int type)
 *
 * Send a message of the given type to PCN node 'node'.
 *
 * buf  - payload pointer previously returned by _p_alloc_msg_buffer(), or
 *        NULL for a message without payload.
 * node - destination PCN node number; translated with pcnnode_to_cenode().
 * size - payload length in cells; forced to 0 when buf is NULL.
 * type - message type stored in the header.
 *
 * The header in front of buf is filled in with the size, the type and
 * this node's id, and the whole buffer is handed to xsend().  In DEBUG
 * builds, sending to oneself is reported as an internal error.
 */
void _p_msg_send(buf, node, size, type)
cell_t *buf;
int node;
int size;			/* in cells */
int type;
{
    msg_buf_header_t *hdr;

#ifdef DEBUG
    if (ParDebug(8))
    {
        fprintf(_p_stdout,
                "(%lu) _p_msg_send: Sending to n %lu, s %lu, t %lu\n",
                (unsigned long) _p_my_id, (unsigned long) node,
                (unsigned long) size, (unsigned long) type);
        fflush(_p_stdout);
    }
    if (node == _p_my_id)
        _p_fatal_error("Internal error: Cannot send message to myself!");
#endif /* DEBUG */

    if (buf != (cell_t *) NULL)
    {
        /* The header lives directly in front of the caller's payload. */
        hdr = User2MsgBuf(buf);
    }
    else
    {
        /* Empty message: only the header needs to be allocated. */
        hdr = (msg_buf_header_t *) xmalloc(MsgSize2Bytes(0));
        if (hdr == (msg_buf_header_t *) NULL)
        {
            _p_fatal_error("Failed to allocate a message buffer");
        }
        size = 0;
    }

    hdr->size = size;
    hdr->type = type;
    hdr->node = _p_my_id;
    xsend(hdr, pcnnode_to_cenode(node), PCN_PID);

#ifdef DEBUG
    if (ParDebug(9))
    {
        fprintf(_p_stdout, "(%lu) _p_msg_send: Sent\n",
                (unsigned long) _p_my_id);
        fflush(_p_stdout);
    }
#endif /* DEBUG */
} /* _p_msg_send() */
/*
 * void sr_mq_append(msg_buf_header_t *msg_buf)
 *
 * Append msg_buf to the back of the set-aside message queue
 * (mq_front / mq_back).
 */
static void sr_mq_append(msg_buf)
msg_buf_header_t *msg_buf;
{
    msg_buf->next = (msg_buf_header_t *) NULL;
    if (mq_front == (msg_buf_header_t *) NULL)
    {
        mq_front = msg_buf;
    }
    else
    {
        mq_back->next = msg_buf;
    }
    mq_back = msg_buf;
} /* sr_mq_append() */

/*
 * bool_t _p_msg_receive(int *node, int *size, int *type, int rcv_type)
 *
 * Receive one message and copy its payload to _p_heap_ptr.
 *
 * node, size, type - outputs: sender's PCN id, payload length in cells
 *                    and message type, taken from the message header.
 * rcv_type         - how to receive:
 *   RCV_NOBLOCK  take a set-aside message if there is one, otherwise poll
 *                the system with xrecv(); return FALSE if nothing is
 *                available.
 *   RCV_BLOCK    take a set-aside message if there is one, otherwise
 *                block in xrecvb().
 *   RCV_PARAMS   block until a MSG_PARAMS or MSG_EXIT message arrives;
 *                every other message is set aside on the queue.
 *   RCV_COLLECT  block until a MSG_ABORT, MSG_EXIT, MSG_READ or
 *                MSG_CANCEL message arrives; MSG_COLLECT messages are
 *                freed, MSG_DEFINE and MSG_VALUE messages are set aside,
 *                and any other type is a fatal error.
 *   Any other value is reported through _p_fatal_error().
 *
 * Returns TRUE when a message was delivered.  The message buffer is freed
 * before returning.  Apart from the optional alignment adjustment,
 * _p_heap_ptr is not advanced past the copied payload by this function.
 *
 * A MSG_ABORT message taken from the system is a fatal error.
 * NOTE(review): messages taken from the set-aside queue skip that check,
 * and RCV_PARAMS will set a MSG_ABORT aside -- confirm this is intended.
 */
bool_t _p_msg_receive(node, size, type, rcv_type)
int *node;
int *size;
int *type;
int rcv_type;
{
    msg_buf_header_t *msg_buf = (msg_buf_header_t *) NULL;
    bool_t done = FALSE;

#ifdef DEBUG
    if (ParDebug(9))
    {
        fprintf(_p_stdout,
                "(%lu) _p_msg_receive: Receiving message, type %lu\n",
                (unsigned long) _p_my_id, (unsigned long) rcv_type);
        fflush(_p_stdout);
    }
#endif /* DEBUG */

    /* Take message off queue if appropriate */
    if (mq_front != (msg_buf_header_t *) NULL
        && (rcv_type == RCV_NOBLOCK || rcv_type == RCV_BLOCK))
    {
        msg_buf = mq_front;
        mq_front = mq_front->next;
    }
    /* else, get next message from system */
    else
    {
        switch (rcv_type)
        {
        case RCV_NOBLOCK:
            if ((msg_buf = (msg_buf_header_t *) xrecv())
                == (msg_buf_header_t *) NULL)
            {
#ifdef DEBUG
                if (ParDebug(9))
                {
                    fprintf(_p_stdout,
                            "(%lu) _p_msg_receive: Non-blocking return\n",
                            (unsigned long) _p_my_id);
                    fflush(_p_stdout);
                }
#endif /* DEBUG */
                return (FALSE);
            }
            break;

        case RCV_BLOCK:
            if ((msg_buf = (msg_buf_header_t *) xrecvb())
                == (msg_buf_header_t *) NULL)
            {
                _p_fatal_error("Failed on blocking receive");
            }
            break;

        case RCV_PARAMS:
            for (done = FALSE; !done; )
            {
                if ((msg_buf = (msg_buf_header_t *) xrecvb())
                    == (msg_buf_header_t *) NULL)
                {
                    _p_fatal_error("Failed on blocking receive");
                }
                if (   msg_buf->type == MSG_PARAMS
                    || msg_buf->type == MSG_EXIT )
                {
                    done = TRUE;
                }
                else
                {
                    /* Not what we are waiting for: keep it for later. */
                    sr_mq_append(msg_buf);
                }
            }
            break;

        case RCV_COLLECT:
            while (!done)
            {
                if ((msg_buf = (msg_buf_header_t *) xrecvb())
                    == (msg_buf_header_t *) NULL)
                {
                    _p_fatal_error("Failed on blocking receive");
                }
                switch (msg_buf->type)
                {
                case MSG_ABORT:
                case MSG_EXIT:
                case MSG_READ:
                case MSG_CANCEL:
                    done = TRUE;
                    break;
                case MSG_COLLECT:
                    xfree(msg_buf);
                    break;
                case MSG_DEFINE:
                case MSG_VALUE:
                    sr_mq_append(msg_buf);
                    break;
                default:
                    _p_fatal_error("_p_msg_receive(): Illegal message type");
                    break;
                }
            }
            break;

        default:
            /*
             * Unknown receive type: no message was obtained, so msg_buf
             * must not be examined below.
             */
            _p_fatal_error("_p_msg_receive(): Illegal receive type");
            return (FALSE);
        }

        if (msg_buf->type == MSG_ABORT)
        {
            _p_fatal_error("Received an abort message");
        }
    }

    *size = msg_buf->size;
    *node = msg_buf->node;
    *type = msg_buf->type;

    /*
     * Now copy the message onto the heap.  If there is not enough room
     * for it, then garbage collect.
     * Assume that there will always be enough room for MSG_CANCEL message.
     */
    if (*type != MSG_CANCEL)
    {
        TryGCWithSize(*size);
    }

#ifdef PCN_ALIGN_DOUBLES
    if (DoubleAligned(_p_heap_ptr))
        _p_heap_ptr++;
#endif /* PCN_ALIGN_DOUBLES */

    memcpy(_p_heap_ptr, (char *) msg_buf->user_buf, *size * CELL_SIZE);

    xfree(msg_buf);

#ifdef DEBUG
    if (ParDebug(8))
    {
        fprintf(_p_stdout,
                "(%lu) _p_msg_receive: Received from n %lu, s %lu, t %lu\n",
                (unsigned long) _p_my_id, (unsigned long) *node,
                (unsigned long) *size, (unsigned long) *type);
        fflush(_p_stdout);
    }
#endif /* DEBUG */

    return (TRUE);
} /* _p_msg_receive() */
These are the contents of the former NiCE NeXT User Group NeXTSTEP/OpenStep software archive, currently hosted by Netfuture.ch.