ftp.nice.ch/pub/next/unix/developer/pcn.2.0.s.tar.gz#/src/runtime/sr_nx.c

This is sr_nx.c in view mode; [Download] [Up]

/*
 * PCN Abstract Machine Emulator
 * Authors:     Steve Tuecke and Ian Foster
 *              Argonne National Laboratory
 *
 * Please see the DISCLAIMER file in the top level directory of the
 * distribution regarding the provisions under which this software
 * is distributed.
 *
 * sr_nx.c  -  Send/Receive routines built on the Intel nX node operating
 *			system.  This supports the Intel iPSC/860 and
 *			the Intel Delta.
 *
 * #include "sr_doc.h"
 * See sr_doc.h for detailed documentation on Send/Receive modules.
 */

#include "pcn.h"

/* These are included from gauge.h *
#ifdef delta
#include <mesh.h>
#else
#include <cube.h>
#endif
*/

#define	DEFAULT_MSG_BUFFER_SIZE	1020
#define PCN_PID	0

typedef struct msg_buf_header_struct
{
    struct msg_buf_header_struct *	next;
    int_t				size;
    int_t				type;
    int_t				node;
    cell_t				user_buf[1];
} msg_buf_header_t;

#define HEADER_SIZE		4
#define MsgSize2Bytes(Size)	(((Size) + HEADER_SIZE) * CELL_SIZE)

static	msg_buf_header_t	*mq_front, *mq_back;


/* 
 * void _p_sr_get_argdesc(argdesc_t **argdescp, int *n_argdesc)
 */
void _p_sr_get_argdesc(argc, argv, argdescp, n_argdescp)
int argc;
char **argv;
argdesc_t **argdescp;
int *n_argdescp;
{
    *argdescp = (argdesc_t *) 0;
    *n_argdescp = 0;
} /* _p_sr_get_argdesc() */


/*
 * void sr_fatal_error(char *msg)
 */
static void sr_fatal_error(msg)
char *msg;
{
    if (_p_my_id == -1)		/* Node doesn't have an id yet */
    {
	fprintf(stderr,
		"Fatal error: Node ???: Failed to create workers: %s\n",
		msg);
    }
    else
    {
	fprintf(stderr,
		"Fatal error: Node %lu: Failed to create workers: %s\n",
		(unsigned long) _p_my_id, msg);
    }
    killcube(-1,-1);
    exit(1);
} /* sr_fatal_error() */


/*
 * void	_p_sr_init_node()
 */
void _p_sr_init_node(argc, argv)
int argc;
char *argv[];
{
    _p_nodes = numnodes();
    _p_host_id = 0;
    _p_my_id = mynode();
    if (_p_my_id == _p_host_id)
	_p_host = TRUE;
    else
	_p_host = FALSE;

    if(_p_nodes > MAX_NODES)
    {
	if (_p_host)
	{
	    char buf[256];
	    sprintf(buf, "Too many nodes: MAX_NODES = %d", MAX_NODES);
	    sr_fatal_error(buf);
	}
	else
	{
	    exit(1);
	}
    }

    /*
     * Do any other send/receive initialization stuff here...
     */
    _p_default_msg_buffer_size = DEFAULT_MSG_BUFFER_SIZE;
    mq_front = mq_back = (msg_buf_header_t *) NULL;

    /* Finish node initialization and run emulator */
    _p_init_node(argc, argv);
    
} /* _p_sr_init_node() */



/*
 * void _p_sr_node_initialized()
 */
void _p_sr_node_initialized()
{
#ifdef DONT_INCLUDE
    int msg = 42;
    int i;

    /* Test by sending something around the ring */
    if (_p_multiprocessing)
    {
	if (_p_host)
	{
	    fprintf(_p_stdout, "Host: Sending test\n");
	    i = (_p_my_id + 1) % _p_nodes;
	    fprintf(_p_stdout, "Host: Sending test to node %lu\n",
		    (unsigned long) i);
	    csend(0, &msg, 4, i, PCN_PID);
	    fprintf(_p_stdout, "Host: Sent    test to node %lu\n",
		    (unsigned long) i);
	    crecv(0, &msg, 4);
	    fprintf(_p_stdout, "Host: Received test (%lu)\n",
		    (unsigned long) msg);
	}
	
	else
	{
	    crecv(0, &msg, 4);
	    fprintf(_p_stdout, "(%lu) Received test (%lu)\n",
		    (unsigned long) _p_my_id, (unsigned long) msg);
	    i = (_p_my_id + 1) % _p_nodes;
	    fprintf(_p_stdout, "(%lu) Sending test to node %lu\n",
		    (unsigned long) _p_my_id, (unsigned long) i);
	    csend(0, &msg, 4, i, PCN_PID);
	    fprintf(_p_stdout, "(%lu) Sent    test to node %lu\n",
		    (unsigned long) _p_my_id, (unsigned long) i);
	}
	if (_p_host)
	    killcube(-1,-1);
	fprintf(_p_stdout, "Exiting\n");
	exit(0);
    }
#endif DONT_INCLUDE
} /* _p_sr_node_initialized() */


/*
 * void	_p_destroy_nodes()
 */
void _p_destroy_nodes()
{
}


/*
 * void _p_abort_nodes()
 */
void _p_abort_nodes()
{
    if (_p_host)
    {
	killcube(-1,-1);
    }
    else
    {
	csend(MSG_ABORT, 0, 0, _p_host_id, PCN_PID);
    }
} /* _p_abort_nodes() */



/*
 * cell_t *_p_alloc_msg_buffer(int size)
 */
cell_t *_p_alloc_msg_buffer(size)
int size;		/* In cells */
{
    int i;
    cell_t *msg_buf;

#ifdef DEBUG
    if (ParDebug(8))
    {
	fprintf(_p_stdout,
		"(%lu) _p_alloc_msg_buffer: Allocating %lu cells\n",
		(unsigned long) _p_my_id, (unsigned long) size);
	fflush(_p_stdout);
    }
#endif /* DEBUG */
    
    i = size * CELL_SIZE;

    if ((msg_buf = (cell_t *) malloc(i)) == (cell_t *) NULL)
	_p_fatal_error("Failed to allocate a message buffer");

#ifdef DEBUG
    if (ParDebug(8))
    {
	fprintf(_p_stdout,
		"(%lu) _p_alloc_msg_buffer: Allocated  %lu cells\n",
		(unsigned long) _p_my_id, (unsigned long) size);
	fflush(_p_stdout);
    }
#endif /* DEBUG */
    
    return (msg_buf);
    
} /* _p_alloc_msg_buffer() */


/*
 * void _p_msg_send(cell_t *buf, int node, int size, int type)
 */
void _p_msg_send(buf, node, size, type)
cell_t *buf;
int node;
int size;		/* in cells */
int type;
{
#ifdef DEBUG
    if (ParDebug(8))
    {
	fprintf(_p_stdout,
		"(%lu) _p_msg_send: Sending to n %lu, s %lu, t %lu\n",
		(unsigned long) _p_my_id, (unsigned long) node,
		(unsigned long) size, (unsigned long) type);
	fflush(_p_stdout);
    }
#endif /* DEBUG */
    
#ifdef DEBUG
    if (node == _p_my_id)
	_p_fatal_error("Internal error: Cannot send message to myself!");
#endif /* DEBUG */
    
    /*
     * If buf == NULL, then we have en empty message.  So just allocate
     * enough space for the message headers.
     */
    if (buf == (cell_t *) NULL)
	size = 0;

    csend(type, buf, size * CELL_SIZE, node, PCN_PID);

    if (buf != (cell_t *) NULL)
	free(buf);
    
#ifdef DEBUG
    if (ParDebug(8))
    {
	fprintf(_p_stdout, "(%lu) _p_msg_send: Sent\n",
		(unsigned long) _p_my_id);
	fflush(_p_stdout);
    }
#endif /* DEBUG */
    
} /* _p_send_message() */


/*
 * bool_t _p_msg_receive(int *node, int *size, int *type, int rcv_type)
 */
bool_t _p_msg_receive(node, size, type, rcv_type)
int *node;
int *size;
int *type;
int rcv_type;
{
    msg_buf_header_t *buf;
    bool_t done = FALSE;
    long in_size;

#ifdef DEBUG
    if (ParDebug(9))
    {
	fprintf(_p_stdout,
		"(%lu) _p_msg_receive: Receiving message, type %lu\n",
		(unsigned long) _p_my_id, (unsigned long) rcv_type);
	fflush(_p_stdout);
    }
#endif /* DEBUG */
    
    /* Take message off queue if appropriate */
    if (mq_front != (msg_buf_header_t *) NULL
	&& (rcv_type == RCV_NOBLOCK || rcv_type == RCV_BLOCK))
    {
	buf = mq_front;
	mq_front = mq_front->next;

	*size = buf->size;
	*node = buf->node;
	*type = buf->type;

	/*
	 * Now copy the message onto the heap.  If there is not enough room
	 *   for it, then garbage collect.
	 * Assume that there will always be enough room for MSG_CANCEL message.
	 */
	if (*type != MSG_CANCEL)
	{
	    TryGCWithSize(*size);
	}
#ifdef PCN_ALIGN_DOUBLES
	if (DoubleAligned(_p_heap_ptr))
	    _p_heap_ptr++;
#endif /* PCN_ALIGN_DOUBLES */	
	memcpy(_p_heap_ptr, buf->user_buf, *size * CELL_SIZE);
	
	free(buf);
    }

    /* else, get next message from port */
    else
    {
	switch (rcv_type)
	{
	case RCV_NOBLOCK:
	    if (!iprobe(-1))
	    {
#ifdef DEBUG
		if (ParDebug(9))
		{
		    fprintf(_p_stdout,
			    "(%lu) _p_msg_receive: Non-blocking return\n",
			    (unsigned long) _p_my_id);
		    fflush(_p_stdout);
		}
#endif /* DEBUG */
		return (FALSE);
	    }
	    goto RCV_THE_REST;
	    break;
	    
	case RCV_BLOCK:
	    cprobe(-1);
	RCV_THE_REST:
	    *node = infonode();
	    in_size = infocount();
	    *size = in_size / CELL_SIZE;
	    *type = infotype();
	    if (*type != MSG_CANCEL)
	    {
		TryGCWithSize(*size);
	    }
#ifdef PCN_ALIGN_DOUBLES
	    if (DoubleAligned(_p_heap_ptr))
		_p_heap_ptr++;
#endif /* PCN_ALIGN_DOUBLES */	

	    crecv(-1, _p_heap_ptr, in_size);
	    break;
	    
	case RCV_PARAMS:
	case RCV_GAUGE:
	    while (!done)
	    {
		cprobe(-1);
		*node = infonode();
		in_size = infocount();
		*size = in_size / CELL_SIZE;
		*type = infotype();
		if (   (rcv_type == RCV_PARAMS && *type == MSG_PARAMS)
		    || (rcv_type == RCV_GAUGE  && *type == MSG_GAUGE)
		    || *type == MSG_INITIATE_EXIT
		    || *type == MSG_EXIT
		    || *type == MSG_ABORT )
		{
		    crecv(-1, _p_heap_ptr, in_size);
		    done = TRUE;
		}
		else
		{
		    /* enqueue the message */
		    if((buf=(msg_buf_header_t *) malloc(MsgSize2Bytes(*size)))
		       == (msg_buf_header_t *) NULL)
			_p_malloc_error();
		    buf->node = *node;
		    buf->size = *size;
		    buf->type = *type;
		    crecv(-1, buf->user_buf, in_size);
		    
		    if (mq_front == (msg_buf_header_t *) NULL)
		    {
			mq_front = mq_back = buf;
			buf->next = (msg_buf_header_t *) NULL;
		    }
		    else
		    {
			mq_back->next = buf;
			mq_back = buf;
			buf->next = (msg_buf_header_t *) NULL;
		    }
		}
	    }
	    break;

	case RCV_COLLECT:
	    while (!done)
	    {
		cprobe(-1);
		*node = infonode();
		in_size = infocount();
		*size = in_size / CELL_SIZE;
		*type = infotype();
		switch(*type)
		{
		case MSG_READ:
		case MSG_CANCEL:
		case MSG_INITIATE_EXIT:
		case MSG_EXIT:
		case MSG_ABORT:
		    crecv(-1, _p_heap_ptr, in_size);
		    done = TRUE;
		    break;
		case MSG_COLLECT:
		    crecv(-1, _p_heap_ptr, in_size);
		    break;
		default:
		    /* MSG_DEFINE || MSG_VALUE || MSG_GAUGE -- so enqueue it */
		    if((buf=(msg_buf_header_t *) malloc(MsgSize2Bytes(*size)))
		       == (msg_buf_header_t *) NULL)
			_p_malloc_error();
		    buf->node = *node;
		    buf->size = *size;
		    buf->type = *type;
		    crecv(-1, buf->user_buf, in_size);
		    
		    if (mq_front == (msg_buf_header_t *) NULL)
		    {
			mq_front = mq_back = buf;
			buf->next = (msg_buf_header_t *) NULL;
		    }
		    else
		    {
			mq_back->next = buf;
			mq_back = buf;
			buf->next = (msg_buf_header_t *) NULL;
		    }
		    break;
		}
	    }
	    break;
	}
	
	if (*type == MSG_ABORT)
	    _p_fatal_error("Received an abort message");
    }
    
#ifdef DEBUG
    if (ParDebug(8))
    {
	fprintf(_p_stdout,
		"(%lu) _p_msg_receive: Received from n %lu, s %lu, t %lu\n",
		(unsigned long) _p_my_id, (unsigned long) *node,
		(unsigned long) *size, (unsigned long) *type);
	fflush(_p_stdout);
    }
#endif /* DEBUG */

    return (TRUE);
    
} /* _p_msg_receive() */

These are the contents of the former NiCE NeXT User Group NeXTSTEP/OpenStep software archive, currently hosted by Netfuture.ch.