ftp.nice.ch/pub/next/unix/developer/pcn.2.0.s.tar.gz#/src/runtime/sr_bsdipc.c

This is sr_bsdipc.c in view mode; [Download] [Up]

/*
 * PCN Abstract Machine Emulator
 * Authors:     Steve Tuecke and Ian Foster and Robert Olson
 *              Argonne National Laboratory
 *
 * Please see the DISCLAIMER file in the top level directory of the
 * distribution regarding the provisions under which this software
 * is distributed.
 *
 * sr_bsdipc.c  -  send/receive header using BSD Unix IPC (TCP sockets)
 */

#include <sys/types.h>
#include <netinet/in.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <netdb.h>
#include <sys/wait.h>
#include <netinet/tcp.h>
#include <sys/resource.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>
#include <sys/param.h>

#ifdef rs6000
#include <sys/select.h>
#endif

#include "pcn.h"

/* 
 * We need to determine if the Unix we're on uses "struct direct" or 
 * "struct dirent" for directory entries. So far, the NeXT is the only 
 * one that uses "struct direct".
 */
#if defined(next_os) || defined(symmetry)
#define USE_DIRECT
#else
#define USE_DIRENT
#endif

#if defined(USE_DIRECT)
#include <sys/dir.h>
#else
#if defined(USE_DIRENT)
#include <dirent.h>
#endif
#endif

#include <sys/file.h>

/* 
 * For the access() arguments
 */
#if !(defined(sequent) || defined(next_os))
#include <unistd.h>
#endif

/* 
 * The OPTIMAL_TCP_BLOCKSIZE is the block size that I've found 
 * produces the highest throughput (probably not incidentally, this is 
 * the number of bytes written at each write() to a nonblocking 
 * socket). 
 * 
 * TCP (on sparcstations at least) seems to have a problem with 
 * sending block sizes in range from 4097 to about 6000 bytes. 
 * Defining TCP_BUMP_HACK (named from the bump in the plot of 
 * elapsed time for send versus message size) forces messages in this 
 * region to be forced to a larger size, actually increasing the 
 * performance. The endpoints of the bump are defined as TCP_BUMP_MIN 
 * and TCP_BUMP_MAX, defaulting to OPTIMAL_TCP_BLOCKSIZE and 
 * OPTIMAL_TCP_BLOCKSIZE + 2048. 
 * 
 * These parameters will probably have to be tuned for different 
 * TCP/IP implementations.
 * 
 */
#ifndef OPTIMAL_TCP_BLOCKSIZE
#define OPTIMAL_TCP_BLOCKSIZE 4096
#endif

#ifdef TCP_BUMP_HACK
#ifndef TCP_BUMP_MIN
#define TCP_BUMP_MIN OPTIMAL_TCP_BLOCKSIZE
#endif
#ifndef TCP_BUMP_MAX
#define TCP_BUMP_MAX (TCP_BUMP_MIN + 2048)
#endif
#endif

#ifdef hpux
/* Empirically determined... */
#define NUM_FDS 60
#else
/* This works for BSD machines */
#define NUM_FDS getdtablesize()
#endif

#define SYSTEM_ERROR (sys_errlist[errno])

#define FATAL_SYSTEM_ERROR(s) { char buf[1024]; \
				    sprintf(buf, s, sys_errlist[errno]); \
				    _p_fatal_error(buf);  }

#define ME _p_my_id

extern int errno;
extern char *sys_errlist[];

extern char *getenv();
    
/* 
 * Process table:
 *     Contains host and port number (used during creation) for each node
 *     as well as the file descriptor for the connection to that node. 
 *     
 *     The fd for the entry for the current process is undefined.
 *     
 *     buf is a pointer to the current incomplete message.
 *     len is the number of bytes left to read.
 */

typedef struct _proc_table_entry
{
    int port;
    int fd;
    char host[256];
    char *buf, *bufend;
    int len;
} proc_table_entry;

static proc_table_entry proc_table[MAX_NODES];

/* 
 * Argument parsing stuff.
 * 
 * The variables are labeled _p even though they are static because I 
 * didn't want to change them all...
 * 
 * parallel_startup:
 * 	name of startup file
 * 	
 * parallel_info:
 * 	string of the form "host,port" which denotes the hostname of 
 * 	the PCN host node, and the port on which it is listening.
 * 	
 * dont_start_remotes:
 * 	if true, do not actually execute the system() to start remote 
 * 	nodes specfied in the startup file.
 * 	
 * network_debugging:
 * 	if true, print out copious debugging message for the low level 
 * 	send and receive.
 * 	
 * use_start_nodes:
 * 	attempt to connect to the host-control daemon to start worker nodes.
 * 	
 * hostconfig_file:
 * 	name of a file containing host-control configuration 
 * 	information. See the host-control documentation for more details.
 */

static	char	parallel_startup[MAX_PATH_LENGTH];
static	char	parallel_info[MAX_PATH_LENGTH];

static	bool_t	dont_start_remotes = 0;
#ifdef DEBUG
static	bool_t	network_debugging = 0;
#endif
static	bool_t	use_start_nodes = 0;
static	char	hostconfig_file[MAX_PATH_LENGTH];
static bool_t 	start_heterogeneous;
static char	host_startup_list[1000];

static char *exe_from_argv; /* For saving argv[0] to give to host-control */

/* 
 * my_port is the TCP port number for the port on which the host 
 * listens for connections from the worker nodes.
 * 
 * listen_fd is the associated file descriptor.
 */
static int my_port;
static int listen_fd;

static bool_t destroying_nodes = FALSE;

typedef struct msg_buf_header_struct
{
    struct msg_buf_header_struct *	next;
#ifdef TCP_BUMP_HACK
    int_t				alloc_size;
#endif
    int_t				size;
    int_t				type;
    int_t				node;
    cell_t				user_buf[1];
} msg_buf_header_t;


#ifdef TCP_BUMP_HACK
#define HEADER_SIZE 5
#else
#define HEADER_SIZE 4
#endif

#define MsgSize2Bytes(Size)	(((Size) + HEADER_SIZE) * CELL_SIZE)
#define	User2MsgBuf(Buf)	((msg_buf_header_t *) (((cell_t *) (Buf)) \
						       - HEADER_SIZE))
#define MsgBuf2User(Buf)      ((msg_buf_header_t *) (((cell_t *) (Buf)) \
						       + HEADER_SIZE))

static msg_buf_header_t *mq_front, *mq_back;


#ifdef STREAMS
/*
 * Each element of the stream_table contains a pointer to the front
 * of a stream array on the heap, and the index of this table entry
 * into that stream array.
 *
 * The sindex field of the stream_table is also used to maintain a
 * free list of stream_table entries, where first_free_stream_id
 * contains the index of the first free stream table entry.
 *
 * When there is a pending receive on a stream, the 'enabled' field is
 * set to TRUE.  Whenever a stream message arrives, _p_msg_receive()
 * needs to check this field to see if it can deliver the message.
 * If enabled==FALSE, then the message is queued on this stream's
 * message queue (mq_front, mq_back).
 *
 * A receiving side of a stream is marked closed when:
 *	- closed==TRUE
 *	- and, the message queue is empty
 */
static struct
{
    cell_t *		stream_array;
    int_t		sindex;
    bool_t		enabled;
    bool_t		closed;
    msg_buf_header_t *	mq_front;
    msg_buf_header_t *	mq_back;
} stream_table[MAX_STREAMS];
static int_t	max_streams = MAX_STREAMS;
static int_t	first_free_stream_id;
static void	handle_close_stream();
static void	handle_stream_receive();
static void	deliver_stream_msg();
#endif /* STREAMS */

/* 
 * A worker_ctl_msg_t is sent between the host and workers during 
 * system initialization. 
 * 
 * Message types:
 * 
 * WORKER_CTL_INIT: 
 * 	i1 = node id
 * 	i2 = number of nodes
 * 	Sent from host to worker
 * 	
 * WORKER_CTL_PORT
 * 	i1 = port number on which worker is listening
 * 	buf = hostname of worker
 * 	Sent from worker to host
 * 	
 * WORKER_CTL_PROCTBL_ENTRY
 * 	A process table entry.
 * 	i1 = node number
 * 	i2 = port on which node i1 is listening
 * 	buf = hostname for node i1
 * 	
 * WORKER_CTL_PROCTBL_END
 * 	Denotes the end of the process table.
 * 	
 * WORKER_CTL_START_CONNECTING
 * 	Sent from the host to a worker to instruct the worker to 
 * 	connect to all other workers with node numbers greater than 
 * 	its own.
 * 	
 * WORKER_CTL_CONNECTIONS_MADE
 * 	Sent from a worker to the host when it has finished making its 
 * 	connections. 
 * 	
 * WORKER_CTL_CONN_TEST
 * 	This is the first message sent on a new connection. It is used 
 * 	to test the connection.
 * 	
 * WORKER_CTL_CONN_ACK
 * 	Response to a WORKER_CTL_CONN_TEST message.
 * 	
 * WORKER_CTL_GO
 * 	Sent from the host to a worker to instruct the worker to begin 
 * 	executing PCN.
 * 	
 * WORKER_CTL_GO_ACK
 * 	Response to a WORKER_CTL_GO message.
 * 	
 * WORKER_CTL_CHECKIN
 * 	Sent from the host to a worker during the final node checking 
 * 	in _p_sr_node_initialized().
 * 	
 * WORKER_CTL_CHECKIN_ACK
 * 	Response to a WORKER_CTL_CHECKIN message.
 *
 * WORKER_CTL_EXEC_FAILED
 * 	Sent by host-control when it discovers that a node couldn't be 
 * 	started. 
 */

typedef struct worker_ctl_msg_struct
{
    int type;
    int i1, i2, i3;
    char buf[256];
} worker_ctl_msg_t;

#define WORKER_CTL_INIT	1
#define WORKER_CTL_PORT	2
#define WORKER_CTL_PROCTBL_ENTRY 	3
#define WORKER_CTL_PROCTBL_END   	4
#define WORKER_CTL_START_CONNECTING 	5
#define WORKER_CTL_CONNECTIONS_MADE 	6
#define WORKER_CTL_CONN_TEST		7
#define WORKER_CTL_CONN_ACK		8
#define WORKER_CTL_GO			9
#define WORKER_CTL_GO_ACK		10
#define WORKER_CTL_CHECKIN		11
#define WORKER_CTL_CHECKIN_ACK		12
#define WORKER_CTL_EXEC_FAILED		13

static void net_setup_anon_listener();
static int net_accept()	;
static int net_conn_to_listener();
static void error_check();
static int blocking_read();
static int blocking_write();
static void blocking_read_sr_errchk();
static void blocking_write_sr_errchk();
static void blocking_read_errchk();
static void blocking_write_errchk();
static void set_nonblocking();

static void sr_fatal_error();
static void sr_fatal_error_1();
static void start_child();
static void start_nodes();
static int start_nodes_with_daemon();
static int start_nodes_with_script();
static void start_nodes_from_list();
static void start_node_on_host();
static void read_line_from_fd();
static void write_line_to_fd();
static void find_host_daemon();
static void read_startup_file();
static void start_host();
static void process_connection();
static void start_worker();
static void set_mask_for_all_workers();
#ifdef DEBUG
static void dump_proc_table();
#endif /* DEBUG */
static void accept_connections();
static void send_proc_table();
static void receive_proc_table();
static void await_connections();
static void make_connections();
static void process_connection_from_worker();
static void process_mesg_from_node();
static bool_t process_mesg_from_host();
static int node_from_fd();
static void await_go();
static char *ident();

/* 
 * _p_sr_get_argdesc()
 */
void _p_sr_get_argdesc(argc, argv, argdescp, n_argdescp)
int argc;
char **argv;
argdesc_t **argdescp;
int *n_argdescp;
{
    static argdesc_t argdesc[] = {
	STRING_ARG("nodes", host_startup_list,
		   "List of machine names on which to run PCN nodes"),
	STRING_ARG("s", parallel_startup,
		   "system specific parallel startup file"),
	BOOL_ARG("start-nodes", &use_start_nodes,
		 "try to use the host-control daemon to start nodes"),
	INTEGER_ARG("n", &_p_nodes,
		    "number of nodes"),
	STRING_ARG("pi", parallel_info,
		   "parallel info for node processes -- you should never use this"),
	STRING_ARG("hostconfig", hostconfig_file,
		   "host configuration file"),
	BOOL_ARG("nostart", &dont_start_remotes,
		 "don't actually start remote nodes"),
	BOOL_ARG("het", &start_heterogeneous,
		 "allow a heterogeneous network"),
#ifdef DEBUG    
	BOOL_ARG("netdebug", &network_debugging,
		 "turn on network debugging\n"),
#endif
    };

    exe_from_argv = argv[0];
    
    parallel_startup[0] = '\0';
    parallel_info[0] = '\0';

    dont_start_remotes = FALSE;
#ifdef DEBUG
    network_debugging = FALSE;
#endif
    use_start_nodes = FALSE;
    start_heterogeneous = FALSE;
    hostconfig_file[0] = '\0';

    host_startup_list[0] = '\0';

    *argdescp = argdesc;
    *n_argdescp = sizeof(argdesc) / sizeof(argdesc[0]);
}


/*
 * _p_alloc_msg_buffer()
 * 
 * There is some trickery here to support the TCP_BUMP_HACK. If the 
 * message size falls in the bump region (TCP_BUMP_MIN < size < TCP_BUMP_MAX)
 * then actually allocate TCP_BUMP_MAX bytes and fill in the allocated 
 * size in msg_buf->alloc_size. Otherwise let msg_buf->alloc_size = -1.
 * 
 * Return:	A pointer to a message buffer with 'size' cells.
 */
cell_t *_p_alloc_msg_buffer(size)
int_t size;		/* In cells */
{
    int_t i;
    msg_buf_header_t *msg_buf;
#ifdef TCP_BUMP_HACK
    bool_t resize = FALSE;
#endif
    
#ifdef DEBUG
    if (ParDebug(8))
    {
	fprintf(_p_stdout,
		"(%lu,%lu) _p_alloc_msg_buffer: Allocating %lu cells\n",
		(unsigned long) _p_my_id, (unsigned long) _p_reduction,
		(unsigned long) size);
	fflush(_p_stdout);
    }
#endif /* DEBUG */

    i = MsgSize2Bytes(size);

#ifdef TCP_BUMP_HACK
    if (i > TCP_BUMP_MIN && i < TCP_BUMP_MAX)
    {
	/* Here's the bad area. Allocate TCP_BUMP_MAX bytes instead (and
	 * actually send them!
	 */

	i = TCP_BUMP_MAX;
	resize = TRUE;
    }
#endif

    if ((msg_buf = (msg_buf_header_t *) malloc((size_t) i)) == NULL)
	_p_malloc_error();

#ifdef TCP_BUMP_HACK    
    msg_buf->alloc_size = resize ? i : -1;
#endif

    msg_buf->size = size;
    
#ifdef DEBUG
    if (ParDebug(8))
    {
	fprintf(_p_stdout,
		"(%lu,%lu) _p_alloc_msg_buffer: Allocated %lu cells\n",
		(unsigned long) _p_my_id, (unsigned long) _p_reduction,
		(unsigned long) size);
	fflush(_p_stdout);
    }
#endif /* DEBUG */
    
    return (msg_buf->user_buf);
    
} /* _p_alloc_msg_buffer() */


#define FreeMsgBuf(Buf) free((char *) Buf)


/*
 * _p_abort_nodes()
 */
void _p_abort_nodes()
{
    int_t i;
    for (i = 0; i < _p_nodes; i++)
    {
	if (i != _p_my_id)
	{
	    close(proc_table[i].fd);
	}
    }
} /* _p_abort_nodes() */


static bool_t recv_one_mesg(rcv_buf, blocking)
msg_buf_header_t **rcv_buf;
bool_t blocking;
{
    fd_set read_fds;
    struct timeval tval, *tvalptr;
    int n, i, n_read;
    bool_t done, avail;
    static int next_proc = 0;
    proc_table_entry *ent;

#ifdef DEBUG
    if ((network_debugging && blocking) || (network_debugging > 1))
	printf("%lu: recv_one_mesg blocking=%lu\n",
	       (unsigned long) ME, (unsigned long) blocking);
#endif

    /* 
     * First, check if there are any messages sitting in the table 
     * already. If there are, pull one of them out and return it. The 
     * stuff waiting on the streams can wait.
     */
    avail = FALSE;
    for (i = 0; i < _p_nodes; i++)
    {
	ent = &(proc_table[i]);
	if (ent->buf != (char *) NULL && ent->len == 0)
	{
	    avail = TRUE;
	    break;
	}
    }

    done = avail;
    while (!done)
    {
	if (!blocking)
	{
	    tval.tv_sec = 0;
	    tval.tv_usec = 0;
	    tvalptr= &tval;
	}
	else
	    tvalptr = NULL;
	
	FD_ZERO(&read_fds);
	set_mask_for_all_workers(&read_fds);
	if (!_p_host && (proc_table[_p_host_id].fd != -1))
	    FD_SET(proc_table[_p_host_id].fd, &read_fds);

#ifdef DEBUG
	if (network_debugging > 1)
	{
	    char buf[1024], buf2[100];
	    buf[0] = 0;
	    for (i = 0; i < howmany(FD_SETSIZE, NFDBITS); i++)
	    {
		sprintf(buf2, "%0lx ", (unsigned long) read_fds.fds_bits[i]);
		strcat(buf, buf2);
	    }
	    printf("%lu: selecting mask=%s\n", ME, buf);
	}
#endif
	n = select(NUM_FDS, &read_fds, (fd_set *) NULL,
		   (fd_set *) NULL, tvalptr);

#ifdef DEBUG
	if (network_debugging > 1)
	    printf("%lu: select returns %d\n", ME, n);
#endif
	
	if (n < 0)
	{
	    char buf[300];

	    if (errno == EINTR)
		continue;
	    
	    sprintf(buf, "recv_one_mesg: select failed: %s",
		    sys_errlist[errno]);
	    _p_fatal_error(buf);
	}

	/* 
	 * Now we need to read something from each fd that has data 
	 * for reading. If something is available on a fd that isn't 
	 * already listed as having a read in progress, then there 
	 * must be a message header there.
	 */

	avail = FALSE;
	for (i = 0; i < _p_nodes; i++)
	{
	    ent = &(proc_table[i]);
	    if (ent->fd < 0)
		continue;
	    if (FD_ISSET(ent->fd, &read_fds))
	    {
#ifdef DEBUG
		if (network_debugging)
		    printf("%lu: fd is set: node=%d fd=%d\n", _p_my_id, i, ent->fd);
#endif
		if (ent->buf == (char *) NULL)
		{
		    msg_buf_header_t hdr;
		    int len;

#ifdef DEBUG
		    if (network_debugging)
			printf("%lu: reading header\n", _p_my_id);
#endif
		    blocking_read_errchk(ent->fd, (char *) &hdr,
					 HEADER_SIZE * CELL_SIZE,
					 "recv_one_mesg");
#ifdef DEBUG
		    if (network_debugging)
		    {
			fprintf(_p_stdout,"%lu: read header\n", ME);
			fprintf(_p_stdout,
				"%lu: Got header: fd=%d size=%lu type=%lu node=%lu\n",
				(unsigned long) _p_my_id,
				ent->fd, (unsigned long) hdr.size,
				(unsigned long) hdr.type,
				(unsigned long) hdr.node);
		    }
#endif
		    
#ifdef TCP_BUMP_HACK
		    if (hdr.alloc_size != -1)
			len = hdr.alloc_size;
		    else
#endif
			len = MsgSize2Bytes(hdr.size);

		    
		    if ((ent->buf = (char *) malloc(len)) == (char *) NULL)
			_p_malloc_error();
		    
		    ent->len = len - HEADER_SIZE * CELL_SIZE;
		    memcpy((char *) ent->buf, (char *) &hdr,
			   HEADER_SIZE * CELL_SIZE);
		    ent->bufend = ent->buf + HEADER_SIZE * CELL_SIZE;
		}

		if (ent->buf == (char *) NULL)
		    _p_fatal_error("recv_one_megs: NULL buffer");
		
		while (ent->len > 0)
		{
#ifdef DEBUG
		    if (network_debugging)
			printf("%lu: reading %d bytes\n", ME, ent->len);
#endif
		    n_read = read(ent->fd, ent->bufend, ent->len);
#ifdef DEBUG
		    if (network_debugging)
			printf("%lu: read returns %d\n", ME, n_read);
#endif
		    if (n_read < 0)
		    {
			if (errno == EINTR)
			    continue;
			else if (errno == EWOULDBLOCK)
			{
#ifdef DEBUG
			    if (network_debugging)
				printf("%lu: read would block\n", ME);
#endif
			    break;
			}
			else
			    FATAL_SYSTEM_ERROR("read failed: %s");
		    }
		    if (n_read == 0)
			break;
		    else
		    {
			ent->len -= n_read;
			ent->bufend += n_read;
		    }
		}
		if (ent->len == 0)
		{
#ifdef DEBUG
		    if (network_debugging)
			printf("%lu: Got complete message id=%d fd=%d\n",
			       ME, i, ent->fd);
#endif
		    avail = TRUE;
		}
	    }
	}
	if (!blocking)
	    done = TRUE;
	else
	    done = avail;
    }

#ifdef DEBUG
    if (network_debugging > 1)
	printf("%lu: end of loop: avail=%lu\n",
	       (unsigned long) ME, (unsigned long) avail);
#endif

    if (!avail && blocking)
	_p_fatal_error("Blocking recv_one_mesg didn't find a message");

    if (!avail)
	return FALSE;

    i = next_proc;
    while (1)
    {
	ent = &(proc_table[next_proc]);
	if (ent->buf != NULL && ent->len == 0)
	    break;
	next_proc = (next_proc + 1) % _p_nodes;
	if (next_proc == i)
	    _p_fatal_error("recv_one_mesg: avail was true, but didn't find a message");
    }

    *rcv_buf = (msg_buf_header_t *) ent->buf;
    
#ifdef DEBUG
    if (network_debugging)
	printf("%lu: Got message avail on entry %ld fd %d len %ld s=%lu t=%lu n=%lu\n",
	       (unsigned long) ME, (long) (ent - proc_table),
	       ent->fd, (long) (ent->bufend - ent->buf),
	       (unsigned long) (**rcv_buf).size,
	       (unsigned long) (**rcv_buf).type,
	       (unsigned long) (**rcv_buf).node);
#endif

    ent->buf = NULL;
    ent->len = 0;

    next_proc = (next_proc + 1) % _p_nodes;

    return TRUE;
}

bool_t _p_msg_receive(node, size, type, rcv_type)
int *node;
int *size;
int *type;
int rcv_type;
{
    bool_t done, done_inner;
    msg_buf_header_t *rcv_buf;
    bool_t gotit;
    bool_t read_from_queue = FALSE;

    for (done = FALSE; !done; )
    {
	if (mq_front != (msg_buf_header_t *) NULL &&
	    (rcv_type == RCV_NOBLOCK || rcv_type == RCV_BLOCK))
	{
	    read_from_queue = TRUE;
	    rcv_buf = mq_front;
	    mq_front = mq_front->next;
	}
	else
	{
	    switch (rcv_type)
	    {
	    case RCV_BLOCK:
	    case RCV_NOBLOCK:
		gotit = recv_one_mesg(&rcv_buf, rcv_type == RCV_BLOCK);
		if (!gotit)
		{
		    if (rcv_type == RCV_BLOCK)
			_p_fatal_error("_p_msg_receive: blocking read didn't return a message");
		    else
			return (FALSE);
		}
		break;
		
	    case RCV_PARAMS:
	    case RCV_GAUGE:
		for (done_inner = FALSE; !done_inner; )
		{
		    recv_one_mesg(&rcv_buf, TRUE);

		    if (   (rcv_type==RCV_PARAMS && rcv_buf->type==MSG_PARAMS)
			|| (rcv_type==RCV_GAUGE  && rcv_buf->type==MSG_GAUGE)
			|| rcv_buf->type == MSG_EXIT 
			|| rcv_buf->type == MSG_INITIATE_EXIT )
		    {
			done_inner = TRUE;
		    }
		    else
		    {
			if (mq_front == (msg_buf_header_t *) NULL)
			{
			    mq_front = mq_back = rcv_buf;
			    rcv_buf->next = (msg_buf_header_t *)  NULL;
			}
			else
			{
			    mq_back->next = rcv_buf;
			    mq_back = rcv_buf;
			    rcv_buf->next = (msg_buf_header_t *)  NULL;
			}
		    }
		}
		break;

	    case RCV_COLLECT:
		for (done_inner = FALSE; !done_inner; )
		{
		    recv_one_mesg(&rcv_buf, TRUE);
		    
		    switch (rcv_buf->type)
		    {
		    case MSG_READ:
		    case MSG_CANCEL:
		    case MSG_EXIT:
		    case MSG_INITIATE_EXIT:
			done_inner = TRUE;
			break;
		    case MSG_COLLECT:
			FreeMsgBuf(rcv_buf);
			break;
		    default:
			/*
			 * MSG_DEFINE, MSG_VALUE, MSG_CLOSE_STREAM,
			 * MSG_GAUGE, or a stream message
			 */
			if (mq_front == (msg_buf_header_t *) NULL)
			{
			    mq_front = mq_back = rcv_buf;
			    rcv_buf->next = (msg_buf_header_t *)  NULL;
			}
			else
			{
			    mq_back->next = rcv_buf;
			    mq_back = rcv_buf;
			    rcv_buf->next = (msg_buf_header_t *)  NULL;
			}
			break;
		    }
		}
		break;
	    }
	}

#ifdef STREAMS	
	if (rcv_buf->type == MSG_CLOSE_STREAM)
	{
	    handle_close_stream(rcv_buf);
	}
	else if (rcv_buf->type < LAST_STREAM_MSG_TYPE)
	{
	    handle_stream_receive(rcv_buf);
	}
	else	/* Normal non-stream related message */
	{
#endif /* STREAMS */	    
	    *size = rcv_buf->size;
	    *node = rcv_buf->node;
	    *type = rcv_buf->type;
	    
	    /*
	     * Now copy the message onto the heap.  If there is
	     * not enough room for it, then garbage collect.
	     * Assume that there will always be enough room
	     * for MSG_CANCEL message.
	     */
	    if (*type != MSG_CANCEL)
	    {
		TryGCWithSize(*size);
	    }
	    memcpy(_p_heap_ptr, rcv_buf->user_buf, *size * CELL_SIZE);
	    
	    FreeMsgBuf(rcv_buf);
	    
#ifdef DEBUG
	    if (ParDebug(8))
	    {
		fprintf(_p_stdout,
			"(%lu,%lu) _p_msg_receive: Received from n %lu, s %lu, t %lu\n",
			(unsigned long) _p_my_id, (unsigned long) _p_reduction,
			(unsigned long) *node, (unsigned long) *size,
			(unsigned long) *type);
		fflush(_p_stdout);
	    }
#endif /* DEBUG */
	    done = TRUE;
#ifdef STREAMS	    
	}
#endif /* STREAMS */
    }
    
    return (TRUE);
}

void _p_msg_send(buf, node, size, type)
cell_t *buf;
int node;
int size;
int type;
{
    bool_t gotit;
    msg_buf_header_t *send_buf, *rcv_buf;
    int_t allocated_size;
    int len;
    int n_left, n_sent;
    char *send_ptr;

#ifdef DEBUG    
    if (node == _p_my_id)
	_p_fatal_error("_p_msg_send(): Help! I can't send to myself!");
#endif /* DEBUG */
    

    if (buf == (cell_t *) NULL)
    {
	if ((send_buf = (msg_buf_header_t *) malloc(MsgSize2Bytes(0))) == NULL)
	    _p_malloc_error();
	allocated_size = 0;
	size = 0;
    }
    else
    {
	send_buf = User2MsgBuf(buf);
	allocated_size = send_buf->size;
    }

#ifdef DEBUG
    if (ParDebug(8))
    {
	fprintf(_p_stdout,
		"(%lu,%lu) _p_msg_send: Sending to n %lu, s %lu, t %lu\n",
		(unsigned long) _p_my_id, (unsigned long) _p_reduction,
		(unsigned long) node, (unsigned long) size,
		(unsigned long) type);
	fflush(_p_stdout);
    }
#endif /* DEBUG */

    send_buf->size = size;
    send_buf->type = type;
    send_buf->node = _p_my_id;

#ifdef TCP_BUMP_HACK
    len = send_buf->alloc_size == -1 ? MsgSize2Bytes(size) :
	send_buf->alloc_size;
#else
    len = MsgSize2Bytes(size);
#endif

    n_left = len;
    send_ptr = (char *) send_buf;

    while (n_left > 0)
    {
	while (1)
	{
	    gotit = recv_one_mesg(&rcv_buf, FALSE); /* Nonblocking read */
	    if (!gotit)
		break;
	    
	    if (mq_front == (msg_buf_header_t *) NULL)
	    {
		mq_front = mq_back = rcv_buf;
		rcv_buf->next = (msg_buf_header_t *) NULL;
	    }
	    else
	    {
		mq_back->next = rcv_buf;
		mq_back = rcv_buf;
		rcv_buf->next = (msg_buf_header_t *) NULL;
	    }
	}

	/*
	 * The HP machine has a problem with writing large blocks of data.
	 * According to the write(2) manual page, there should be
	 * no problem with this. But I was getting "Message too long"
	 * errors.
	 *
	 * Supposedly, one can write PIPSIZ (=8192) bytes to a pipe.
	 * Even that got me the Message too long errors. Hence we
	 * write in 4K packets.
	 */
#ifdef hpux
	len = n_left > 4096 ? 4096 : n_left;
#else
	len = n_left;
#endif
	    
	n_sent = write(proc_table[node].fd, send_ptr, len);
	if (n_sent < 0)
	{
	    if (errno == EWOULDBLOCK)
	    {
#ifdef DEBUG
		if (network_debugging)
		    fprintf(_p_stdout, "(%s) Send would block\n",
			    ident());
#endif		
		continue;
	    }
	    else if (errno == EPIPE)
	    {
		_p_fatal_error("_p_msg_send: Got eof on send");
	    }
	    else
	    {
		static char buf[100];
		sprintf(buf, "_p_send_msg: Write failed: %s",
			SYSTEM_ERROR);
		_p_fatal_error(buf);
	    }
	}
#ifdef DEBUG
	if (network_debugging)
	    fprintf(_p_stdout, "(%s) Sent %d/%d bytes to %d\n",
		    ident(), n_sent, n_left, node);
#endif	
	n_left -= n_sent;
	send_ptr += n_sent;
    }

    FreeMsgBuf(send_buf);
    
#ifdef DEBUG
    if (ParDebug(9))
    {
	fprintf(_p_stdout, "(%lu,%lu) _p_msg_send: Sent to %lu\n",
		(unsigned long) _p_my_id, (unsigned long) _p_reduction,
		(unsigned long) node);
	fflush(_p_stdout);
    }
#endif /* DEBUG */
    
}

void _p_destroy_nodes()
{
    destroying_nodes = TRUE;
}

void _p_sr_node_initialized()
{
    worker_ctl_msg_t msg;
    int i;

#ifdef DEBUG
    if (network_debugging)
	fprintf(_p_stdout,
		"(%s) Got sr_node_initialized, _p_multiprocessing=%lu\n",
		ident(), (unsigned long) _p_multiprocessing);
#endif

#if 1
    
    if (!_p_multiprocessing)
	return;

    if (_p_host)
    {
	for (i = 1; i < _p_nodes; i++)
	{
	    msg.type = WORKER_CTL_CHECKIN;
	    msg.i1 = i;

	    blocking_write_errchk(proc_table[i].fd, (char *) &msg, sizeof(msg),
				  "host checkin send");

	    blocking_read_errchk(proc_table[i].fd, (char *) &msg, sizeof(msg),
				 "_p_sr_node_initialized");
	    if (msg.type != WORKER_CTL_CHECKIN_ACK)
	    {
		_p_fatal_error("Wanted WORKER_CTL_CHECKIN_ACK in _p_sr_node_initialized");
	    }
	    if (msg.i1 != i)
	    {
		_p_fatal_error("_p_sr_node_initialized: msg.i1 != i");
	    }
#ifdef DEBUG
	    if (network_debugging)
		fprintf(_p_stdout, "(%s) Node %d checked in\n", ident(), i);
#endif	    
	}
    }
    else
    {
	blocking_read_errchk(proc_table[_p_host_id].fd, (char *) &msg,sizeof(msg),
			     "_p_sr_node_initialized");

	if (msg.type != WORKER_CTL_CHECKIN)
	{
	    _p_fatal_error("host sent wrong message type on checkin");
	}

	if (msg.i1 != _p_my_id)
	{
	    _p_fatal_error("_p_sr_node_initialized: msg.i1 != my_id");
	}

	msg.type = WORKER_CTL_CHECKIN_ACK;
	msg.i1 = _p_my_id;
	blocking_write_errchk(proc_table[_p_host_id].fd, (char *) &msg,
			      sizeof(msg),
			      "_p_sr_node_initialized");
#ifdef DEBUG
	if (network_debugging)
	    fprintf(_p_stdout, "(%s) Worker %lu done checking in\n",
		    ident(), (unsigned long) _p_my_id);
#endif
    }
#endif    
}

/*
 * int fork_nodes(int n)
 *
 * Fork off n worker nodes.
 */
static int fork_nodes(n)
int n;
{
    char host_name[128];
    int pid = 1;

    if (gethostname(host_name, sizeof(host_name)) < 0)
    {
	sr_fatal_error("gethostname failed");
    }
    
    for (; n > 0; n--)
    {
#ifdef DEBUG
	if (ParDebug(9))
	    printf("Forking off a worker node\n");
#endif /* DEBUG */
	if ((pid = fork()) < 0)			/* error */
	{
	    sr_fatal_error("Failed fork()");
	}
	else if (pid  == 0)			/* child */
	{ 
	    _p_host = FALSE;
	    if (parallel_info[0] == '\0')
		sprintf(parallel_info, "%s,%d", host_name, my_port);
	    close(listen_fd);
	    break;
	}
    }
    return (pid);
} /* fork_nodes() */

static void setup_host_port()
{
    net_setup_anon_listener(50, &my_port, &listen_fd);
}

void _p_sr_init_node(argc, argv)
int argc;
char *argv[];
{
    int i;
    
    /* Initialize the proc table with all fd=-1 */

    _p_stdout = stdout;

    for (i = 0; i < MAX_NODES; i++)
    {
	proc_table[i].buf = NULL;
	proc_table[i].len = 0;
	proc_table[i].fd = -1;
    }

    /*
     * _p_nodes will have a default value of -1.  Therefore, all of the
     * tests below should allow a negative value for _p_nodes, which
     * indicates that _p_nodes was not set on the command line with -n.
     */
    if(_p_nodes > MAX_NODES)
    {
	char buf[256];
	sprintf(buf, "Too many nodes: MAX_NODES = %d", MAX_NODES);
	sr_fatal_error(buf);
    }

    if (_p_nodes <= 1 && !use_start_nodes
	&& host_startup_list[0] == '\0' 
	&& parallel_startup[0] == '\0' && parallel_info[0] == '\0')
    {
	/*
	 * Do a uniprocessing run and return
	 */
	_p_my_id = _p_host_id = 0;
	_p_nodes = 1;
	_p_host = TRUE;
	/* Finish node initialization and run emulator */
	_p_init_node(argc, argv);
	return;
    }

    signal(SIGPIPE, SIG_IGN);

    if (parallel_info[0] != '\0')
    {
	/*
	 * This is a node.  The -pi flag was used to tell this node
	 * to start up _p_nodes nodes on this machine.
	 */
	_p_host = FALSE;
	fork_nodes(_p_nodes - 1);
    }
    else if (host_startup_list[0] != '\0')
    {
	/*
	 * This is the host.  The -nodes flag was used to specify
	 * a list of the machines on which to start nodes.
	 */
	_p_host = TRUE;
	setup_host_port();
	_p_nodes = 1;
	start_nodes_from_list(host_startup_list);
    }
    else if (parallel_startup[0] != '\0')
    {
	/*
	 * This is the host.  The -s flag was used to specify
	 * a startup file.
	 */
	_p_host = TRUE;

	setup_host_port();

	_p_nodes = 1;
	read_startup_file(parallel_startup);
    }
    else if (use_start_nodes)
    {
	/*
	 * This is the host.  The -start-nodes flag was used to
	 * specify that it should start up the nodes
	 * with the host-control stuff.  Use the value of _p_nodes (which
	 * is set with the -n flag) to control how many nodes will
	 * be created.
	 */
	char host_name[256];
	int num;
    
	if (gethostname(host_name, sizeof(host_name)) < 0)
	{
	    sr_fatal_error("gethostname failed");
	}

	_p_host = TRUE;

	setup_host_port();

	num = _p_nodes - 1;
	_p_nodes = 1;		/* Since start_nodes increments _p_nodes */
	start_nodes(num, host_name);
    }
    else if (_p_nodes > 1)
    {
	/*
	 * This is the host.  Only the -n flag was used, so it
	 * should fork off _p_nodes-1 workers.
	 */
	_p_host = TRUE;
	setup_host_port();

	fork_nodes(_p_nodes - 1);
    }
    else
    {
	sr_fatal_error("Illegal argument combination");
    }

    
    if (_p_host)
    {
	start_host();
    }
    else
    {
	start_worker();
    }

#ifdef DEBUG
    if (ParDebug(8))
    {
	fprintf(_p_stdout, "All right! just before init, proc table is...\n");
	dump_proc_table();
    }
#endif

    /* OPTIMAL_TCP_BLOCKSIZE is the magic packet size for TCP */
    _p_default_msg_buffer_size = OPTIMAL_TCP_BLOCKSIZE / CELL_SIZE - HEADER_SIZE;
    mq_front = mq_back = (msg_buf_header_t *) NULL;

    _p_init_node(argc, argv);
}

static void read_startup_file(file)
char *file;
{
    FILE *fp;
    char buf1[MAX_PATH_LENGTH], *b1;
    char host_name[256];
    int i, j;

    /* Get the host's host name */
    if (gethostname(host_name, sizeof(host_name)) == -1)
	sr_fatal_error("Failed to get host's machine name");


    if ((fp = fopen(file, "r")) == (FILE *) NULL)
	sr_fatal_error("Unable to open parallel startup file");
    
    /* Read in parallel startup file, one line at a time */
    for (i = 1; fgets(buf1, MAX_PATH_LENGTH - 1, fp) != (char *) NULL; i++)
    {
	
	if (buf1[0] == '%' || buf1[0] == '\n' || buf1[0] == ' '
	    || buf1[0] == '\t' || buf1[0] == '#')
	{
	    continue;
	}
	else if (strncmp(buf1, "fork", 4) == 0)
	{
	    /* Fork off the specified number of nodes */
	    if (sscanf(buf1+4, "%d", &j) != 1 || j < 1)
	    {
		sprintf(buf1, "Bad startup command, line %d", i);
		sr_fatal_error(buf1);
	    }
	    _p_nodes += j;
	    if (fork_nodes(j) == 0)	/* child */
		break;
	}
	else if (strncmp(buf1, "start-nodes", 11) == 0)
	    /* Use the start-nodes script to connect to the
	     * node-control daemons */
	{
	    int to_start;

	    b1 = buf1 + 11;

	    to_start = atoi(b1);

	    if (to_start == 0)
	    {
		sr_fatal_error("Invalid start-nodes");
	    }
	    start_nodes(to_start, host_name);
	}
	else if (strncmp(buf1, "exec", 4) == 0) /* use system() */
	{
	    j = strlen(buf1) - 1;
	    if (buf1[j] == '\n')
		buf1[j] = '\0';
	    /* Get the number of nodes to fork */
	    if (sscanf(buf1+4, "%d", &j) != 1 || j < 1)
	    {
		sprintf(buf1, "Bad startup command, line %d", i);
		sr_fatal_error(buf1);
	    }

	    /* Find the start of the command */
	    if ((b1 = strchr(buf1, ':')) == (char *) NULL)
	    {
		sprintf(buf1, "Bad startup command, line %d", i);
		sr_fatal_error(buf1);
	    }

	    b1++;
	    start_child(j, b1,host_name);
	}
	else
	{
	    sprintf(buf1, "Bad startup command, line %d", i);
	    sr_fatal_error(buf1);
	}
    }
    
    fclose(fp);
}

static void start_nodes_from_list(hosts)
char *hosts;
{
    char *hptr, *s, *host;
    int done;
    char pwd[MAX_PATH_LENGTH];
    char host_name[256];

    /* Get the host's host name */
    if (gethostname(host_name, sizeof(host_name)) == -1)
	sr_fatal_error("Failed to get host's machine name");

    if (getwd(pwd) == 0)
	sr_fatal_error_1("getwd failed: %s\n", pwd);

    for (hptr = hosts, done = 0; !done; )
    {
	host = hptr;
	s = strchr(hptr, ':');
	if (s)
	{
	    hptr = s + 1;
	    *s = 0;
	}
	else
	    done = 1;
	
	start_node_on_host(host, exe_from_argv, pwd, host_name);
    }
}

static void start_node_on_host(host, prog, pwd, myhost)
char *host, *prog, *pwd, *myhost;
{
    char cmd[1024];
#ifdef DEBUG
    if (network_debugging)
    {
	printf("Starting node on \"%s\" prog=\"%s\" pwd=\"%s\"\n",
	       host, prog, pwd);
    }
#endif

    sprintf(cmd, "rsh %s -n 'cd %s; %s -pcn -n 1 -pi %s,%d' &",
	    host, pwd, prog, myhost, my_port);
    
#ifdef DEBUG
    if (network_debugging)
    {
	printf("Command is <%s>\n", cmd);
    }
#endif

    if (dont_start_remotes)
    {
	printf("Would have run command \"%s\"\n", cmd);
    }
    else if (system(cmd) != 0)
    {
	sr_fatal_error("Failed system()");
    }
    _p_nodes++;
}

static void start_nodes(num, myhost)
int num;
char *myhost;
{
    char argbuf[512];
    char host_daemon_host[512];
    int host_daemon_port;
    int nstarted;

    /* Construct the $ARGS$ argument */
    sprintf(argbuf, " -pcn -n 1 -pi %s,%d",
	    myhost, my_port);

    find_host_daemon(host_daemon_host, &host_daemon_port);

    if (host_daemon_host[0] == '\0')
    {
	nstarted = start_nodes_with_script(num, argbuf);
    }
    else
    {
	/* Ok. A host daemon is running. Use it to start the nodes */
	nstarted = start_nodes_with_daemon(host_daemon_host, host_daemon_port,
					   num, argbuf);

#ifdef DEBUG
	if (network_debugging)
	    printf("started %d nodes with daemon\n", nstarted);
#endif
    }
    _p_nodes += nstarted;
}

static int start_nodes_with_script(num, args)
int num;
char *args;
{
    char cmdbuf[1024];

    sprintf(cmdbuf, "start-nodes %d %s", num, args);

#ifdef DEBUG
    if (network_debugging)
	printf("cmdbuf: '%s'\n", cmdbuf);
#endif

    if (dont_start_remotes)
    {
	printf("Would have run <%s>\n", cmdbuf);
    }
    else if (system(cmdbuf) != 0)
    {
	sr_fatal_error("failed system()");
    }
    /* Here we don't know how many were started, so just assume it worked */
    return num;
}

static int start_nodes_with_daemon(host, port, nnodes, args)
char *host, *args;
int port, nnodes;
{
    int sock, n;
    char buf[1000];
    char *bptr;
    char pwd[MAX_PATH_LENGTH+1];
    char hcfile[MAX_PATH_LENGTH + 1];

    if (getwd(pwd) == 0)
	sr_fatal_error_1("getwd failed: %s\n", pwd);

#ifdef DEBUG
    if (network_debugging)
	printf("connecting to %s/%d\n", host, port);
#endif
    sock = net_conn_to_listener(host, port);

    if (sock < 0)
    {
	sr_fatal_error_1("connect to host daemon failed: %s", SYSTEM_ERROR);
	return 0;
    }

    read_line_from_fd(sock, buf, sizeof(buf));

#ifdef DEBUG
    if (network_debugging)
	printf("got buf '%s'\n", buf);
#endif

    sprintf(buf, "PCN startup arch=%s het=%lu listen_port=%d\n",
	    PCN_ARCH, (unsigned long) start_heterogeneous, my_port);
#ifdef DEBUG
    if (network_debugging)
	printf("sending buf '%s'\n", buf);
#endif
    write_line_to_fd(sock, buf);

    /*
     * Read the ack
     */
    read_line_from_fd(sock, buf, sizeof(buf));
#ifdef DEBUG
		if (network_debugging)
		    printf("Got ack '%s'\n", buf);
#endif

    if (hostconfig_file[0] != '\0')
    {
	if (hostconfig_file[0] != '/')
	    sprintf(hcfile, "%s/%s", pwd, hostconfig_file);
	else
	    strcpy(hcfile, hostconfig_file);

	if (access(hcfile, R_OK) != 0)
	{
	    printf("hostconfig file %s not accessible\n", hcfile);
	    hcfile[0] = '\0';
	}
	else
	{
	    sprintf(buf, "cmd: load-config %s\n", hcfile);
	    write(sock, buf, strlen(buf));

	    while (1)
	    {
		read_line_from_fd(sock, buf, sizeof(buf));
#ifdef DEBUG
		if (network_debugging)
		    printf("Got line '%s'\n", buf);
#endif
		if (buf[0] == '.' && buf[1] == '\0')
		    break;
	    }
	}
    }

    strcpy(buf, "cmd: enabled-hosts\n");
    if (write(sock, buf, strlen(buf)) < 0)
    {
	perror("write");
	exit(1);
    }

    read_line_from_fd(sock, buf, sizeof(buf));

#ifdef DEBUG
    if (network_debugging)
	printf("got buf '%s'\n", buf);
#endif

    bptr = strchr(buf, ':');
    bptr++;
    n = atoi(bptr);
#ifdef DEBUG
    if (network_debugging)
	printf("got %d nodes enabled\n", n);
#endif

    if (n == 0)
    {
	printf("No nodes enabled in host-control\n");
	close(sock);
	return n;
    }

    /* Eat "." line */
    read_line_from_fd(sock, buf, sizeof(buf));

    /* If we've got a hostconfig file, then just execute a 
     * startnodes command on the host.
     * Otherwise, do a startnodes_withdir using our current directory 
     * as the directory and argv[0] as the pcn pathname
     */

    if (hcfile[0] == '\0')
    {
	sprintf(buf, "cmd: startnodes_withdir %s %s %d %s\n",
		pwd, exe_from_argv, nnodes, args);
    }
    else
    {
	sprintf(buf, "cmd: startnodes %d %s\n", nnodes, args);
    }
    
#ifdef DEBUG
    if (network_debugging)
	printf("starting nodes with '%s'\n", buf);
#endif
    write(sock, buf, strlen(buf));

    read_line_from_fd(sock, buf, sizeof(buf));
#ifdef DEBUG
    if (network_debugging)
	printf("got line '%s'\n", buf);
#endif    

    bptr = strchr(buf, ':');
    if (bptr == NULL)
    {
	close(sock);
	sr_fatal_error("Startup with host-control failed\n");
    }
    bptr++;
    while (isspace(*bptr))
	bptr++;
    n = atoi(bptr);
#ifdef DEBUG
    if (network_debugging)
	printf("Started %d nodes\n", n);
#endif

    close(sock);
    return n;
}

static void write_line_to_fd(sock, str)
int sock;
char *str;
{
    int len = strlen(str);

    if (write(sock, str, len) < 0)
    {
	perror("write");
    }
}

static void read_line_from_fd(sock, buf, len)
int sock, len;
char *buf;
{
    char *bptr;
    int n;
    
    bptr = buf;
    while (1)
    {
	n = read(sock, bptr, 1);
	if (n < 0)
	{
	    perror("read");
	    exit(1);
	}
	if (*bptr == '\n')
	    break;
	bptr++;
	if (bptr - buf >= len)
	    break;
    }
    *bptr = 0;
}

static void find_host_daemon(host, port)
char *host;
int *port;
{
    FILE *fp;
    char *home, *dir;
    char file[MAX_PATH_LENGTH+1];
    char hostfile[MAX_PATH_LENGTH+1];
    char buf[100];
    DIR *dirp;

#ifdef USE_DIRENT    
    struct dirent *ent;
#else
#if defined(USE_DIRECT)
    struct direct *ent;
#endif
#endif

    hostfile[0] = '\0';
    
    dir = getenv("PCN_CONTROL_DIR");

    if (dir == NULL)
    {
	home = getenv("HOME");

	if ((dir = (char *) malloc(MAX_PATH_LENGTH+1)) == (char *) NULL)
	    _p_malloc_error();
	sprintf(dir, "%s/.pcn_control/hosts", home);
#ifdef DEBUG
	if (network_debugging)
	    printf("got dir '%s'\n", dir);
#endif
    }

#if defined(USE_DIRECT)

    dirp = opendir(dir);
    if (dirp == NULL)
    {
#ifdef DEBUG
	if (network_debugging)
	    printf("couldn't open dir %s\n", dir);
#endif
	*host = 0;
	return;
    }

    while ((ent = readdir(dirp)) != NULL)
    {
	if (ent->d_name[0] != '.')
	    break;
    }

    closedir(dirp);

    if (ent)
    {
	strncpy(hostfile, ent->d_name, ent->d_namlen);
	hostfile[ent->d_namlen] = '\0';
    }
    
#else
#if defined(USE_DIRENT)

    dirp = opendir(dir);
    if (dirp == NULL)
    {
#ifdef DEBUG
	if (network_debugging)
	    printf("couldn't open dir %s\n", dir);
#endif
	*host = 0;
	return;
    }

    while ((ent = readdir(dirp)) != NULL)
    {
	if (ent->d_name[0] != '.')
	    break;
    }

    closedir(dirp);

    if (ent)
	strcpy(hostfile, ent->d_name);
	
#endif
#endif    

    if (hostfile[0] == '\0')
    {
#ifdef DEBUG
	printf("didn't find a host file\n");
#endif
	*host = '\0';
	return;
    }

    sprintf(file, "%s/%s", dir, hostfile);
    if ((fp = fopen(file, "r")) == NULL)
    {
#ifdef DEBUG
	if (network_debugging)
	    printf("couldn't open file '%s': %s\n", file, sys_errlist[errno]);
#endif
	*host = 0;
	return;
    }

    if (fgets(buf, sizeof(buf), fp) == NULL)
    {
#ifdef DEBUG
	if (network_debugging)
	    printf("couldn't get a line from file\n");
#endif
	*host = 0;
	return;
    }

    fclose(fp);

    *port = atoi(buf);
    if (*port == 0)
    {
#ifdef DEBUG
	if (network_debugging)
	    printf("invalid line '%s'\n", buf);
#endif
	*host = 0;
	return;
    }

    strcpy(host, ent->d_name);
}

static void start_child(num, rawcmd, host_name)
int num;
char *rawcmd;
char *host_name;
{
    char argbuf[512], cmdbuf[1024], buf1[512];
    char *rawcmdptr, *cmdptr;
    bool_t found;

    /* Construct the $ARGS$ argument */
    sprintf(argbuf, " -pcn -n %d -pi %s,%d",
	    num, host_name, my_port);

    cmdptr = cmdbuf;
    /* Substitute $ARGS$ in the command with appropriate stuff */
    for (rawcmdptr = rawcmd, cmdptr = cmdbuf, found = FALSE;
	 *rawcmdptr != '\0'; )
    {
	if (strncmp(rawcmdptr, "$ARGS$", 6) == 0)
	{
	    *cmdptr = '\0';
	    strcat(cmdbuf, argbuf);
	    strcat(cmdbuf, rawcmdptr + 6);
	    found = TRUE;
	    break;
	}
	else
	    *cmdptr++ = *rawcmdptr++;
    }
    if (!found)
    {
	sprintf(buf1, "Bad startup command -- no $ARGS$");
	sr_fatal_error(buf1);
    }
    strcat(cmdbuf, " &");

    _p_nodes += num;
    
    /* Now system() it off */
#ifdef DEBUG
    if (ParDebug(9))
	fprintf(_p_stdout, "Execing worker with: %s\n", cmdbuf);
#endif


    if (dont_start_remotes)
    {
	printf("Would have run command \"%s\"\n", cmdbuf);
    }
    else if (system(cmdbuf) != 0)
    {
	sr_fatal_error("Failed system()");
    }
}
    
/*
 * void sr_fatal_error(char *msg)
 *
 * Used by _p_sr_init_node() to deal with fatal errors during the
 * worker creation process.
 */
static void sr_fatal_error(msg)
char *msg;
{
    char *syserr = errno == 0 ? "" : sys_errlist[errno];
    if (_p_my_id == -1)		/* Node doesn't have an id yet */
    {
	fprintf(stderr,
		"Fatal error: Node ???: Failed to create workers: %s (%s)\n",
		msg, syserr);
    }
    else
    {
	fprintf(stderr,
		"Fatal error: Node %lu: Failed to create workers: %s (%s)\n",
		(unsigned long) _p_my_id, msg, syserr);
    }
    exit(1);
} /* sr_fatal_error() */

static void sr_fatal_error_1(msg, arg)
char *msg;
char *arg;
{
    char buf[1024];

    sprintf(buf, msg, arg);
    sr_fatal_error(buf);
}

/**************************************************************/

/* 
 * Start up the host node.  
 * 
 *    Create a port (my_port) and socket (listen_fd) to listen for 
 *    children's connections. 
 *    
 *    Select on listen_fd and on any child connection.
 *    
 *    When each children connects, send message to each telling it
 *    its node number and the ids of each of the other children.
 *   
 */

static void start_host()
{
    int i, rc;
    worker_ctl_msg_t msg;
    char buf[300];

#ifdef DEBUG
    if (network_debugging)
	fprintf(_p_stdout, "(%s) host accepting connections\n", ident());
#endif

    accept_connections();

#ifdef DEBUG
    if (network_debugging)
	fprintf(_p_stdout, "(%s) host accepted connections\n", ident());
#endif
    
#ifdef DEBUG
    if (network_debugging)
	dump_proc_table();
#endif

    _p_host_id =_p_my_id = 0;

    for (i = 1; i < _p_nodes; i++)
    {
#ifdef DEBUG
    if (network_debugging)
	fprintf(_p_stdout, "(%s) host sending proc table to %d\n", ident(),
		i);
#endif
	
	send_proc_table(i);
#ifdef DEBUG
    if (network_debugging)
	fprintf(_p_stdout, "(%s) host sent proc table to %d\n", ident(),
		i);
#endif
    }

    /* Tell everyone to connect.
     *
     * We even tell node _p_nodes-1 to connect even though he doesn't have
     * anyone to connect to since it makes the code simpler. It provides
     * another place to check with the node to ensure it is still there,
     * though.
     */

    for (i = 1; i < _p_nodes; i++)
    {
#ifdef DEBUG
	if (network_debugging)
	    fprintf(_p_stdout, "(%s) host telling %d to start connecting\n",
		    ident(), i);
#endif
	msg.type = WORKER_CTL_START_CONNECTING;
	blocking_write_sr_errchk(proc_table[i].fd, (char *) &msg, sizeof(msg),
				 "start_host");

	if ((rc = blocking_read(proc_table[i].fd, (char *) &msg, sizeof(msg))) < 0)
	{
	    sprintf(buf, "start_host: read failed from node %d: %s", i,
		    SYSTEM_ERROR);
	    sr_fatal_error(buf);
	}
	else if (rc == 0)
	{
	    sprintf(buf, "start_host: read failed from node %d: got eof", i);
	    sr_fatal_error(buf);
	}
#ifdef DEBUG
	if (network_debugging)
	    fprintf(_p_stdout, "(%s) host found connections from %d done\n",
		    ident(), i);
#endif

	if (msg.type != WORKER_CTL_CONNECTIONS_MADE)
	{
	    sprintf(buf, "start_host: wanted type %d, got %d\n",
		    WORKER_CTL_CONNECTIONS_MADE, msg.type);
	    sr_fatal_error(buf);
	}
    }

    /* Let's go! */

    for (i = 1; i < _p_nodes; i++)
    {
#ifdef DEBUG
	if (network_debugging)
	    fprintf(_p_stdout, "(%s) host tells worker %d go\n", ident(), i);
#endif
	msg.type = WORKER_CTL_GO;
	blocking_write_sr_errchk(proc_table[i].fd, (char *) &msg, sizeof(msg),
				 "start_host");

	if ((rc = blocking_read(proc_table[i].fd, (char *) &msg, sizeof(msg))) < 0)
	{
	    sprintf(buf, "start_host: read failed for go ack on node %d: %s",
		    i, SYSTEM_ERROR);
	    sr_fatal_error(buf);
	}
	else if (rc == 0)
	{
	    sprintf(buf, "start_host: read failed for go ack on node %d: eof",
		    i);
	    sr_fatal_error(buf);
	}

	if (msg.type != WORKER_CTL_GO_ACK)
	{
	    sprintf(buf, "start_host: didn't get ack, got %ld instead",
		    (long) msg.type);
	    sr_fatal_error(buf);
	}
#ifdef DEBUG
	if (network_debugging)
	    fprintf(_p_stdout, "(%s) host got GO_ACK from %d\n", ident(), i);
#endif
    }
}

static void accept_connections()
{
    fd_set read_fds;
    int n, fd, n_connected;
    worker_ctl_msg_t msg;

    /* Loop until all of the workers have connected back with the host */

    for (n_connected = 1;
	 n_connected < _p_nodes;
	 /* n_connected is incremented in process_connection() */  )
    {
	FD_ZERO(&read_fds);
	set_mask_for_all_workers(&read_fds);
	FD_SET(listen_fd, &read_fds);

	n = select(NUM_FDS, &read_fds, (fd_set *) NULL,
		   (fd_set *) NULL, (struct timeval *) NULL);

	if (n == 0)
	    continue;

	if (n < 0)
	{
	    char buf[300];
	    if (errno == EINTR)
		continue;
	    
	    sprintf(buf, "Fatal error in select: %s\n", sys_errlist[errno]);
	    sr_fatal_error(buf);
	}
	
	for (fd = 0; fd < NUM_FDS && n > 0; fd++)
	{
	    if (FD_ISSET(fd, &read_fds))
	    {
		n--;
		
		if (fd == listen_fd)
		    process_connection(&n_connected);
		else
		{
		    blocking_read_sr_errchk(fd, (char *) &msg, sizeof(msg),
					    "accept_connections");
		    sr_fatal_error("accept_connections: Didn't expect message from other worker");
		}
	    }
	}
    }
}

static void send_proc_table(node)
int node;
{
    int i, fd;
    worker_ctl_msg_t msg;

    fd = proc_table[node].fd;


    /* Don't need to send entries for node or host */
    for (i = 1; i < _p_nodes; i++)
    {
	if (i == node)
	    continue;

	msg.type = WORKER_CTL_PROCTBL_ENTRY;
	msg.i1 = i;
	msg.i2 = proc_table[i].port;
	strcpy(msg.buf, proc_table[i].host);
	blocking_write_sr_errchk(fd, (char *) &msg, sizeof(msg), "send_proc_table");
    }
    msg.type = WORKER_CTL_PROCTBL_END;
    blocking_write_sr_errchk(fd, (char *) &msg, sizeof(msg), "send_proc_table");
}	
    

/* 
 * Process a new connection on the host's port,
 * incrementing *n_conn with success
 */
static void process_connection(n_conn)
int *n_conn;
{
    int worker_fd;
    
    worker_ctl_msg_t msg;

    worker_fd = net_accept(listen_fd);

    if (worker_fd < 0)
    {
	sr_fatal_error("process_connection: accept failed");
    }

    set_nonblocking(worker_fd);

    proc_table[*n_conn].fd = worker_fd;

    msg.type = WORKER_CTL_INIT;
    msg.i1 = *n_conn;
    msg.i2 = _p_nodes;

#ifdef DEBUG
    if (network_debugging)
	fprintf(_p_stdout, "(%s) host sends CTL_INIT to %d\n", ident(),
		*n_conn);
#endif

    blocking_write_sr_errchk(worker_fd, (char *) &msg, sizeof(msg),
			     "process_connection");

    blocking_read_sr_errchk(worker_fd, (char *) &msg, sizeof(msg),
			    "process_connection");

    if (msg.type == WORKER_CTL_EXEC_FAILED)
    {
	char buf[300];
	errno = 0;
	sprintf(buf,
		"host-control says worker exec failed: %s",
		msg.buf);
	sr_fatal_error(buf);
    }
    else if (msg.type != WORKER_CTL_PORT)
    {
	char buf[300];
	sprintf(buf, "incorrect type in receive: %ld, wanted %d",
		(long) msg.type, WORKER_CTL_PORT);
	sr_fatal_error(buf);
    }

#ifdef DEBUG
    if (network_debugging)
	printf("Host got connection for node %d host %s port %d\n",
	       *n_conn, msg.buf, msg.i1);
#endif    

    proc_table[*n_conn].port = msg.i1;
    strcpy(proc_table[*n_conn].host, msg.buf);
    (*n_conn)++;
}

/* 
 * The worker connects back to the host's port, determined by parsing 
 * the argument passed in parallel_info
 * 
 */
static void start_worker()
{
    char *s, host[256];
    int port;
    int host_fd;
    worker_ctl_msg_t msg;
    
    if (parallel_info[0] == '\0')
	sr_fatal_error("Worker doesn't know his host");

    if ((s = strchr(parallel_info, ',')) == (char *) NULL)
    {
	sr_fatal_error("Failed to parse -pi argument");
    }

    *s = 0;

    strcpy(host, parallel_info);

    port = atoi(s + 1);

    host_fd = net_conn_to_listener(host, port);

    if (host_fd < 0)
    {
	sr_fatal_error("Error connecting to host");
    }

    set_nonblocking(host_fd);

    blocking_read_sr_errchk(host_fd, (char *) &msg, sizeof(msg), "start_worker");

    if (msg.type != WORKER_CTL_INIT)
    {
	sr_fatal_error("Incorrect message type from host");
    }
    
    _p_my_id = msg.i1;

    _p_nodes = msg.i2;

    _p_host_id = 0;

    proc_table[_p_host_id].fd = host_fd;
    proc_table[_p_host_id].port = port;
    strcpy(proc_table[_p_host_id].host, host);

#ifdef DEBUG
    if (network_debugging)
	fprintf(_p_stdout, "(%s) worker got CTL_INIT _p_nodes=%lu \n", ident(),
		(unsigned long) _p_nodes);
#endif

    net_setup_anon_listener(3, &my_port, &listen_fd);

    if (gethostname(msg.buf, sizeof(msg.buf)) == -1)
    {
	sr_fatal_error("gethostname failed");
    }

    msg.type = WORKER_CTL_PORT;
    msg.i1 = my_port;

    blocking_write_sr_errchk(host_fd, (char *) &msg, sizeof(msg), "start_worker");

#ifdef DEBUG
    if (network_debugging)
	fprintf(_p_stdout, "(%s) worker send port %d\n", ident(), my_port);
#endif
    
    proc_table[_p_my_id].fd = listen_fd;
    proc_table[_p_my_id].port = my_port;
    strcpy(proc_table[_p_my_id].host, msg.buf);

    receive_proc_table();
#ifdef DEBUG
    if (ParDebug(8))
    {
	fprintf(_p_stdout, "Worker %lu proc table:\n",
		(unsigned long) _p_my_id);
	dump_proc_table();
    }
#endif

#ifdef DEBUG
    if (network_debugging)
	fprintf(_p_stdout, "(%s) worker awaiting connections \n", ident());
#endif
    
    await_connections();
#ifdef DEBUG
    if (network_debugging)
	fprintf(_p_stdout, "(%s) worker making connections \n", ident());
#endif
    
    make_connections();
#ifdef DEBUG
    if (network_debugging)
	fprintf(_p_stdout, "(%s) worker awaiting go \n", ident());
#endif
    await_go();
#ifdef DEBUG
    if (network_debugging)
	fprintf(_p_stdout, "(%s) worker got go \n", ident());
#endif
}

/* Wait for a go message from the host */
static void await_go()
{
    worker_ctl_msg_t msg;

    blocking_read_sr_errchk(proc_table[_p_host_id].fd, (char *) &msg, sizeof(msg),
			    "await_go");

    if (msg.type != WORKER_CTL_GO)
    {
	sr_fatal_error("await_go: didn't get go");
    }

    msg.type = WORKER_CTL_GO_ACK;
    blocking_write_sr_errchk(proc_table[_p_host_id].fd, (char *) &msg, sizeof(msg),
			     "await_go");
}    

static void receive_proc_table()
{
    bool_t done;
    worker_ctl_msg_t msg;
    int host_fd;
    
    /* Just hang in a receive from the host. No one else should be
       sending to us yet, so they can just wait. */

    host_fd = proc_table[_p_host_id].fd;

    for (done = FALSE; !done; )
    {
	blocking_read_sr_errchk(host_fd, (char *) &msg, sizeof(msg),
				"receive_proc_table");

	switch (msg.type)
	{
	case WORKER_CTL_PROCTBL_ENTRY:
	    proc_table[msg.i1].port = msg.i2;
	    strcpy(proc_table[msg.i1].host, msg.buf);
	    break;

	case WORKER_CTL_PROCTBL_END:
	    done = TRUE;
	    break;

	default:
	    sr_fatal_error("Invalid type in receive_proc_table");
	    break;
	}
    }
}

/* Wait for
 *   - other nodes to connect to me
 *   - host to tell me to make connections
 *     (in which case I just return and my caller calls make_connections)
 */
static void await_connections()
{
    fd_set read_fds;
    bool_t done;
    int n, fd, node;

    for (done = FALSE; !done; )
    {
	FD_ZERO(&read_fds);
	set_mask_for_all_workers(&read_fds);
	FD_SET(listen_fd, &read_fds);
	FD_SET(proc_table[_p_host_id].fd, &read_fds);

	n = select(NUM_FDS, &read_fds, (fd_set *) NULL, (fd_set *) NULL,
		   (struct timeval *) NULL);

	if (n == 0)
	    continue;

	if (n < 0)
	{
	    char buf[300];

	    if (errno == EINTR)
		continue;
	    
	    sprintf(buf, "Fatal error in select: %s\n", sys_errlist[errno]);
	    sr_fatal_error(buf);
	}
	
	for (fd = 0; fd < NUM_FDS && n > 0; fd++)
	{
	    if (FD_ISSET(fd, &read_fds))
	    {
		n--;
		
		if (fd == listen_fd)
		    process_connection_from_worker();
		else
		{
		    node = node_from_fd(fd);
		    if (node == -1)
		    {
			sr_fatal_error_1("await_connections: can't find node for fd %d\n", fd);
		    }
			 
		    if (node == _p_host_id)
			done = process_mesg_from_host();
		    else
			process_mesg_from_node(node);
		}
	    }
	}
    }
}

static void process_connection_from_worker()
{
    int worker_fd;
    worker_ctl_msg_t msg;

    worker_fd = net_accept(listen_fd);
    if (worker_fd < 0)
	sr_fatal_error("process_connection_from_worker: accept failed");

    set_nonblocking(worker_fd);

    msg.type = WORKER_CTL_CONN_TEST;
    msg.i1 = _p_my_id;
    blocking_write_sr_errchk(worker_fd, (char *) &msg, sizeof(msg),
			     "process_connection_from_worker");

    blocking_read_sr_errchk(worker_fd, (char *) &msg, sizeof(msg),
			    "process_connection_from_worker");

    if (msg.type != WORKER_CTL_CONN_ACK)
	sr_fatal_error("process_connection_from_worker: wrong response from worker");

    if (msg.i2 != _p_my_id)
    {
	char buf[300];
	sprintf(buf, "process_connection_from_worker: node mismatch: me=%lu i1=%d i2=%d\n",
		(unsigned long) _p_my_id, msg.i1, msg.i2);
	sr_fatal_error(buf);
    }

    proc_table[msg.i1].fd = worker_fd;
}

/* Called from await_connections when host sends message.
 *
 * Return TRUE if await_connections should return.
 */
static bool_t process_mesg_from_host()
{
    bool_t rc = FALSE;
    worker_ctl_msg_t msg;
    char buf[300];

    
    blocking_read_sr_errchk(proc_table[_p_host_id].fd, (char *) &msg, sizeof(msg),
			    "process_mesg_from_host");

    switch(msg.type)
    {
    case WORKER_CTL_START_CONNECTING:
	rc = TRUE;
	break;

    default:
	sprintf(buf,
		"process_mesg_from_host: don't know how to handle message type %ld",
		(long) msg.type);
	sr_fatal_error(buf);
	break;
    }
    return rc;
}

static void process_mesg_from_node(node)
int node;
{
    char buf[300];
    int fd;
    worker_ctl_msg_t msg;

    fd = proc_table[node].fd;

    blocking_read_sr_errchk(fd, (char *) &msg, sizeof(msg), "process_mesg_from_node");

    switch (msg.type)
    {
    default:
	sprintf(buf,
		"process_mesg_from_node: don't know how to handle message type %ld",
		(long) msg.type);
	sr_fatal_error(buf);
	break;
    }
}	

static void make_connections()
{
    int node;
    int conn_fd;
    worker_ctl_msg_t msg;
    char buf[300];

#ifdef DEBUG
    if (network_debugging)
	fprintf(_p_stdout, "(%s) making connections\n", ident());
#endif
	

    for (node = _p_my_id + 1; node < _p_nodes; node++)
    {
	conn_fd = net_conn_to_listener(proc_table[node].host,
				       proc_table[node].port);

	if (conn_fd < 0)
	    sr_fatal_error("make_connections: connect failed");

	set_nonblocking(conn_fd);

#ifdef DEBUG
    if (network_debugging)
	fprintf(_p_stdout, "(%s) connected to fd %d\n", ident(), conn_fd);
#endif
	
	blocking_read_errchk(conn_fd, (char *) &msg, sizeof(msg), "make_connections");

	if (msg.type != WORKER_CTL_CONN_TEST)
	{
	    sprintf(buf, "make_connections: received mesg type %d, wanted %d",
		    msg.type, WORKER_CTL_CONN_TEST);
	    sr_fatal_error(buf);
	}

	if (node != msg.i1)
	{
	    sprintf(buf, "make_connections: node mismatch: node=%d msg.i1=%d",
		    node, msg.i1);
	    sr_fatal_error(buf);
	}
	    
	msg.type = WORKER_CTL_CONN_ACK;
	msg.i1 = _p_my_id;
	msg.i2 = node;

#ifdef DEBUG
    if (network_debugging)
	fprintf(_p_stdout, "(%s) got node %d\n", ident(), node);
#endif

	blocking_write_errchk(conn_fd, (char *) &msg, sizeof(msg), "make_connections");

	proc_table[node].fd = conn_fd;
    }
#ifdef DEBUG
    if (network_debugging)
	fprintf(_p_stdout, "(%s) done making connections, sending done\n", ident());
#endif
    msg.type = WORKER_CTL_CONNECTIONS_MADE;
    blocking_write_sr_errchk(proc_table[_p_host_id].fd, (char *) &msg, sizeof(msg),
			     "make_connections");
#ifdef DEBUG
    if (network_debugging)
	fprintf(_p_stdout, "(%s) done making connections, sent\n", ident());
#endif
}

	
static void set_mask_for_all_workers(mask)
fd_set *mask;
{
    proc_table_entry *ent;
    int i, fd;

    if (destroying_nodes)
	return;

    for (i = 1; i < _p_nodes; i++)
    {
	ent = &(proc_table[i]);
	fd = ent->fd;
	if (fd != -1 && i != _p_my_id &&
	    !(ent->buf != NULL && ent->len == 0))
	{
	    FD_SET(fd, mask);
	}
    }
}

#ifdef DEBUG
static void dump_proc_table()
{
    int_t i;
    proc_table_entry *ent;

    printf("%lu: Proc table: _p_nodes=%lu\n", (unsigned long) ME,
	   (unsigned long) _p_nodes);
    for (i = 0; i < _p_nodes; i++)
    {
	ent = &proc_table[i];
	printf("%lu: entry %lu: port=%d fd=%d host=%s buf=%lx bufend=%lx len=%d\n",
	       (unsigned long) ME, (unsigned long) i,
	       ent->port, ent->fd, ent->host,
	       (unsigned long) ent->buf, (unsigned long) ent->bufend,
	       ent->len);
    }
}
#endif /* DEBUG */

static int node_from_fd(fd)
int fd;
{
    int i;

    for (i = 0;i < _p_nodes; i++)
	if (proc_table[i].fd == fd)
	    return i;
#ifdef DEBUG
    /*
    fprintf(_p_stdout, "(%s) node_from_fd(%d) failed\n", ident(), fd);
    dump_proc_table();
    */
#endif
    return -1;
}

static void set_nonblocking(fd)
int fd;
{
#ifndef hpux
    int flags;
    
    if ((flags = fcntl(fd, F_GETFL, 0)) < 0)
    {
	perror("fcntl F_GETFL 1");
	exit(1);
    }

    flags |= FNDELAY;

    if (fcntl(fd, F_SETFL, flags) < 0)
    {
	perror("fcntl F_SETFL");
	exit(1);
    }
#else
    int one = 1;
    
    if (ioctl(fd, FIONBIO, &one) < 0)
    {
	perror("ioctl FIONBIO");
	exit(1);
    }
#endif

#ifdef F_SETFD
    if (fcntl(fd, F_SETFD, 1) < 0)
    {
	perror("fcntl F_SETFD");
        exit(1);
    }
#endif
}

struct FourInts
{
    int i1, i2, i3, i4, i5;
};

static int blocking_read(fd, buf, size)
int fd;
char *buf;
int size;
{
    int n, n_left;
    char *msgptr;

    for (n_left = size, msgptr = buf; n_left > 0; )
    {
	n = read(fd, msgptr, n_left);
	if (n == 0)
	{
#ifdef DEBUG
	    if (network_debugging)
		printf("(%s) read from %d got eof\n", ident(),
		       node_from_fd(fd));
#endif
	    return 0;
	}
	if (n < 0)
	{
#ifdef DEBUGFOO
	    if (network_debugging)
		fprintf(_p_stdout, "(%s) read from fd %d node %d returned error: %s\n",
			ident(), fd, node_from_fd(fd), SYSTEM_ERROR);
#endif	    
	    if (errno == EWOULDBLOCK)
		continue;
	    else if (errno == EPIPE)
		return 0;
	    else
		return n;
	}
#ifdef DEBUG
	if (network_debugging)
	    fprintf(_p_stdout, "(%s) read %d/%d bytes from node %d\n",
		    ident(), n, n_left, node_from_fd(fd));
#endif
	n_left -= n;
	msgptr += n;
    }
#ifdef DEBUG    
{
    struct FourInts *fi = (struct FourInts *) buf;
    if (network_debugging)
	fprintf(_p_stdout, "(%s) read got header %d %d %d %d %d\n",
		ident(), fi->i1, fi->i2, fi->i3, fi->i4, fi->i5);
}
#endif
    return size;
}

static int blocking_write(fd, buf, size)
int fd;
char *buf;
int size;
{
    int n, n_left;
    char *msgptr;
#ifdef DEBUG    
{
    struct FourInts *fi = (struct FourInts *) buf;
    if (network_debugging)
	fprintf(_p_stdout, "(%s) write putting header %d %d %d %d %d\n",
		ident(), fi->i1, fi->i2, fi->i3, fi->i4, fi->i5);
}
#endif

    for (n_left = size, msgptr = buf; n_left > 0; )
    {
	n = write(fd, msgptr, n_left);
	/*
	if (n == 0)
	{
	    printf("write got eof\n");
	    return 0;
	}
	*/
	if (n < 0)
	{
	    if (errno == EWOULDBLOCK)
		continue;
	    else if (errno == EPIPE)
		return 0;
	    else
		return n;
	}
#ifdef DEBUG
	if (network_debugging)
	    fprintf(_p_stdout, "(%s) wrote %d/%d bytes to node %d\n",
		    ident(), n, n_left, node_from_fd(fd));
#endif
	n_left -= n;
	msgptr += n;
    }
#ifdef DEBUG    
{
    struct FourInts *fi = (struct FourInts *) buf;
    if (network_debugging)
	fprintf(_p_stdout, "(%s) write got header %d %d %d %d %d\n",
		ident(), fi->i1, fi->i2, fi->i3, fi->i4, fi->i5);
}
#endif
    return size;
}

static void blocking_write_errchk(fd, buf, size, func)
int fd;
char *buf;
int size;
char *func;
{
    int rc = blocking_write(fd, buf, size);
    static char msgbuf[1024];
    
    if (rc < 0)
    {
	sprintf(msgbuf, "%s: write failed: %s", func, SYSTEM_ERROR);
	_p_fatal_error(msgbuf);
    }
    else if (rc == 0)
    {
	sprintf(msgbuf, "%s: eof on write\n", func);
	_p_fatal_error(msgbuf);
    }
}

static void blocking_read_errchk(fd, buf, size, func)
int fd;
char *buf;
int size;
char *func;
{
    int rc = blocking_read(fd, buf, size);
    static char msgbuf[1024];
    
    if (rc < 0)
    {
	sprintf(msgbuf, "%s: read failed: %s", func, SYSTEM_ERROR);
	_p_fatal_error(msgbuf);
    }
    else if (rc == 0)
    {
	sprintf(msgbuf, "%s: eof on read -- some other node probably aborted\n", func);
	_p_fatal_error(msgbuf);
    }
}

static void blocking_write_sr_errchk(fd, buf, size, func)
int fd;
char *buf;
int size;
char *func;
{
    int rc = blocking_write(fd, buf, size);
    static char msgbuf[1024];
    
    if (rc < 0)
    {
	sprintf(msgbuf, "%s: write failed: %s", func, SYSTEM_ERROR);
	sr_fatal_error(msgbuf);
    }
    else if (rc == 0)
    {
	sprintf(msgbuf, "%s: eof on write\n", func);
	sr_fatal_error(msgbuf);
    }
}

static void blocking_read_sr_errchk(fd, buf, size, func)
int fd;
char *buf;
int size;
char *func;
{
    int rc = blocking_read(fd, buf, size);
    static char msgbuf[1024];
    
    if (rc < 0)
    {
	sprintf(msgbuf, "%s: read failed: %s", func, SYSTEM_ERROR);
	sr_fatal_error(msgbuf);
    }
    else if (rc == 0)
    {
	sprintf(msgbuf, "%s: eof on read\n", func);
	sr_fatal_error(msgbuf);
    }
}

static void net_setup_anon_listener(backlog, port, skt)	
int backlog;
int *port;
int *skt;
{
    int rc,sinlen;
    struct sockaddr_in sin;
    int optval = TRUE;

    *skt = socket(AF_INET, SOCK_STREAM, 0);
    error_check(*skt,"net_setup_anon_listener socket");

    rc = setsockopt(*skt,IPPROTO_TCP,TCP_NODELAY,(char *)&optval,sizeof(optval));
    if (rc < 0)
    {
	sr_fatal_error_1("setsockopt failed: %s", SYSTEM_ERROR);
    }

    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = INADDR_ANY;
    sin.sin_port = htons(0);

    sinlen = sizeof(sin);

    error_check(bind(*skt,(struct sockaddr *) &sin,sizeof(sin)),
		   "net_setup_anon_listener bind");


    error_check(listen(*skt, backlog), "net_setup_anon_listener listen");

    getsockname(*skt, (struct sockaddr *) &sin, &sinlen);
    *port = ntohs(sin.sin_port);
}

/*
  Accept a connection on socket skt and return fd of new connection.
 */
static int net_accept(skt)	
int skt;
{
struct sockaddr_in from;
int fromlen;
int skt2;
int gotit;

    fromlen = sizeof(from);
    gotit = 0;
    while (!gotit)
    {
	skt2 = accept(skt, (struct sockaddr *) &from, &fromlen);
	if (skt2 == -1)
	{
	    if (errno == EINTR)
		continue;
	    else
		error_check(skt2, "net_accept accept");
	}
	else
	    gotit = 1;
    }

    return(skt2);
}

static int net_conn_to_listener(hostname, port)	
char *hostname;
int port;
{
    int rc,s;
    struct sockaddr_in listener;
    struct hostent *hp;
    int optval = TRUE;

    hp = gethostbyname(hostname);
    if (hp == NULL)
    {
	sr_fatal_error_1("connect_to_listener: gethostbyname %s failed",
			 hostname);
    }

    ZeroOutMemory((char *) &listener, sizeof(listener));
    memcpy((char *) &listener.sin_addr, (char *) hp->h_addr, hp->h_length);
    listener.sin_family = hp->h_addrtype;
    listener.sin_port = htons(port);

    s = socket(AF_INET, SOCK_STREAM, 0);
    error_check(s, "net_conn_to_listener socket");
    error_check(setsockopt(s,IPPROTO_TCP,TCP_NODELAY,
			   (char *) &optval, sizeof(optval)),
		"net_conn_to_listener setsockopt");

    rc = connect(s,(struct sockaddr *) &listener, sizeof(listener));
    error_check(rc, "net_conn_to_listener connect");

    return(s);
}

static void error_check(val, str)	
int val;
char *str;
{
    char buf[200];
    if (val < 0)
    {
	sprintf(buf, "%s :%d: %s\n", str, val, sys_errlist[errno]);
	sr_fatal_error(buf);
    }
}

#ifdef DEBUG

static char *ident()
{
    static char buf[100];
    static char host[128];
    static int gotHost = 0;
    static int pid;

    if (!gotHost)
    {
	char *s;
	gethostname(host, sizeof(host));
	if ((s = strchr(host, '.')) !=NULL)
	    *s = 0;
	pid = getpid();
	gotHost = 1;
    }

    sprintf(buf, "%lu-%s-%d", (unsigned long) _p_my_id, host, pid);
    return buf;
}

#endif /* DEBUG */


#ifdef STREAMS

/*
 * void init_stream_table()
 *
 * Initialize the stream array by setting all the pointers to NULL.
 */
static void init_stream_table()
{
    int_t i;
    for (i = 0; i < max_streams; i++)
    {
	stream_table[i].stream_array = (cell_t *) NULL;
	stream_table[i].sindex = i + 1;
	stream_table[i].enabled = FALSE;
	stream_table[i].mq_front = (msg_buf_header_t *) NULL;
	stream_table[i].mq_back  = (msg_buf_header_t *) NULL;
    }
    first_free_stream_id = 0;
    stream_table[max_streams-1].sindex = -1;
}


/*
 * bool_t _p_alloc_stream(cell_t *stream_array, int_t sindex, int_t *id)
 *
 * Grab the first free table entry, and fill it in with the passed
 * stream pointer to the header of the stream array (stream_array) and the
 * index into the stream array (sindex).
 *
 * Return:	TRUE if a stream was succesfully allocated, FALSE otherwise
 *	  *id = the stream id (and integer)
 */
bool_t _p_alloc_stream(stream_array, sindex, id)
cell_t *stream_array;
int_t sindex;
int_t *id;
{
    *id = first_free_stream_id;
    if (first_free_stream_id == -1)
    {
	return (FALSE);
    }
    else
    {
	stream_table[*id].stream_array = stream_array;
	stream_table[*id].sindex = sindex;
	first_free_stream_id = stream_table[first_free_stream_id].sindex;
	return (TRUE);
    }
} /* _p_alloc_stream() */


/*
 * void _p_free_stream(int_t id)
 *
 * Free the stream with the passed stream id.
 */
void _p_free_stream(id)
int_t id;
{
    if (id < 0 || id >= max_streams)
	_p_fatal_error("_p_free_stream(): Invalid stream id");
    
#ifdef DEBUG
    if (stream_table[id].stream_array == (cell_t *) NULL)
    {
	fprintf(_p_stdout, "(%lu,%lu) Warning: _p_free_stream(): Freeing a NULL stream: %ld\n",
		(unsigned long) _p_my_id, (unsigned long) _p_reduction,
		(long) id);
    }
#endif /* DEBUG */    

    stream_table[id].stream_array = (cell_t *) NULL;
    stream_table[id].sindex = first_free_stream_id;
    first_free_stream_id = id;
} /* _p_free_stream() */


/*
 * void _p_update_stream(int_t id, cell_t *stream_array)
 *
 * Update the stream_table entry for id so that it points to the
 * passed stream array (stream_array).  This procedure is used by the
 * garbage collector to update the stream array location after it
 * moves the stream array on the heap during garbage collection.
 */
void _p_update_stream(id, stream_array)
int_t id;
cell_t *stream_array;
{
    if (id < 0 || id >= max_streams)
	_p_fatal_error("_p_update_stream(): Invalid stream id");
    
#ifdef DEBUG
    if (stream_table[id].stream_array == (cell_t *) NULL)
    {
	fprintf(_p_stdout, "(%lu,%lu) Warning: _p_update_stream(): Updating a NULL stream: %ld\n",
		(unsigned long) _p_my_id, (unsigned long) _p_reduction,
		(long) id);
    }
#endif /* DEBUG */    

    stream_table[id].stream_array = stream_array;
} /* _p_update_stream() */


#ifdef DEBUG
#define ValidateStreamData(ID) \
{ \
    if((ID) < 0 || (ID) >= max_streams) \
	_p_fatal_error("_p_lookup_stream*(): Invalid stream id"); \
    if(stream_table[(ID)].stream_array == (cell_t *) NULL) \
	_p_fatal_error("_p_lookup_stream*(): Could not find stream"); \
    if(((data_header_t *) (stream_table[(ID)].stream_array))->tag != STREAM_TAG) \
	_p_fatal_error("_p_lookup_stream*(): Expected a STREAM_TAG"); \
}
#else  /* DEBUG */
#define ValidateStreamData()
#endif /* DEBUG */

/*
 * stream_t *_p_lookup_stream(int_t id);
 *
 * Lookup the stream id in the stream_table.  Return the location on the
 * heap of the stream_t data structure that correspondes to the appropriate
 * index within the stream array.
 *
 * NOTE: The return value will not point to a tagged structure.  It
 * will point to the actual stream structure.
 */
stream_t *_p_lookup_stream(id)
int_t id;
{
    ValidateStreamData(id);
    return ((stream_t *) (stream_table[id].stream_array
			  + stream_table[id].sindex)     );
} /* _p_lookup_stream() */


/*
 * cell_t *_p_lookup_stream_header(int_t id);
 *
 * Lookup the stream id in the stream_table.  Return the location on the
 * heap of the header for the stream array that has this stream in it.
 */
cell_t *_p_lookup_stream_header(id)
int_t id;
{
    ValidateStreamData(id);
    return (stream_table[id].stream_array);
} /* _p_lookup_stream() */


/*
 * int_t _p_lookup_stream_index(int_t id);
 *
 * Lookup the stream id in the stream_table.  Return the index of this
 * stream in its stream array.
 */
int_t _p_lookup_stream_index(id)
int_t id;
{
    ValidateStreamData(id);
    return (stream_table[id].sindex);
} /* _p_lookup_stream() */


/*
 * bool_t _p_first_stream(int_t *id)
 *
 * This procedure is used in conjunction with _p_next_stream() to
 * iterate through all of the streams.  They can be used as follows:
 *
 *	bool_t cont_iter;
 *	int_t id;
 *	for (cont_iter = _p_first_stream(&id);
 *	     cont_iter;
 *	     cont_iter = _p_next_stream(&id)
 *	    )
 *	{
 *		Do_Whatever();
 *	}
 *
 * Return:	TRUE if there are any more streams, FALSE otherwise
 *	  *id = the first stream id
 */
bool_t _p_first_stream(id)
int_t *id;
{
    *id = -1;
    return (_p_next_stream(id));
} /* _p_first_stream() */


/*
 * bool_t _p_next_stream(int_t *id)
 *
 * This procedure is used in conjunction with _p_first_stream() to
 * iterate through all of the streams.  It returns (in *id) the next
 * stream after *id.  (i.e. It modifies the value of *id.)
 *
 * Return:	TRUE if there are any more streams, FALSE otherwise
 *	  *id = the next stream id after the one passed in *id
 */
bool_t _p_next_stream(id)
int_t *id;
{
    if (*id < -1 || *id >= max_streams)
	_p_fatal_error("_p_next_stream(): Invalid stream id");

    for ((*id)++;
	 ((*id < max_streams)
	  && (stream_table[*id].stream_array == (cell_t *) NULL));
	 (*id)++ )
	;
    if (*id >= max_streams)
    {
	*id = -1;
	return (FALSE);
    }
    else
    {
	return (TRUE);
    }
} /* _p_next_stream() */


/*
 * void _p_enable_stream_recv(int_t id)
 *
 * Enable remote stream receives on the passed stream 'id'.
 *
 * Optional: If there is a pending remote stream message for this
 * stream, then this procedure can go ahead and deliver it.
 */
void _p_enable_stream_recv(id)
int_t id;
{
    ValidateStreamData(id);

    if (stream_table[id].mq_front != (msg_buf_header_t *) NULL)
    {
	/* A message is available on this stream, so deliver it */
	msg_buf_header_t *msg;
	msg = stream_table[id].mq_front;
	stream_table[id].mq_front = msg->next;
	deliver_stream_msg(msg);
	FreeMsgBuf(msg);
    }
    else if (stream_table[id].closed)
    {
	/*
	 * There are no messages left to be delivered, and the stream
	 * has been closed by the sender.  So close the receiving side.
	 */
	_p_close_stream(id);
    }
    else
    {
	stream_table[id].enabled = TRUE;
    }
} /* _p_enable_stream_recv() */


/*
 * void _p_disable_stream_recv(int_t id)
 *
 * Disable remote stream receives on the passed stream 'id'.
 *
 * Optional: If there is a pending remote stream message for this
 * stream, then this procedure can go ahead and deliver it.
 */
void _p_disable_stream_recv(id)
int_t id;
{
    ValidateStreamData(id);
    stream_table[id].enabled = FALSE;
} /* _p_disable_stream_recv() */


/*
 * void _p_send_close_stream(int_t node, int_t id)
 *
 * Close down stream <node,id>.  This procedure is called from the
 * sending side of the stream.
 *
 * Care must be taken to ensure that the stream is not closed on the
 * receiving side until all messages on this stream have been delivered.
 * Thus, this stream_close message needs to remain ordered relative to
 * other sends on this stream.
 */
void _p_send_close_stream(node, id)
int_t node;
int_t id;
{
    cell_t *msg_buf;
    
#ifdef DEBUG
    if (ParDebug(5))
    {
	fprintf(_p_stdout,
		"(%lu,%lu) _p_send_close_stream(): to node %lu, id %ld\n",
		(unsigned long) _p_my_id, (unsigned long) _p_reduction,
		(unsigned long) node, (long) id);
	fflush(_p_stdout);
    }
#endif /* DEBUG */
    
    msg_buf = _p_alloc_msg_buffer(1);
    *((int_t *) msg_buf) = id;
    _p_msg_send(msg_buf, node, 1, MSG_CLOSE_STREAM);
    
#ifdef GAUGE    
    _p_gauge_stats.ssmallmsgs++;
#endif /* GAUGE */    
} /* _p_send_close_stream() */


/*
 * void _p_stream_send(int_t id, int_t node,
 *		       cell_t *array, u_int_t offset, u_int_t size)
 *
 * Send a stream message it stream id, 'id', on node 'node'.  The
 * message should come from the passed 'array', starting 'offset'
 * elements into the array, 'size' elements in size.
 */
void _p_stream_send(id, array, offset, size)
int_t id;
cell_t *array;
u_int_t offset;
u_int_t size;
{
    /*
     * Allocate a message buffer of the appropriate size, set up the
     * the header, copy in the data, and deliver it.
     */
}


/*
 * void handle_close_stream(msg_buf_header_t *msg)
 *
 * When _p_msg_receive() receives a MSG_CLOSE_STREAM message it
 * calls this procedure to handle it.  This procedure should figure
 * out which stream id this message is for, mark the closed flag in
 * the stream_table, and deliver the close stream (call _p_close_stream())
 * if there are no messages pending delivery.
 */
static void handle_close_stream(msg)
msg_buf_header_t *msg;
{
    int id;

    id = *((int_t *) (msg->user_buf));
    ValidateStreamData(id);
    
#ifdef DEBUG
    if (ParDebug(5))
    {
	fprintf(_p_stdout, "(%lu,%lu) handle_close_stream(): From node %lu, id %ld\n",
		(unsigned long) _p_my_id, (unsigned long) _p_reduction,
		(unsigned long) msg->node, (long) id);
    }
#endif /* DEBUG */

    stream_table[id].closed = TRUE;
    if (stream_table[id].mq_front == (msg_buf_header_t *) NULL)
	_p_close_stream(id);
    
#ifdef GAUGE	    
    _p_gauge_stats.rsmallmsgs++;
#endif /* GAUGE */
} /* handle_close_stream() */


/*
 * void handle_stream_receive(msg_buf_header_t *msg)
 *
 * When _p_msg_receive() receives a stream message it calls this procedure
 * to handle it.  This procedure should figure out which stream id
 * this message is for, and do the following:
 *	- If receives are enabled on this id, then deliver the message.
 *	- If recevies are not enabled on this id, then queue the message up.
 */
static void handle_stream_receive(msg)
msg_buf_header_t *msg;
{
} /* handle_stream_receive() */


/*
 * void deliver_stream_msg(msg_buf_header_t *msg)
 *
 * When a message is ready for delivery (there is a pending receive
 * and received data) then this procedure is called.  It should find
 * the stream data structure on the heap and copy the contents of
 * the message buffer as appropriate.
 */
static void deliver_stream_msg(msg)
msg_buf_header_t *msg;
{
} /* deliver_stream_receive() */


#endif /* STREAMS */
 

These are the contents of the former NiCE NeXT User Group NeXTSTEP/OpenStep software archive, currently hosted by Netfuture.ch.