This is sr_bsdipc.c in view mode; [Download] [Up]
/*
* PCN Abstract Machine Emulator
* Authors: Steve Tuecke and Ian Foster and Robert Olson
* Argonne National Laboratory
*
* Please see the DISCLAIMER file in the top level directory of the
* distribution regarding the provisions under which this software
* is distributed.
*
* sr_bsdipc.c - send/receive header using BSD Unix IPC (TCP sockets)
*/
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <netdb.h>
#include <sys/wait.h>
#include <netinet/tcp.h>
#include <sys/resource.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>
#include <sys/param.h>
#ifdef rs6000
#include <sys/select.h>
#endif
#include "pcn.h"
/*
* We need to determine if the Unix we're on uses "struct direct" or
* "struct dirent" for directory entries. So far, only builds with
* next_os or symmetry defined use "struct direct"; everything else
* uses "struct dirent".
*/
#if defined(next_os) || defined(symmetry)
#define USE_DIRECT
#else
#define USE_DIRENT
#endif
#if defined(USE_DIRECT)
#include <sys/dir.h>
#else
#if defined(USE_DIRENT)
#include <dirent.h>
#endif
#endif
#include <sys/file.h>
/*
* For the access() arguments
*/
#if !(defined(sequent) || defined(next_os))
#include <unistd.h>
#endif
/*
* The OPTIMAL_TCP_BLOCKSIZE is the block size that I've found
* produces the highest throughput (probably not incidentally, this is
* the number of bytes written at each write() to a nonblocking
* socket).
*
* TCP (on sparcstations at least) seems to have a problem with
* sending block sizes in the range from 4097 to about 6000 bytes.
* Defining TCP_BUMP_HACK (named for the bump in the plot of
* elapsed send time versus message size) pads messages in this
* region up to a larger size, which actually increases
* performance. The endpoints of the bump are defined as TCP_BUMP_MIN
* and TCP_BUMP_MAX, defaulting to OPTIMAL_TCP_BLOCKSIZE and
* OPTIMAL_TCP_BLOCKSIZE + 2048.
*
* These parameters will probably have to be tuned for different
* TCP/IP implementations.
*
*/
/* Block size in bytes; may be overridden from the compiler command line. */
#ifndef OPTIMAL_TCP_BLOCKSIZE
#define OPTIMAL_TCP_BLOCKSIZE 4096
#endif
/*
* Byte bounds of the "bump" region. Only defined when TCP_BUMP_HACK
* is on, and each may be overridden individually.
*/
#ifdef TCP_BUMP_HACK
#ifndef TCP_BUMP_MIN
#define TCP_BUMP_MIN OPTIMAL_TCP_BLOCKSIZE
#endif
#ifndef TCP_BUMP_MAX
#define TCP_BUMP_MAX (TCP_BUMP_MIN + 2048)
#endif
#endif
/*
* NUM_FDS is handed to select() as the number of descriptors to
* examine. Note that on non-hpux builds it expands to a function
* call, so it is evaluated every time it is used.
*/
#ifdef hpux
/* Empirically determined... */
#define NUM_FDS 60
#else
/* This works for BSD machines */
#define NUM_FDS getdtablesize()
#endif
/* Text describing the current value of errno. */
#define SYSTEM_ERROR (sys_errlist[errno])
/*
* Format a message and abort through _p_fatal_error(). 's' is used
* as a sprintf() format and is given the errno text as its only
* argument, so it should contain exactly one %s. The expansion is a
* bare brace block (not do/while), with its own 1024 byte buffer.
*/
#define FATAL_SYSTEM_ERROR(s) { char buf[1024]; \
sprintf(buf, s, sys_errlist[errno]); \
_p_fatal_error(buf); }
/* Shorthand for this node's id. */
#define ME _p_my_id
extern int errno;
extern char *sys_errlist[];
extern char *getenv();
/*
* Process table:
* Contains host and port number (used during creation) for each node
* as well as the file descriptor for the connection to that node.
*
* The fd for the entry for the current process is undefined.
*
* buf is a pointer to the current incomplete message.
* len is the number of bytes left to read.
*/
typedef struct _proc_table_entry
{
int port; /* port number of the node (used during creation) */
int fd; /* connection to the node; set to -1 by _p_sr_init_node() */
char host[256]; /* host name of the node (used during creation) */
char *buf, *bufend; /* buf: message being read, or NULL if none; */
/* bufend: where the next bytes read are stored */
int len; /* bytes of the message in buf still to be read */
} proc_table_entry;
/* One entry per node, indexed by node id. */
static proc_table_entry proc_table[MAX_NODES];
/*
* Argument parsing stuff.
*
* The variables are labeled _p even though they are static because I
* didn't want to change them all...
*
* parallel_startup:
* name of startup file
*
* parallel_info:
* string of the form "host,port" which denotes the hostname of
* the PCN host node, and the port on which it is listening.
*
* dont_start_remotes:
* if true, do not actually execute the system() to start remote
* nodes specified in the startup file.
*
* network_debugging:
* if true, print out copious debugging message for the low level
* send and receive.
*
* use_start_nodes:
* attempt to connect to the host-control daemon to start worker nodes.
*
* hostconfig_file:
* name of a file containing host-control configuration
* information. See the host-control documentation for more details.
*/
static char parallel_startup[MAX_PATH_LENGTH]; /* -s */
static char parallel_info[MAX_PATH_LENGTH]; /* -pi */
static bool_t dont_start_remotes = 0; /* -nostart */
#ifdef DEBUG
static bool_t network_debugging = 0; /* -netdebug */
#endif
static bool_t use_start_nodes = 0; /* -start-nodes */
static char hostconfig_file[MAX_PATH_LENGTH]; /* -hostconfig */
static bool_t start_heterogeneous; /* -het: allow a heterogeneous network */
static char host_startup_list[1000]; /* -nodes: ':' separated machine names */
static char *exe_from_argv; /* For saving argv[0] to give to host-control */
/*
* my_port is the TCP port number for the port on which the host
* listens for connections from the worker nodes.
*
* listen_fd is the associated file descriptor.
*/
static int my_port;
static int listen_fd;
/* Set to TRUE by _p_destroy_nodes(). */
static bool_t destroying_nodes = FALSE;
/*
* Every message buffer starts with this header, followed directly by
* the user's cells. The first HEADER_SIZE cells of the structure
* (everything before user_buf) are what gets read off and written to
* a connection ahead of the payload.
*/
typedef struct msg_buf_header_struct
{
struct msg_buf_header_struct * next; /* link for the pending message queue */
#ifdef TCP_BUMP_HACK
int_t alloc_size; /* padded size in bytes, or -1 if not padded */
#endif
int_t size; /* payload size in cells */
int_t type; /* message type */
int_t node; /* id of the sending node */
cell_t user_buf[1]; /* first cell of the payload */
} msg_buf_header_t;
/*
* Number of header fields in front of user_buf.
* NOTE(review): this is used as a cell count, which presumes each
* header field (including the pointer) occupies exactly one cell --
* confirm for every supported architecture.
*/
#ifdef TCP_BUMP_HACK
#define HEADER_SIZE 5
#else
#define HEADER_SIZE 4
#endif
/* Total buffer size in bytes for a payload of Size cells. */
#define MsgSize2Bytes(Size) (((Size) + HEADER_SIZE) * CELL_SIZE)
/* Step back from a payload pointer to the header in front of it. */
#define User2MsgBuf(Buf) ((msg_buf_header_t *) (((cell_t *) (Buf)) \
- HEADER_SIZE))
/*
* Step forward from a header to the payload behind it.
* NOTE(review): the result is cast to msg_buf_header_t * although it
* addresses the payload; a cell_t * looks like what was intended.
*/
#define MsgBuf2User(Buf) ((msg_buf_header_t *) (((cell_t *) (Buf)) \
+ HEADER_SIZE))
/* Queue of received messages that have not been delivered yet. */
static msg_buf_header_t *mq_front, *mq_back;
#ifdef STREAMS
/*
* Each element of the stream_table contains a pointer to the front
* of a stream array on the heap, and the index of this table entry
* into that stream array.
*
* The sindex field of the stream_table is also used to maintain a
* free list of stream_table entries, where first_free_stream_id
* contains the index of the first free stream table entry.
*
* When there is a pending receive on a stream, the 'enabled' field is
* set to TRUE. Whenever a stream message arrives, _p_msg_receive()
* needs to check this field to see if it can deliver the message.
* If enabled==FALSE, then the message is queued on this stream's
* message queue (mq_front, mq_back).
*
* A receiving side of a stream is marked closed when:
* - closed==TRUE
* - and, the message queue is empty
*/
static struct
{
cell_t * stream_array; /* front of the stream array on the heap */
int_t sindex; /* index into stream_array; also the free list link */
bool_t enabled; /* TRUE while a receive is pending on the stream */
bool_t closed; /* closed flag; see the conditions listed above */
msg_buf_header_t * mq_front; /* messages queued while not enabled */
msg_buf_header_t * mq_back;
} stream_table[MAX_STREAMS];
static int_t max_streams = MAX_STREAMS;
static int_t first_free_stream_id; /* head of the free list kept in sindex */
/* Stream helpers, declared here in old style without parameter lists. */
static void handle_close_stream();
static void handle_stream_receive();
static void deliver_stream_msg();
#endif /* STREAMS */
/*
* A worker_ctl_msg_t is sent between the host and workers during
* system initialization.
*
* Message types:
*
* WORKER_CTL_INIT:
* i1 = node id
* i2 = number of nodes
* Sent from host to worker
*
* WORKER_CTL_PORT
* i1 = port number on which worker is listening
* buf = hostname of worker
* Sent from worker to host
*
* WORKER_CTL_PROCTBL_ENTRY
* A process table entry.
* i1 = node number
* i2 = port on which node i1 is listening
* buf = hostname for node i1
*
* WORKER_CTL_PROCTBL_END
* Denotes the end of the process table.
*
* WORKER_CTL_START_CONNECTING
* Sent from the host to a worker to instruct the worker to
* connect to all other workers with node numbers greater than
* its own.
*
* WORKER_CTL_CONNECTIONS_MADE
* Sent from a worker to the host when it has finished making its
* connections.
*
* WORKER_CTL_CONN_TEST
* This is the first message sent on a new connection. It is used
* to test the connection.
*
* WORKER_CTL_CONN_ACK
* Response to a WORKER_CTL_CONN_TEST message.
*
* WORKER_CTL_GO
* Sent from the host to a worker to instruct the worker to begin
* executing PCN.
*
* WORKER_CTL_GO_ACK
* Response to a WORKER_CTL_GO message.
*
* WORKER_CTL_CHECKIN
* Sent from the host to a worker during the final node checking
* in _p_sr_node_initialized().
*
* WORKER_CTL_CHECKIN_ACK
* Response to a WORKER_CTL_CHECKIN message.
*
* WORKER_CTL_EXEC_FAILED
* Sent by host-control when it discovers that a node couldn't be
* started.
*/
/*
* The whole structure is written and read as one fixed-size block
* (sizeof(worker_ctl_msg_t) bytes) on a connection.
*/
typedef struct worker_ctl_msg_struct
{
int type; /* one of the WORKER_CTL_* values below */
int i1, i2, i3; /* meaning depends on type, see the list above */
char buf[256]; /* host name, for the types that carry one */
} worker_ctl_msg_t;
/* Values for worker_ctl_msg_t.type */
#define WORKER_CTL_INIT 1
#define WORKER_CTL_PORT 2
#define WORKER_CTL_PROCTBL_ENTRY 3
#define WORKER_CTL_PROCTBL_END 4
#define WORKER_CTL_START_CONNECTING 5
#define WORKER_CTL_CONNECTIONS_MADE 6
#define WORKER_CTL_CONN_TEST 7
#define WORKER_CTL_CONN_ACK 8
#define WORKER_CTL_GO 9
#define WORKER_CTL_GO_ACK 10
#define WORKER_CTL_CHECKIN 11
#define WORKER_CTL_CHECKIN_ACK 12
#define WORKER_CTL_EXEC_FAILED 13
/*
* Forward declarations of the file-local helpers. They are old-style
* declarations without parameter lists, so the compiler does not
* check the arguments at the call sites.
*/
/* Socket setup and connection */
static void net_setup_anon_listener();
static int net_accept() ;
static int net_conn_to_listener();
/* Reading, writing and error reporting */
static void error_check();
static int blocking_read();
static int blocking_write();
static void blocking_read_sr_errchk();
static void blocking_write_sr_errchk();
static void blocking_read_errchk();
static void blocking_write_errchk();
static void set_nonblocking();
static void sr_fatal_error();
static void sr_fatal_error_1();
/* Starting worker nodes */
static void start_child();
static void start_nodes();
static int start_nodes_with_daemon();
static int start_nodes_with_script();
static void start_nodes_from_list();
static void start_node_on_host();
static void read_line_from_fd();
static void write_line_to_fd();
static void find_host_daemon();
static void read_startup_file();
/* Host and worker initialization */
static void start_host();
static void process_connection();
static void start_worker();
static void set_mask_for_all_workers();
#ifdef DEBUG
static void dump_proc_table();
#endif /* DEBUG */
static void accept_connections();
static void send_proc_table();
static void receive_proc_table();
static void await_connections();
static void make_connections();
static void process_connection_from_worker();
static void process_mesg_from_node();
static bool_t process_mesg_from_host();
static int node_from_fd();
static void await_go();
static char *ident();
/*
 * _p_sr_get_argdesc()
 *
 * Hand back the table describing the command line arguments that
 * belong to this send/receive module, and put every option variable
 * the table refers to back to its default value.
 *
 * argc, argv:  the command line; only argv[0] is used, it is
 *              remembered in exe_from_argv.
 * argdescp:    receives a pointer to the (static) argument table.
 * n_argdescp:  receives the number of entries in that table.
 */
void _p_sr_get_argdesc(argc, argv, argdescp, n_argdescp)
int argc;
char **argv;
argdesc_t **argdescp;
int *n_argdescp;
{
    static argdesc_t sr_arg_table[] = {
        STRING_ARG("nodes", host_startup_list,
                   "List of machine names on which to run PCN nodes"),
        STRING_ARG("s", parallel_startup,
                   "system specific parallel startup file"),
        BOOL_ARG("start-nodes", &use_start_nodes,
                 "try to use the host-control daemon to start nodes"),
        INTEGER_ARG("n", &_p_nodes,
                    "number of nodes"),
        STRING_ARG("pi", parallel_info,
                   "parallel info for node processes -- you should never use this"),
        STRING_ARG("hostconfig", hostconfig_file,
                   "host configuration file"),
        BOOL_ARG("nostart", &dont_start_remotes,
                 "don't actually start remote nodes"),
        BOOL_ARG("het", &start_heterogeneous,
                 "allow a heterogeneous network"),
#ifdef DEBUG
        BOOL_ARG("netdebug", &network_debugging,
                 "turn on network debugging\n"),
#endif
    };

    /* String valued options start out empty ... */
    host_startup_list[0] = '\0';
    hostconfig_file[0] = '\0';
    parallel_info[0] = '\0';
    parallel_startup[0] = '\0';

    /* ... and all of the switches start out off. */
    start_heterogeneous = FALSE;
    use_start_nodes = FALSE;
    dont_start_remotes = FALSE;
#ifdef DEBUG
    network_debugging = FALSE;
#endif

    /* The program name is needed later when starting remote nodes. */
    exe_from_argv = argv[0];

    *n_argdescp = sizeof(sr_arg_table) / sizeof(sr_arg_table[0]);
    *argdescp = sr_arg_table;
}
/*
 * _p_alloc_msg_buffer()
 *
 * Allocate a message buffer able to hold 'size' cells of user data.
 * The header lives directly in front of the user data, so the space
 * obtained from malloc() is MsgSize2Bytes(size) bytes.
 *
 * With TCP_BUMP_HACK, a buffer whose byte size falls strictly between
 * TCP_BUMP_MIN and TCP_BUMP_MAX is padded out to TCP_BUMP_MAX bytes
 * and that padded size is recorded in the header's alloc_size field;
 * for every other buffer alloc_size is set to -1.
 *
 * Return: A pointer to the user part of the buffer ('size' cells).
 *         _p_malloc_error() is called if the allocation fails.
 */
cell_t *_p_alloc_msg_buffer(size)
int_t size; /* In cells */
{
    msg_buf_header_t *hdr;
    int_t nbytes = MsgSize2Bytes(size);
#ifdef TCP_BUMP_HACK
    int_t padded_size = -1;     /* stays -1 unless the buffer is padded */
#endif

#ifdef DEBUG
    if (ParDebug(8))
    {
        fprintf(_p_stdout,
                "(%lu,%lu) _p_alloc_msg_buffer: Allocating %lu cells\n",
                (unsigned long) _p_my_id, (unsigned long) _p_reduction,
                (unsigned long) size);
        fflush(_p_stdout);
    }
#endif /* DEBUG */

#ifdef TCP_BUMP_HACK
    if (nbytes > TCP_BUMP_MIN && nbytes < TCP_BUMP_MAX)
    {
        /* In the slow region: grow the buffer to the top of the bump
         * (the padding is actually sent along with the message). */
        nbytes = TCP_BUMP_MAX;
        padded_size = nbytes;
    }
#endif

    hdr = (msg_buf_header_t *) malloc((size_t) nbytes);
    if (hdr == NULL)
        _p_malloc_error();

#ifdef TCP_BUMP_HACK
    hdr->alloc_size = padded_size;
#endif
    hdr->size = size;

#ifdef DEBUG
    if (ParDebug(8))
    {
        fprintf(_p_stdout,
                "(%lu,%lu) _p_alloc_msg_buffer: Allocated %lu cells\n",
                (unsigned long) _p_my_id, (unsigned long) _p_reduction,
                (unsigned long) size);
        fflush(_p_stdout);
    }
#endif /* DEBUG */

    return (hdr->user_buf);
} /* _p_alloc_msg_buffer() */
#define FreeMsgBuf(Buf) free((char *) Buf)
/*
 * _p_abort_nodes()
 *
 * Close the descriptor recorded in the process table for every node
 * other than this one.
 */
void _p_abort_nodes()
{
    int_t node;

    for (node = 0; node < _p_nodes; node++)
    {
        if (node == _p_my_id)
            continue;           /* no connection to ourselves */
        close(proc_table[node].fd);
    }
} /* _p_abort_nodes() */
/*
* recv_one_mesg()
*
* Try to obtain one complete message from the connections recorded
* in proc_table.
*
* rcv_buf: on a TRUE return, *rcv_buf points at the malloc()ed
* buffer (header followed by payload) of the message; the
* process table entry gives up its pointer to the buffer.
* blocking: if true, select() waits without a timeout and the loop
* repeats until some entry holds a complete message. If
* false, select() polls with a zero timeout and a single
* pass over the descriptors is made.
*
* Return: TRUE if a message was stored in *rcv_buf, otherwise FALSE
* (which can only happen for a non-blocking call).
*
* A message that has only partly arrived stays in the buf / bufend /
* len fields of its process table entry, so it may take several
* calls to assemble it. Complete messages are handed out round robin
* over the node ids, starting at the static index next_proc.
*/
static bool_t recv_one_mesg(rcv_buf, blocking)
msg_buf_header_t **rcv_buf;
bool_t blocking;
{
fd_set read_fds;
struct timeval tval, *tvalptr;
int n, i, n_read;
bool_t done, avail;
static int next_proc = 0;
proc_table_entry *ent;
#ifdef DEBUG
if ((network_debugging && blocking) || (network_debugging > 1))
printf("%lu: recv_one_mesg blocking=%lu\n",
(unsigned long) ME, (unsigned long) blocking);
#endif
/*
* First, check if there are any messages sitting in the table
* already. If there are, pull one of them out and return it. The
* stuff waiting on the streams can wait.
*/
avail = FALSE;
for (i = 0; i < _p_nodes; i++)
{
ent = &(proc_table[i]);
/* A buffer with nothing left to read is a complete message */
if (ent->buf != (char *) NULL && ent->len == 0)
{
avail = TRUE;
break;
}
}
/* If a message is already available, the select loop is skipped */
done = avail;
while (!done)
{
/* Zero timeout makes select() poll; a NULL timeout makes it wait */
if (!blocking)
{
tval.tv_sec = 0;
tval.tv_usec = 0;
tvalptr= &tval;
}
else
tvalptr = NULL;
FD_ZERO(&read_fds);
set_mask_for_all_workers(&read_fds);
/* A worker also watches its connection to the host, if it has one */
if (!_p_host && (proc_table[_p_host_id].fd != -1))
FD_SET(proc_table[_p_host_id].fd, &read_fds);
#ifdef DEBUG
if (network_debugging > 1)
{
char buf[1024], buf2[100];
buf[0] = 0;
for (i = 0; i < howmany(FD_SETSIZE, NFDBITS); i++)
{
sprintf(buf2, "%0lx ", (unsigned long) read_fds.fds_bits[i]);
strcat(buf, buf2);
}
printf("%lu: selecting mask=%s\n", ME, buf);
}
#endif
n = select(NUM_FDS, &read_fds, (fd_set *) NULL,
(fd_set *) NULL, tvalptr);
#ifdef DEBUG
if (network_debugging > 1)
printf("%lu: select returns %d\n", ME, n);
#endif
if (n < 0)
{
char buf[300];
/* Interrupted by a signal: just select again */
if (errno == EINTR)
continue;
sprintf(buf, "recv_one_mesg: select failed: %s",
sys_errlist[errno]);
_p_fatal_error(buf);
}
/*
* Now we need to read something from each fd that has data
* for reading. If something is available on a fd that isn't
* already listed as having a read in progress, then there
* must be a message header there.
*/
avail = FALSE;
for (i = 0; i < _p_nodes; i++)
{
ent = &(proc_table[i]);
/* Entries without a connection are skipped */
if (ent->fd < 0)
continue;
if (FD_ISSET(ent->fd, &read_fds))
{
#ifdef DEBUG
if (network_debugging)
printf("%lu: fd is set: node=%d fd=%d\n", _p_my_id, i, ent->fd);
#endif
if (ent->buf == (char *) NULL)
{
/*
* No read in progress: fetch the whole header with a
* blocking read, then allocate room for the complete
* message and copy the header to its front.
*/
msg_buf_header_t hdr;
int len;
#ifdef DEBUG
if (network_debugging)
printf("%lu: reading header\n", _p_my_id);
#endif
blocking_read_errchk(ent->fd, (char *) &hdr,
HEADER_SIZE * CELL_SIZE,
"recv_one_mesg");
#ifdef DEBUG
if (network_debugging)
{
fprintf(_p_stdout,"%lu: read header\n", ME);
fprintf(_p_stdout,
"%lu: Got header: fd=%d size=%lu type=%lu node=%lu\n",
(unsigned long) _p_my_id,
ent->fd, (unsigned long) hdr.size,
(unsigned long) hdr.type,
(unsigned long) hdr.node);
}
#endif
/* A padded message carries its real length in alloc_size */
#ifdef TCP_BUMP_HACK
if (hdr.alloc_size != -1)
len = hdr.alloc_size;
else
#endif
len = MsgSize2Bytes(hdr.size);
if ((ent->buf = (char *) malloc(len)) == (char *) NULL)
_p_malloc_error();
/* What remains to be read is everything behind the header */
ent->len = len - HEADER_SIZE * CELL_SIZE;
memcpy((char *) ent->buf, (char *) &hdr,
HEADER_SIZE * CELL_SIZE);
ent->bufend = ent->buf + HEADER_SIZE * CELL_SIZE;
}
if (ent->buf == (char *) NULL)
_p_fatal_error("recv_one_megs: NULL buffer");
/* Read as much of the rest of the message as is there right now */
while (ent->len > 0)
{
#ifdef DEBUG
if (network_debugging)
printf("%lu: reading %d bytes\n", ME, ent->len);
#endif
n_read = read(ent->fd, ent->bufend, ent->len);
#ifdef DEBUG
if (network_debugging)
printf("%lu: read returns %d\n", ME, n_read);
#endif
if (n_read < 0)
{
if (errno == EINTR)
continue;
else if (errno == EWOULDBLOCK)
{
/* Nothing more for now; the rest is picked up on a later pass */
#ifdef DEBUG
if (network_debugging)
printf("%lu: read would block\n", ME);
#endif
break;
}
else
FATAL_SYSTEM_ERROR("read failed: %s");
}
/*
* NOTE(review): a return of 0 leaves the message incomplete and
* is not reported. Presumably this is end-of-file on the
* connection, in which case a blocking call would keep
* selecting and reading the same descriptor -- confirm how a
* closed connection is meant to be handled here.
*/
if (n_read == 0)
break;
else
{
ent->len -= n_read;
ent->bufend += n_read;
}
}
if (ent->len == 0)
{
#ifdef DEBUG
if (network_debugging)
printf("%lu: Got complete message id=%d fd=%d\n",
ME, i, ent->fd);
#endif
avail = TRUE;
}
}
}
/* A non-blocking call makes one pass only */
if (!blocking)
done = TRUE;
else
done = avail;
}
#ifdef DEBUG
if (network_debugging > 1)
printf("%lu: end of loop: avail=%lu\n",
(unsigned long) ME, (unsigned long) avail);
#endif
if (!avail && blocking)
_p_fatal_error("Blocking recv_one_mesg didn't find a message");
if (!avail)
return FALSE;
/*
* Some entry holds a complete message. Search for it starting at
* next_proc, wrapping around; coming back to the starting index
* without finding one is treated as an internal error.
*/
i = next_proc;
while (1)
{
ent = &(proc_table[next_proc]);
if (ent->buf != NULL && ent->len == 0)
break;
next_proc = (next_proc + 1) % _p_nodes;
if (next_proc == i)
_p_fatal_error("recv_one_mesg: avail was true, but didn't find a message");
}
*rcv_buf = (msg_buf_header_t *) ent->buf;
#ifdef DEBUG
if (network_debugging)
printf("%lu: Got message avail on entry %ld fd %d len %ld s=%lu t=%lu n=%lu\n",
(unsigned long) ME, (long) (ent - proc_table),
ent->fd, (long) (ent->bufend - ent->buf),
(unsigned long) (**rcv_buf).size,
(unsigned long) (**rcv_buf).type,
(unsigned long) (**rcv_buf).node);
#endif
/* The buffer now belongs to the caller; mark the entry as idle */
ent->buf = NULL;
ent->len = 0;
/* Start the next search behind the node that was just served */
next_proc = (next_proc + 1) % _p_nodes;
return TRUE;
}
/*
 * queue_deferred_msg()
 *
 * Append 'msg' to the tail of the queue of received but not yet
 * delivered messages (mq_front, mq_back).
 */
static void queue_deferred_msg(msg)
msg_buf_header_t *msg;
{
    msg->next = (msg_buf_header_t *) NULL;
    if (mq_front == (msg_buf_header_t *) NULL)
        mq_front = msg;
    else
        mq_back->next = msg;
    mq_back = msg;
}

/*
 * _p_msg_receive()
 *
 * Receive one message and copy its contents to _p_heap_ptr.
 *
 * node, size, type: receive the sending node, the size in cells and
 *                   the type of the message that was delivered.
 * rcv_type:         selects what to wait for:
 *   RCV_BLOCK    - take the next queued or arriving message, waiting
 *                  for one if necessary.
 *   RCV_NOBLOCK  - the same, but return FALSE instead of waiting.
 *   RCV_PARAMS   - wait for a MSG_PARAMS message.
 *   RCV_GAUGE    - wait for a MSG_GAUGE message.
 *   RCV_COLLECT  - wait for a MSG_READ or MSG_CANCEL message;
 *                  MSG_COLLECT messages arriving meanwhile are
 *                  dropped.
 *   The three selective forms also stop at MSG_EXIT and
 *   MSG_INITIATE_EXIT, and put every other message they see on the
 *   pending queue for a later RCV_BLOCK / RCV_NOBLOCK call.
 *
 * With STREAMS, stream related messages are passed to the stream
 * handlers and the function goes on looking for a normal message.
 *
 * Return: TRUE if a message was delivered, FALSE if RCV_NOBLOCK found
 * nothing. An unknown rcv_type is reported through _p_fatal_error().
 */
bool_t _p_msg_receive(node, size, type, rcv_type)
int *node;
int *size;
int *type;
int rcv_type;
{
    bool_t done, done_inner;
    msg_buf_header_t *rcv_buf;
    bool_t gotit;

    for (done = FALSE; !done; )
    {
        if (mq_front != (msg_buf_header_t *) NULL &&
            (rcv_type == RCV_NOBLOCK || rcv_type == RCV_BLOCK))
        {
            /* Messages put aside earlier are delivered first */
            rcv_buf = mq_front;
            mq_front = mq_front->next;
        }
        else
        {
            switch (rcv_type)
            {
            case RCV_BLOCK:
            case RCV_NOBLOCK:
                gotit = recv_one_mesg(&rcv_buf, rcv_type == RCV_BLOCK);
                if (!gotit)
                {
                    if (rcv_type == RCV_BLOCK)
                        _p_fatal_error("_p_msg_receive: blocking read didn't return a message");
                    else
                        return (FALSE);
                }
                break;

            case RCV_PARAMS:
            case RCV_GAUGE:
                for (done_inner = FALSE; !done_inner; )
                {
                    recv_one_mesg(&rcv_buf, TRUE);
                    if (   (rcv_type == RCV_PARAMS && rcv_buf->type == MSG_PARAMS)
                        || (rcv_type == RCV_GAUGE && rcv_buf->type == MSG_GAUGE)
                        || rcv_buf->type == MSG_EXIT
                        || rcv_buf->type == MSG_INITIATE_EXIT )
                    {
                        done_inner = TRUE;
                    }
                    else
                    {
                        /* Not what we are waiting for: keep it for later */
                        queue_deferred_msg(rcv_buf);
                    }
                }
                break;

            case RCV_COLLECT:
                for (done_inner = FALSE; !done_inner; )
                {
                    recv_one_mesg(&rcv_buf, TRUE);
                    switch (rcv_buf->type)
                    {
                    case MSG_READ:
                    case MSG_CANCEL:
                    case MSG_EXIT:
                    case MSG_INITIATE_EXIT:
                        done_inner = TRUE;
                        break;
                    case MSG_COLLECT:
                        FreeMsgBuf(rcv_buf);
                        break;
                    default:
                        /*
                         * MSG_DEFINE, MSG_VALUE, MSG_CLOSE_STREAM,
                         * MSG_GAUGE, or a stream message
                         */
                        queue_deferred_msg(rcv_buf);
                        break;
                    }
                }
                break;

            default:
                /*
                 * Without this case an unexpected rcv_type would fall
                 * out of the switch with rcv_buf never having been set.
                 */
                _p_fatal_error("_p_msg_receive: unknown receive type");
                return (FALSE);
            }
        }

#ifdef STREAMS
        if (rcv_buf->type == MSG_CLOSE_STREAM)
        {
            handle_close_stream(rcv_buf);
        }
        else if (rcv_buf->type < LAST_STREAM_MSG_TYPE)
        {
            handle_stream_receive(rcv_buf);
        }
        else /* Normal non-stream related message */
        {
#endif /* STREAMS */
            *size = rcv_buf->size;
            *node = rcv_buf->node;
            *type = rcv_buf->type;

            /*
             * Now copy the message onto the heap. If there is
             * not enough room for it, then garbage collect.
             * Assume that there will always be enough room
             * for MSG_CANCEL message.
             */
            if (*type != MSG_CANCEL)
            {
                TryGCWithSize(*size);
            }
            memcpy(_p_heap_ptr, rcv_buf->user_buf, *size * CELL_SIZE);
            FreeMsgBuf(rcv_buf);

#ifdef DEBUG
            if (ParDebug(8))
            {
                fprintf(_p_stdout,
                        "(%lu,%lu) _p_msg_receive: Received from n %lu, s %lu, t %lu\n",
                        (unsigned long) _p_my_id, (unsigned long) _p_reduction,
                        (unsigned long) *node, (unsigned long) *size,
                        (unsigned long) *type);
                fflush(_p_stdout);
            }
#endif /* DEBUG */
            done = TRUE;
#ifdef STREAMS
        }
#endif /* STREAMS */
    }
    return (TRUE);
}
/*
 * _p_msg_send()
 *
 * Send a message to another node and free the message buffer.
 *
 * buf:  user part of a buffer obtained from _p_alloc_msg_buffer(), or
 *       NULL to send a message that consists of a header only (the
 *       size argument is then ignored and 0 is sent).
 * node: id of the destination node.
 * size: number of cells of user data to send.
 * type: message type to put in the header.
 *
 * While the message is being written, messages that arrive from
 * other nodes are read (without blocking) and put on the pending
 * message queue, so that two nodes sending to each other cannot
 * block one another. Any failure is reported via _p_fatal_error().
 */
void _p_msg_send(buf, node, size, type)
cell_t *buf;
int node;
int size;
int type;
{
    bool_t gotit;
    msg_buf_header_t *send_buf, *rcv_buf;
    int len;
    int n_left, n_sent;
    char *send_ptr;

#ifdef DEBUG
    if (node == _p_my_id)
        _p_fatal_error("_p_msg_send(): Help! I can't send to myself!");
#endif /* DEBUG */

    if (buf == (cell_t *) NULL)
    {
        if ((send_buf = (msg_buf_header_t *) malloc(MsgSize2Bytes(0))) == NULL)
            _p_malloc_error();
#ifdef TCP_BUMP_HACK
        /* This header did not come from _p_alloc_msg_buffer(), so the
         * "not padded" marker has to be filled in here; it is read
         * below to work out how many bytes to send. */
        send_buf->alloc_size = -1;
#endif
        size = 0;
    }
    else
    {
        send_buf = User2MsgBuf(buf);
    }

#ifdef DEBUG
    if (ParDebug(8))
    {
        fprintf(_p_stdout,
                "(%lu,%lu) _p_msg_send: Sending to n %lu, s %lu, t %lu\n",
                (unsigned long) _p_my_id, (unsigned long) _p_reduction,
                (unsigned long) node, (unsigned long) size,
                (unsigned long) type);
        fflush(_p_stdout);
    }
#endif /* DEBUG */

    send_buf->size = size;
    send_buf->type = type;
    send_buf->node = _p_my_id;

#ifdef TCP_BUMP_HACK
    len = send_buf->alloc_size == -1 ? MsgSize2Bytes(size) :
        send_buf->alloc_size;
#else
    len = MsgSize2Bytes(size);
#endif

    n_left = len;
    send_ptr = (char *) send_buf;
    while (n_left > 0)
    {
        /* Drain whatever has arrived in the meantime */
        while (1)
        {
            gotit = recv_one_mesg(&rcv_buf, FALSE); /* Nonblocking read */
            if (!gotit)
                break;
            rcv_buf->next = (msg_buf_header_t *) NULL;
            if (mq_front == (msg_buf_header_t *) NULL)
                mq_front = rcv_buf;
            else
                mq_back->next = rcv_buf;
            mq_back = rcv_buf;
        }

        /*
         * The HP machine has a problem with writing large blocks of data.
         * According to the write(2) manual page, there should be
         * no problem with this. But I was getting "Message too long"
         * errors.
         *
         * Supposedly, one can write PIPSIZ (=8192) bytes to a pipe.
         * Even that got me the Message too long errors. Hence we
         * write in 4K packets.
         */
#ifdef hpux
        len = n_left > 4096 ? 4096 : n_left;
#else
        len = n_left;
#endif
        n_sent = write(proc_table[node].fd, send_ptr, len);
        if (n_sent < 0)
        {
            if (errno == EINTR)
            {
                /* Interrupted by a signal before anything was
                 * written: simply try again. */
                continue;
            }
            else if (errno == EWOULDBLOCK || errno == EAGAIN)
            {
#ifdef DEBUG
                if (network_debugging)
                    fprintf(_p_stdout, "(%s) Send would block\n",
                            ident());
#endif
                continue;
            }
            else if (errno == EPIPE)
            {
                _p_fatal_error("_p_msg_send: Got eof on send");
            }
            else
            {
                static char errbuf[256];
                sprintf(errbuf, "_p_msg_send: Write failed: %s",
                        SYSTEM_ERROR);
                _p_fatal_error(errbuf);
            }
        }
#ifdef DEBUG
        if (network_debugging)
            fprintf(_p_stdout, "(%s) Sent %d/%d bytes to %d\n",
                    ident(), n_sent, n_left, node);
#endif
        n_left -= n_sent;
        send_ptr += n_sent;
    }
    FreeMsgBuf(send_buf);

#ifdef DEBUG
    if (ParDebug(9))
    {
        fprintf(_p_stdout, "(%lu,%lu) _p_msg_send: Sent to %lu\n",
                (unsigned long) _p_my_id, (unsigned long) _p_reduction,
                (unsigned long) node);
        fflush(_p_stdout);
    }
#endif /* DEBUG */
}
/*
* _p_destroy_nodes()
*
* Only records that the nodes are being destroyed by setting the
* destroying_nodes flag; nothing is closed or sent here.
*/
void _p_destroy_nodes()
{
destroying_nodes = TRUE;
}
/*
 * _p_sr_node_initialized()
 *
 * Final check-in between the host and the workers. Nothing is done
 * unless _p_multiprocessing is set.
 *
 * The host goes through the workers 1 .. _p_nodes-1 in order, sends
 * each one a WORKER_CTL_CHECKIN carrying the worker's node id and
 * waits for the matching WORKER_CTL_CHECKIN_ACK. A worker waits for
 * its WORKER_CTL_CHECKIN and answers it. An unexpected message type
 * or node id is a fatal error.
 */
void _p_sr_node_initialized()
{
    worker_ctl_msg_t ctl;
    int worker;

#ifdef DEBUG
    if (network_debugging)
        fprintf(_p_stdout,
                "(%s) Got sr_node_initialized, _p_multiprocessing=%lu\n",
                ident(), (unsigned long) _p_multiprocessing);
#endif

    if (!_p_multiprocessing)
        return;

    if (!_p_host)
    {
        /* Worker: wait for the host to ask, then answer. */
        blocking_read_errchk(proc_table[_p_host_id].fd, (char *) &ctl,
                             sizeof(ctl), "_p_sr_node_initialized");
        if (ctl.type != WORKER_CTL_CHECKIN)
        {
            _p_fatal_error("host sent wrong message type on checkin");
        }
        if (ctl.i1 != _p_my_id)
        {
            _p_fatal_error("_p_sr_node_initialized: msg.i1 != my_id");
        }

        ctl.type = WORKER_CTL_CHECKIN_ACK;
        ctl.i1 = _p_my_id;
        blocking_write_errchk(proc_table[_p_host_id].fd, (char *) &ctl,
                              sizeof(ctl), "_p_sr_node_initialized");
#ifdef DEBUG
        if (network_debugging)
            fprintf(_p_stdout, "(%s) Worker %lu done checking in\n",
                    ident(), (unsigned long) _p_my_id);
#endif
        return;
    }

    /* Host: check the workers in one after the other. */
    worker = 1;
    while (worker < _p_nodes)
    {
        ctl.type = WORKER_CTL_CHECKIN;
        ctl.i1 = worker;
        blocking_write_errchk(proc_table[worker].fd, (char *) &ctl,
                              sizeof(ctl), "host checkin send");
        blocking_read_errchk(proc_table[worker].fd, (char *) &ctl,
                             sizeof(ctl), "_p_sr_node_initialized");
        if (ctl.type != WORKER_CTL_CHECKIN_ACK)
        {
            _p_fatal_error("Wanted WORKER_CTL_CHECKIN_ACK in _p_sr_node_initialized");
        }
        if (ctl.i1 != worker)
        {
            _p_fatal_error("_p_sr_node_initialized: msg.i1 != i");
        }
#ifdef DEBUG
        if (network_debugging)
            fprintf(_p_stdout, "(%s) Node %d checked in\n", ident(), worker);
#endif
        worker++;
    }
}
/*
 * int fork_nodes(int n)
 *
 * Fork off n worker nodes.
 *
 * Each child marks itself as a worker (_p_host = FALSE), fills in
 * parallel_info with "hostname,port" of the listener if it is still
 * empty, closes its copy of the listening socket and stops forking.
 *
 * Return: 0 in a child. In the parent, the pid of the last child
 * that was forked, or 1 if n was not positive.
 */
static int fork_nodes(n)
int n;
{
    char local_host[128];
    int child_pid = 1;

    if (gethostname(local_host, sizeof(local_host)) < 0)
    {
        sr_fatal_error("gethostname failed");
    }

    while (n > 0)
    {
#ifdef DEBUG
        if (ParDebug(9))
            printf("Forking off a worker node\n");
#endif /* DEBUG */
        child_pid = fork();
        if (child_pid < 0) /* error */
        {
            sr_fatal_error("Failed fork()");
        }
        else if (child_pid == 0) /* child */
        {
            _p_host = FALSE;
            if (parallel_info[0] == '\0')
                sprintf(parallel_info, "%s,%d", local_host, my_port);
            close(listen_fd);
            break;
        }
        n--;
    }
    return (child_pid);
} /* fork_nodes() */
/*
* setup_host_port()
*
* Set up the listener through net_setup_anon_listener(), which is
* handed the addresses of my_port and listen_fd to fill in.
* NOTE(review): the first argument (50) is presumably the listen
* backlog -- confirm against net_setup_anon_listener().
*/
static void setup_host_port()
{
net_setup_anon_listener(50, &my_port, &listen_fd);
}
/*
* _p_sr_init_node()
*
* Work out from the parsed options whether this process does a
* uniprocessing run, is the host, or is a worker; start any further
* nodes that are asked for; run start_host() or start_worker(); and
* finally call _p_init_node(argc, argv).
*
* The options are looked at in this order: -pi (this is a worker),
* -nodes, -s, -start-nodes, and finally a plain -n greater than one.
*/
void _p_sr_init_node(argc, argv)
int argc;
char *argv[];
{
int i;
/* Initialize the proc table with all fd=-1 */
_p_stdout = stdout;
for (i = 0; i < MAX_NODES; i++)
{
proc_table[i].buf = NULL;
proc_table[i].len = 0;
proc_table[i].fd = -1;
}
/*
* _p_nodes will have a default value of -1. Therefore, all of the
* tests below should allow a negative value for _p_nodes, which
* indicates that _p_nodes was not set on the command line with -n.
*/
if(_p_nodes > MAX_NODES)
{
char buf[256];
sprintf(buf, "Too many nodes: MAX_NODES = %d", MAX_NODES);
sr_fatal_error(buf);
}
/* None of the parallel options given and at most one node asked for */
if (_p_nodes <= 1 && !use_start_nodes
&& host_startup_list[0] == '\0'
&& parallel_startup[0] == '\0' && parallel_info[0] == '\0')
{
/*
* Do a uniprocessing run and return
*/
_p_my_id = _p_host_id = 0;
_p_nodes = 1;
_p_host = TRUE;
/* Finish node initialization and run emulator */
_p_init_node(argc, argv);
return;
}
/*
* Ignore SIGPIPE, so that writing to a connection whose other end
* is gone shows up as an error from write() instead.
*/
signal(SIGPIPE, SIG_IGN);
if (parallel_info[0] != '\0')
{
/*
* This is a node. The -pi flag was used to tell this node
* to start up _p_nodes nodes on this machine.
*/
_p_host = FALSE;
fork_nodes(_p_nodes - 1);
}
else if (host_startup_list[0] != '\0')
{
/*
* This is the host. The -nodes flag was used to specify
* a list of the machines on which to start nodes.
*/
_p_host = TRUE;
setup_host_port();
/* Count the host itself; each node that is started adds one */
_p_nodes = 1;
start_nodes_from_list(host_startup_list);
}
else if (parallel_startup[0] != '\0')
{
/*
* This is the host. The -s flag was used to specify
* a startup file.
*/
_p_host = TRUE;
setup_host_port();
_p_nodes = 1;
read_startup_file(parallel_startup);
}
else if (use_start_nodes)
{
/*
* This is the host. The -start-nodes flag was used to
* specify that it should start up the nodes
* with the host-control stuff. Use the value of _p_nodes (which
* is set with the -n flag) to control how many nodes will
* be created.
*/
char host_name[256];
int num;
if (gethostname(host_name, sizeof(host_name)) < 0)
{
sr_fatal_error("gethostname failed");
}
_p_host = TRUE;
setup_host_port();
/*
* NOTE(review): if -n was not given, _p_nodes is still -1 here
* and num becomes negative -- confirm that this combination is
* rejected somewhere.
*/
num = _p_nodes - 1;
_p_nodes = 1; /* Since start_nodes increments _p_nodes */
start_nodes(num, host_name);
}
else if (_p_nodes > 1)
{
/*
* This is the host. Only the -n flag was used, so it
* should fork off _p_nodes-1 workers.
*/
_p_host = TRUE;
setup_host_port();
fork_nodes(_p_nodes - 1);
}
else
{
sr_fatal_error("Illegal argument combination");
}
/*
* fork_nodes() clears _p_host in the children it creates, so a
* forked child takes the worker branch here.
*/
if (_p_host)
{
start_host();
}
else
{
start_worker();
}
#ifdef DEBUG
if (ParDebug(8))
{
fprintf(_p_stdout, "All right! just before init, proc table is...\n");
dump_proc_table();
}
#endif
/* OPTIMAL_TCP_BLOCKSIZE is the magic packet size for TCP */
_p_default_msg_buffer_size = OPTIMAL_TCP_BLOCKSIZE / CELL_SIZE - HEADER_SIZE;
/* Start out with an empty pending message queue */
mq_front = mq_back = (msg_buf_header_t *) NULL;
_p_init_node(argc, argv);
}
/*
 * read_startup_file()
 *
 * Read the parallel startup file 'file' and carry out the commands
 * in it, one per line. Lines that start with '%', '#', a blank, a
 * tab or a newline are skipped. The commands are recognized by
 * their leading word:
 *
 *   fork <n>            fork n worker nodes on this machine
 *   start-nodes <n>     start n nodes through start_nodes()
 *   exec <n>: <command> start n nodes through start_child() with
 *                       the text behind the ':' as the command
 *
 * For all three, a count smaller than 1 is an error. Errors are
 * reported through sr_fatal_error().
 *
 * A child created by a "fork" line stops reading the file.
 */
static void read_startup_file(file)
char *file;
{
    FILE *fp;
    char line[MAX_PATH_LENGTH], *cmd;
    char host_name[256];
    int lineno, count, last;

    /* Get the host's host name */
    if (gethostname(host_name, sizeof(host_name)) == -1)
        sr_fatal_error("Failed to get host's machine name");

    if ((fp = fopen(file, "r")) == (FILE *) NULL)
        sr_fatal_error("Unable to open parallel startup file");

    /* Read in parallel startup file, one line at a time */
    for (lineno = 1;
         fgets(line, MAX_PATH_LENGTH - 1, fp) != (char *) NULL;
         lineno++)
    {
        if (line[0] == '%' || line[0] == '\n' || line[0] == ' '
            || line[0] == '\t' || line[0] == '#')
        {
            continue;
        }
        else if (strncmp(line, "fork", 4) == 0)
        {
            /* Fork off the specified number of nodes */
            if (sscanf(line + 4, "%d", &count) != 1 || count < 1)
            {
                sprintf(line, "Bad startup command, line %d", lineno);
                sr_fatal_error(line);
            }
            _p_nodes += count;
            if (fork_nodes(count) == 0) /* child */
                break;
        }
        else if (strncmp(line, "start-nodes", 11) == 0)
        {
            /* Use the start-nodes script to connect to the
             * node-control daemons */
            count = atoi(line + 11);
            /* A negative count would end up being added to _p_nodes,
             * so it is refused just like a missing or zero count. */
            if (count <= 0)
            {
                sr_fatal_error("Invalid start-nodes");
            }
            start_nodes(count, host_name);
        }
        else if (strncmp(line, "exec", 4) == 0) /* use system() */
        {
            /* Drop the newline so that it does not end up in the command */
            last = (int) strlen(line) - 1;
            if (line[last] == '\n')
                line[last] = '\0';

            /* Get the number of nodes to fork */
            if (sscanf(line + 4, "%d", &count) != 1 || count < 1)
            {
                sprintf(line, "Bad startup command, line %d", lineno);
                sr_fatal_error(line);
            }

            /* Find the start of the command */
            if ((cmd = strchr(line, ':')) == (char *) NULL)
            {
                sprintf(line, "Bad startup command, line %d", lineno);
                sr_fatal_error(line);
            }
            cmd++;
            start_child(count, cmd, host_name);
        }
        else
        {
            sprintf(line, "Bad startup command, line %d", lineno);
            sr_fatal_error(line);
        }
    }
    fclose(fp);
}
/*
 * start_nodes_from_list()
 *
 * Start one node on every machine named in 'hosts', a list of host
 * names separated by ':'. The nodes are started from the current
 * working directory with the program name saved in exe_from_argv.
 *
 * The list is split in place: every ':' in 'hosts' is overwritten
 * with a NUL. Empty entries (as produced by "a::b" or by a leading
 * or trailing ':') are skipped rather than handed on as an empty
 * host name.
 */
static void start_nodes_from_list(hosts)
char *hosts;
{
    char *hptr, *s, *host;
    int done;
    char pwd[MAX_PATH_LENGTH];
    char host_name[256];

    /* Get the host's host name */
    if (gethostname(host_name, sizeof(host_name)) == -1)
        sr_fatal_error("Failed to get host's machine name");

    /* When getwd() fails it leaves an error message in its buffer */
    if (getwd(pwd) == 0)
        sr_fatal_error_1("getwd failed: %s\n", pwd);

    for (hptr = hosts, done = 0; !done; )
    {
        host = hptr;
        s = strchr(hptr, ':');
        if (s)
        {
            hptr = s + 1;
            *s = 0;
        }
        else
            done = 1;

        if (host[0] == '\0')
            continue;           /* nothing between two separators */

        start_node_on_host(host, exe_from_argv, pwd, host_name);
    }
}
/*
 * start_node_on_host()
 *
 * Start one node on a remote machine by running rsh in the
 * background through system(), and count it in _p_nodes.
 *
 * host:   machine to start the node on
 * prog:   program to run there
 * pwd:    directory to change to before running prog
 * myhost: name of the machine this process runs on; together with
 *         my_port it is given to the new node with -pi.
 *
 * With dont_start_remotes set the command is only printed. A command
 * that would not fit into the command buffer, or a failing system()
 * call, is reported through sr_fatal_error().
 */
static void start_node_on_host(host, prog, pwd, myhost)
char *host, *prog, *pwd, *myhost;
{
    char cmd[1024];

#ifdef DEBUG
    if (network_debugging)
    {
        printf("Starting node on \"%s\" prog=\"%s\" pwd=\"%s\"\n",
               host, prog, pwd);
    }
#endif

    /*
     * The fixed text of the command is 33 characters; the 64 also
     * leaves room for the port number and the terminating NUL.
     */
    if (strlen(host) + strlen(pwd) + strlen(prog) + strlen(myhost) + 64
        > sizeof(cmd))
    {
        sr_fatal_error("start_node_on_host: rsh command line too long");
        return;
    }

    sprintf(cmd, "rsh %s -n 'cd %s; %s -pcn -n 1 -pi %s,%d' &",
            host, pwd, prog, myhost, my_port);

#ifdef DEBUG
    if (network_debugging)
    {
        printf("Command is <%s>\n", cmd);
    }
#endif

    if (dont_start_remotes)
    {
        printf("Would have run command \"%s\"\n", cmd);
    }
    else if (system(cmd) != 0)
    {
        sr_fatal_error("Failed system()");
    }
    _p_nodes++;
}
/*
 * start_nodes()
 *
 * Start 'num' nodes, through the host daemon if find_host_daemon()
 * reports one and through the start-nodes script otherwise, and add
 * the number of nodes that were started to _p_nodes.
 *
 * myhost: name of the machine this process runs on; together with
 *         my_port it forms the -pi argument given to the new nodes.
 */
static void start_nodes(num, myhost)
int num;
char *myhost;
{
    char node_args[512];
    char daemon_host[512];
    int daemon_port;
    int started;

    /* Construct the $ARGS$ argument */
    sprintf(node_args, " -pcn -n 1 -pi %s,%d",
            myhost, my_port);

    find_host_daemon(daemon_host, &daemon_port);

    if (daemon_host[0] != '\0')
    {
        /* Ok. A host daemon is running. Use it to start the nodes */
        started = start_nodes_with_daemon(daemon_host, daemon_port,
                                          num, node_args);
#ifdef DEBUG
        if (network_debugging)
            printf("started %d nodes with daemon\n", started);
#endif
    }
    else
    {
        /* No daemon was found: fall back to the script */
        started = start_nodes_with_script(num, node_args);
    }

    _p_nodes += started;
}
/*
 * start_nodes_with_script()
 *
 * Run "start-nodes <num> <args>" through system(), or only print the
 * command if dont_start_remotes is set. A non-zero result from
 * system() is reported through sr_fatal_error().
 *
 * Return: num. There is no way to tell how many nodes the script
 * really started, so all of them are assumed to be running.
 */
static int start_nodes_with_script(num, args)
int num;
char *args;
{
    char command[1024];

    sprintf(command, "start-nodes %d %s", num, args);

#ifdef DEBUG
    if (network_debugging)
        printf("cmdbuf: '%s'\n", command);
#endif

    if (!dont_start_remotes)
    {
        if (system(command) != 0)
        {
            sr_fatal_error("failed system()");
        }
    }
    else
    {
        printf("Would have run <%s>\n", command);
    }

    /* Here we don't know how many were started, so just assume it worked */
    return num;
}
/*
 * start_nodes_with_daemon()
 *
 * Ask the host-control daemon listening at host/port to start 'nnodes'
 * nodes, passing 'args' on to each of them.
 *
 * The conversation is line oriented:
 *   - read the daemon's first line, send our "PCN startup" line and
 *     read the ack
 *   - if a hostconfig file was named and is readable, send
 *     "load-config" and skip the reply up to the terminating "." line
 *   - send "enabled-hosts" and parse the count after the ':' of the reply
 *   - send "startnodes" (or "startnodes_withdir" with our working
 *     directory and executable when no hostconfig file was loaded) and
 *     parse the count after the ':' of the reply
 *
 * Return: the node count parsed from the daemon's final reply, or 0
 * when the daemon reports that no hosts are enabled.
 */
static int start_nodes_with_daemon(host, port, nnodes, args)
char *host, *args;
int port, nnodes;
{
    int sock, n;
    char buf[1000];
    char *bptr;
    char pwd[MAX_PATH_LENGTH+1];
    char hcfile[MAX_PATH_LENGTH + 1];

    /*
     * hcfile doubles as the "a hostconfig file was loaded" flag that is
     * tested further down, so it must start out empty.
     */
    hcfile[0] = '\0';

    if (getwd(pwd) == 0)
        sr_fatal_error_1("getwd failed: %s\n", pwd);

#ifdef DEBUG
    if (network_debugging)
        printf("connecting to %s/%d\n", host, port);
#endif

    sock = net_conn_to_listener(host, port);
    if (sock < 0)
    {
        sr_fatal_error_1("connect to host daemon failed: %s", SYSTEM_ERROR);
        return 0;
    }

    /* First line sent by the daemon */
    read_line_from_fd(sock, buf, sizeof(buf));
#ifdef DEBUG
    if (network_debugging)
        printf("got buf '%s'\n", buf);
#endif

    sprintf(buf, "PCN startup arch=%s het=%lu listen_port=%d\n",
            PCN_ARCH, (unsigned long) start_heterogeneous, my_port);
#ifdef DEBUG
    if (network_debugging)
        printf("sending buf '%s'\n", buf);
#endif
    write_line_to_fd(sock, buf);

    /*
     * Read the ack
     */
    read_line_from_fd(sock, buf, sizeof(buf));
#ifdef DEBUG
    if (network_debugging)
        printf("Got ack '%s'\n", buf);
#endif

    if (hostconfig_file[0] != '\0')
    {
        /* Make the hostconfig path absolute, refusing paths that do not fit */
        if (hostconfig_file[0] != '/')
        {
            if (strlen(pwd) + 1 + strlen(hostconfig_file) > MAX_PATH_LENGTH)
            {
                close(sock);
                errno = 0;
                sr_fatal_error("hostconfig file path is too long");
            }
            sprintf(hcfile, "%s/%s", pwd, hostconfig_file);
        }
        else
        {
            if (strlen(hostconfig_file) > MAX_PATH_LENGTH)
            {
                close(sock);
                errno = 0;
                sr_fatal_error("hostconfig file path is too long");
            }
            strcpy(hcfile, hostconfig_file);
        }

        if (access(hcfile, R_OK) != 0)
        {
            printf("hostconfig file %s not accessible\n", hcfile);
            hcfile[0] = '\0';
        }
        else
        {
            sprintf(buf, "cmd: load-config %s\n", hcfile);
            write_line_to_fd(sock, buf);

            /* Skip the reply; it ends with a line holding a single "." */
            while (1)
            {
                read_line_from_fd(sock, buf, sizeof(buf));
#ifdef DEBUG
                if (network_debugging)
                    printf("Got line '%s'\n", buf);
#endif
                if (buf[0] == '.' && buf[1] == '\0')
                    break;
            }
        }
    }

    strcpy(buf, "cmd: enabled-hosts\n");
    if (write(sock, buf, strlen(buf)) < 0)
    {
        perror("write");
        exit(1);
    }

    read_line_from_fd(sock, buf, sizeof(buf));
#ifdef DEBUG
    if (network_debugging)
        printf("got buf '%s'\n", buf);
#endif

    /* The count follows the ':'; a reply without one cannot be parsed */
    bptr = strchr(buf, ':');
    if (bptr == NULL)
    {
        close(sock);
        errno = 0;
        sr_fatal_error("Bad enabled-hosts reply from host-control\n");
    }
    bptr++;
    n = atoi(bptr);
#ifdef DEBUG
    if (network_debugging)
        printf("got %d nodes enabled\n", n);
#endif

    if (n == 0)
    {
        printf("No nodes enabled in host-control\n");
        close(sock);
        return n;
    }

    /* Eat "." line */
    read_line_from_fd(sock, buf, sizeof(buf));

    /* If we've got a hostconfig file, then just execute a
     * startnodes command on the host.
     * Otherwise, do a startnodes_withdir using our current directory
     * as the directory and argv[0] as the pcn pathname
     */
    if (hcfile[0] == '\0')
    {
        /* 64 bytes cover the fixed text, the decimal count and the NUL */
        if (strlen(pwd) + strlen(exe_from_argv) + strlen(args) + 64
            > sizeof(buf))
        {
            close(sock);
            errno = 0;
            sr_fatal_error("startnodes_withdir command is too long");
        }
        sprintf(buf, "cmd: startnodes_withdir %s %s %d %s\n",
                pwd, exe_from_argv, nnodes, args);
    }
    else
    {
        if (strlen(args) + 64 > sizeof(buf))
        {
            close(sock);
            errno = 0;
            sr_fatal_error("startnodes command is too long");
        }
        sprintf(buf, "cmd: startnodes %d %s\n", nnodes, args);
    }
#ifdef DEBUG
    if (network_debugging)
        printf("starting nodes with '%s'\n", buf);
#endif
    write_line_to_fd(sock, buf);

    read_line_from_fd(sock, buf, sizeof(buf));
#ifdef DEBUG
    if (network_debugging)
        printf("got line '%s'\n", buf);
#endif

    bptr = strchr(buf, ':');
    if (bptr == NULL)
    {
        close(sock);
        sr_fatal_error("Startup with host-control failed\n");
    }
    bptr++;
    /* <ctype.h> classifiers need an unsigned char value */
    while (isspace((unsigned char) *bptr))
        bptr++;
    n = atoi(bptr);
#ifdef DEBUG
    if (network_debugging)
        printf("Started %d nodes\n", n);
#endif

    close(sock);
    return n;
}
/*
 * write_line_to_fd()
 *
 * Write the NUL terminated string 'str' (without its terminator) to
 * descriptor 'sock' with a single write().  A failed write is reported
 * with perror() and otherwise ignored.
 */
static void write_line_to_fd(sock, str)
int sock;
char *str;
{
    int nbytes;

    nbytes = strlen(str);
    if (write(sock, str, nbytes) >= 0)
        return;

    perror("write");
}
/*
 * read_line_from_fd()
 *
 * Read one line from descriptor 'sock' into 'buf', one byte at a time,
 * and NUL terminate it.  The newline is consumed but not stored.
 *
 * 'len' is the full size of 'buf'.  At most len-1 bytes are stored so
 * that the terminator always fits; if the line is longer, the rest of
 * it is left unread on the descriptor.  Nothing is done when len <= 0.
 *
 * A read error, or end of file before the line is complete, is fatal:
 * a message is printed and the process exits with status 1.
 */
static void read_line_from_fd(sock, buf, len)
int sock, len;
char *buf;
{
    char *bptr;
    int n;

    if (len <= 0)
        return;

    bptr = buf;
    /* Stop one short of the buffer size to keep room for the NUL */
    while (bptr - buf < len - 1)
    {
        n = read(sock, bptr, 1);
        if (n < 0)
        {
            /* An interrupted read has not consumed anything; retry */
            if (errno == EINTR)
                continue;
            perror("read");
            exit(1);
        }
        if (n == 0)
        {
            /* No byte was stored, so *bptr must not be examined */
            fprintf(stderr, "read: unexpected end of file\n");
            exit(1);
        }
        if (*bptr == '\n')
            break;
        bptr++;
    }
    *bptr = 0;
}
/*
 * find_host_daemon()
 *
 * Look for a running host daemon.  The directory named by the
 * PCN_CONTROL_DIR environment variable (or $HOME/.pcn_control/hosts
 * when it is not set) is scanned for the first entry whose name does
 * not start with '.'.  That entry's name is taken as the daemon's host
 * name and the number on the first line of the file as its port.
 *
 * On success the name is copied to 'host' and the port stored in
 * '*port'.  On any failure 'host' is set to the empty string.
 */
static void find_host_daemon(host, port)
char *host;
int *port;
{
    FILE *fp;
    char *home, *dir;
    char homedir[MAX_PATH_LENGTH+1];
    char file[MAX_PATH_LENGTH+1];
    char hostfile[MAX_PATH_LENGTH+1];
    char buf[100];
    DIR *dirp;
#ifdef USE_DIRENT
    struct dirent *ent;
#else
#if defined(USE_DIRECT)
    struct direct *ent;
#endif
#endif

    /* Report "no daemon" unless one is positively identified below */
    *host = '\0';
    hostfile[0] = '\0';

    dir = getenv("PCN_CONTROL_DIR");
    if (dir == NULL)
    {
        home = getenv("HOME");
        /* Without a usable HOME there is no default directory to scan */
        if (home == NULL
            || strlen(home) + strlen("/.pcn_control/hosts") > MAX_PATH_LENGTH)
        {
            return;
        }
        sprintf(homedir, "%s/.pcn_control/hosts", home);
        dir = homedir;
#ifdef DEBUG
        if (network_debugging)
            printf("got dir '%s'\n", dir);
#endif
    }

    /*
     * The entry returned by readdir() is only valid while the directory
     * stream is open, so the name is copied before closedir().
     */
#if defined(USE_DIRECT)
    dirp = opendir(dir);
    if (dirp == NULL)
    {
#ifdef DEBUG
        if (network_debugging)
            printf("couldn't open dir %s\n", dir);
#endif
        return;
    }
    while ((ent = readdir(dirp)) != NULL)
    {
        if (ent->d_name[0] != '.')
        {
            if (ent->d_namlen <= MAX_PATH_LENGTH)
            {
                strncpy(hostfile, ent->d_name, ent->d_namlen);
                hostfile[ent->d_namlen] = '\0';
            }
            break;
        }
    }
    closedir(dirp);
#else
#if defined(USE_DIRENT)
    dirp = opendir(dir);
    if (dirp == NULL)
    {
#ifdef DEBUG
        if (network_debugging)
            printf("couldn't open dir %s\n", dir);
#endif
        return;
    }
    while ((ent = readdir(dirp)) != NULL)
    {
        if (ent->d_name[0] != '.')
        {
            if (strlen(ent->d_name) <= MAX_PATH_LENGTH)
                strcpy(hostfile, ent->d_name);
            break;
        }
    }
    closedir(dirp);
#endif
#endif

    if (hostfile[0] == '\0')
    {
#ifdef DEBUG
        if (network_debugging)
            printf("didn't find a host file\n");
#endif
        return;
    }

    /* dir + '/' + hostfile must fit in file[] */
    if (strlen(dir) + 1 + strlen(hostfile) > MAX_PATH_LENGTH)
        return;
    sprintf(file, "%s/%s", dir, hostfile);

    if ((fp = fopen(file, "r")) == NULL)
    {
#ifdef DEBUG
        if (network_debugging)
            printf("couldn't open file '%s': %s\n", file, sys_errlist[errno]);
#endif
        return;
    }
    if (fgets(buf, sizeof(buf), fp) == NULL)
    {
#ifdef DEBUG
        if (network_debugging)
            printf("couldn't get a line from file\n");
#endif
        fclose(fp);
        return;
    }
    fclose(fp);

    *port = atoi(buf);
    if (*port == 0)
    {
#ifdef DEBUG
        if (network_debugging)
            printf("invalid line '%s'\n", buf);
#endif
        return;
    }

    strcpy(host, hostfile);
}
/*
 * start_child()
 *
 * Start a group of 'num' nodes by running the startup command 'rawcmd'
 * in the background through system().  The first "$ARGS$" in the
 * command is replaced with the PCN arguments that tell the nodes how
 * to connect back to host_name/my_port; a command without "$ARGS$" is
 * a fatal error, as is a command that does not fit the local buffers.
 *
 * _p_nodes is increased by 'num'.  When dont_start_remotes is set the
 * command is only printed.
 */
static void start_child(num, rawcmd, host_name)
int num;
char *rawcmd;
char *host_name;
{
    char argbuf[512], cmdbuf[1024];
    char *rawcmdptr, *cmdptr;
    bool_t found;

    /* The fixed text plus two decimal ints needs well under 64 bytes */
    if (strlen(host_name) + 64 > sizeof(argbuf))
    {
        errno = 0;
        sr_fatal_error("Host name too long for startup command");
    }

    /* Construct the $ARGS$ argument */
    sprintf(argbuf, " -pcn -n %d -pi %s,%d",
            num, host_name, my_port);

    /* The expanded command is never shorter than the raw one */
    if (strlen(rawcmd) >= sizeof(cmdbuf))
    {
        errno = 0;
        sr_fatal_error("Startup command too long");
    }

    /* Substitute $ARGS$ in the command with appropriate stuff */
    for (rawcmdptr = rawcmd, cmdptr = cmdbuf, found = FALSE;
         *rawcmdptr != '\0'; )
    {
        if (strncmp(rawcmdptr, "$ARGS$", 6) == 0)
        {
            *cmdptr = '\0';
            /* prefix + args + rest + " &" + NUL must fit in cmdbuf */
            if (strlen(cmdbuf) + strlen(argbuf) + strlen(rawcmdptr + 6) + 3
                > sizeof(cmdbuf))
            {
                errno = 0;
                sr_fatal_error("Startup command too long");
            }
            strcat(cmdbuf, argbuf);
            strcat(cmdbuf, rawcmdptr + 6);
            found = TRUE;
            break;
        }
        else
            *cmdptr++ = *rawcmdptr++;
    }

    if (!found)
    {
        sr_fatal_error("Bad startup command -- no $ARGS$");
    }

    /* Run the command in the background */
    strcat(cmdbuf, " &");
    _p_nodes += num;

    /* Now system() it off */
#ifdef DEBUG
    if (ParDebug(9))
        fprintf(_p_stdout, "Execing worker with: %s\n", cmdbuf);
#endif
    if (dont_start_remotes)
    {
        printf("Would have run command \"%s\"\n", cmdbuf);
    }
    else if (system(cmdbuf) != 0)
    {
        sr_fatal_error("Failed system()");
    }
}
/*
 * void sr_fatal_error(char *msg)
 *
 * Report a fatal error hit while the workers are being created and
 * terminate the process with exit status 1.  The message goes to
 * stderr together with the system error text for the current errno
 * (nothing when errno is 0).  A node that has not been given an id
 * yet (_p_my_id == -1) is reported as "???".
 */
static void sr_fatal_error(msg)
char *msg;
{
    char *syserr;

    if (errno == 0)
        syserr = "";
    else
        syserr = sys_errlist[errno];

    if (_p_my_id != -1)
    {
        fprintf(stderr,
                "Fatal error: Node %lu: Failed to create workers: %s (%s)\n",
                (unsigned long) _p_my_id, msg, syserr);
    }
    else
    {
        /* Node doesn't have an id yet */
        fprintf(stderr,
                "Fatal error: Node ???: Failed to create workers: %s (%s)\n",
                msg, syserr);
    }

    exit(1);
} /* sr_fatal_error() */
/*
 * sr_fatal_error_1()
 *
 * Variant of sr_fatal_error() for messages with one string argument:
 * 'msg' is used as a sprintf() format with 'arg' as its only argument,
 * and the result is passed to sr_fatal_error().
 */
static void sr_fatal_error_1(msg, arg)
char *msg;
char *arg;
{
    char formatted[1024];

    sprintf(formatted, msg, arg);
    sr_fatal_error(formatted);
}
/**************************************************************/
/*
 * start_host()
 *
 * Bring up the host node (node 0):
 *
 *   1. accept_connections() waits until every worker has connected
 *      back on listen_fd.
 *   2. Each worker is sent the proc table entries of the other workers.
 *   3. Each worker in turn is told to start connecting and must answer
 *      with WORKER_CTL_CONNECTIONS_MADE.
 *   4. Each worker in turn is sent WORKER_CTL_GO and must answer with
 *      WORKER_CTL_GO_ACK.
 *
 * Any read failure, end of file or unexpected reply is fatal.
 */
static void start_host()
{
    int node, nread;
    worker_ctl_msg_t msg;
    char errbuf[300];

#ifdef DEBUG
    if (network_debugging)
        fprintf(_p_stdout, "(%s) host accepting connections\n", ident());
#endif

    accept_connections();

#ifdef DEBUG
    if (network_debugging)
    {
        fprintf(_p_stdout, "(%s) host accepted connections\n", ident());
        dump_proc_table();
    }
#endif

    _p_my_id = 0;
    _p_host_id = _p_my_id;

    for (node = 1; node < _p_nodes; node++)
    {
#ifdef DEBUG
        if (network_debugging)
            fprintf(_p_stdout, "(%s) host sending proc table to %d\n", ident(),
                    node);
#endif
        send_proc_table(node);
#ifdef DEBUG
        if (network_debugging)
            fprintf(_p_stdout, "(%s) host sent proc table to %d\n", ident(),
                    node);
#endif
    }

    /* Tell everyone to connect.
     *
     * Node _p_nodes-1 is told as well, although it has nobody left to
     * connect to: it keeps the code simple and is one more check that
     * the node is still there.
     */
    for (node = 1; node < _p_nodes; node++)
    {
#ifdef DEBUG
        if (network_debugging)
            fprintf(_p_stdout, "(%s) host telling %d to start connecting\n",
                    ident(), node);
#endif
        msg.type = WORKER_CTL_START_CONNECTING;
        blocking_write_sr_errchk(proc_table[node].fd, (char *) &msg,
                                 sizeof(msg), "start_host");

        nread = blocking_read(proc_table[node].fd, (char *) &msg, sizeof(msg));
        if (nread < 0)
        {
            sprintf(errbuf, "start_host: read failed from node %d: %s", node,
                    SYSTEM_ERROR);
            sr_fatal_error(errbuf);
        }
        else if (nread == 0)
        {
            sprintf(errbuf, "start_host: read failed from node %d: got eof",
                    node);
            sr_fatal_error(errbuf);
        }
#ifdef DEBUG
        if (network_debugging)
            fprintf(_p_stdout, "(%s) host found connections from %d done\n",
                    ident(), node);
#endif
        if (msg.type != WORKER_CTL_CONNECTIONS_MADE)
        {
            sprintf(errbuf, "start_host: wanted type %d, got %d\n",
                    WORKER_CTL_CONNECTIONS_MADE, msg.type);
            sr_fatal_error(errbuf);
        }
    }

    /* Let's go! */
    for (node = 1; node < _p_nodes; node++)
    {
#ifdef DEBUG
        if (network_debugging)
            fprintf(_p_stdout, "(%s) host tells worker %d go\n", ident(), node);
#endif
        msg.type = WORKER_CTL_GO;
        blocking_write_sr_errchk(proc_table[node].fd, (char *) &msg,
                                 sizeof(msg), "start_host");

        nread = blocking_read(proc_table[node].fd, (char *) &msg, sizeof(msg));
        if (nread < 0)
        {
            sprintf(errbuf, "start_host: read failed for go ack on node %d: %s",
                    node, SYSTEM_ERROR);
            sr_fatal_error(errbuf);
        }
        else if (nread == 0)
        {
            sprintf(errbuf, "start_host: read failed for go ack on node %d: eof",
                    node);
            sr_fatal_error(errbuf);
        }
        if (msg.type != WORKER_CTL_GO_ACK)
        {
            sprintf(errbuf, "start_host: didn't get ack, got %ld instead",
                    (long) msg.type);
            sr_fatal_error(errbuf);
        }
#ifdef DEBUG
        if (network_debugging)
            fprintf(_p_stdout, "(%s) host got GO_ACK from %d\n", ident(), node);
#endif
    }
}
/*
 * accept_connections()
 *
 * Host side: select() on listen_fd and on the workers that are already
 * connected until all _p_nodes-1 workers have connected back.  New
 * connections are handed to process_connection(), which advances the
 * connection count.  A message from an already connected worker is not
 * expected at this stage and is fatal.
 */
static void accept_connections()
{
    fd_set ready;
    int nready, fd, n_connected;
    worker_ctl_msg_t msg;

    /* n_connected is incremented in process_connection() */
    n_connected = 1;
    while (n_connected < _p_nodes)
    {
        FD_ZERO(&ready);
        set_mask_for_all_workers(&ready);
        FD_SET(listen_fd, &ready);

        nready = select(NUM_FDS, &ready, (fd_set *) NULL,
                        (fd_set *) NULL, (struct timeval *) NULL);
        if (nready == 0)
            continue;
        if (nready < 0)
        {
            char buf[300];

            /* Interrupted by a signal: just select again */
            if (errno == EINTR)
                continue;
            sprintf(buf, "Fatal error in select: %s\n", sys_errlist[errno]);
            sr_fatal_error(buf);
        }

        for (fd = 0; fd < NUM_FDS && nready > 0; fd++)
        {
            if (!FD_ISSET(fd, &ready))
                continue;

            nready--;
            if (fd != listen_fd)
            {
                blocking_read_sr_errchk(fd, (char *) &msg, sizeof(msg),
                                        "accept_connections");
                sr_fatal_error("accept_connections: Didn't expect message from other worker");
            }
            else
            {
                process_connection(&n_connected);
            }
        }
    }
}
/*
 * send_proc_table()
 *
 * Host side: send worker 'node' one WORKER_CTL_PROCTBL_ENTRY message
 * (node number, port, host name) for every other worker, followed by
 * a WORKER_CTL_PROCTBL_END message.
 */
static void send_proc_table(node)
int node;
{
    int peer, dest_fd;
    worker_ctl_msg_t msg;

    dest_fd = proc_table[node].fd;

    /* Entries for the host (0) and for 'node' itself are not sent */
    for (peer = 1; peer < _p_nodes; peer++)
    {
        if (peer != node)
        {
            msg.type = WORKER_CTL_PROCTBL_ENTRY;
            msg.i1 = peer;
            msg.i2 = proc_table[peer].port;
            strcpy(msg.buf, proc_table[peer].host);
            blocking_write_sr_errchk(dest_fd, (char *) &msg, sizeof(msg),
                                     "send_proc_table");
        }
    }

    msg.type = WORKER_CTL_PROCTBL_END;
    blocking_write_sr_errchk(dest_fd, (char *) &msg, sizeof(msg),
                             "send_proc_table");
}
/*
 * process_connection()
 *
 * Host side: accept a new worker connection on listen_fd and give the
 * worker node number *n_conn.  The worker is sent WORKER_CTL_INIT
 * (its node number and _p_nodes) and must answer with WORKER_CTL_PORT
 * carrying its host name and listening port, which are recorded in
 * proc_table.  *n_conn is incremented on success; every failure is
 * fatal.
 */
static void process_connection(n_conn)
int *n_conn;
{
    int new_fd;
    worker_ctl_msg_t msg;
    char errbuf[300];

    new_fd = net_accept(listen_fd);
    if (new_fd < 0)
    {
        sr_fatal_error("process_connection: accept failed");
    }
    set_nonblocking(new_fd);
    proc_table[*n_conn].fd = new_fd;

    msg.type = WORKER_CTL_INIT;
    msg.i1 = *n_conn;
    msg.i2 = _p_nodes;
#ifdef DEBUG
    if (network_debugging)
        fprintf(_p_stdout, "(%s) host sends CTL_INIT to %d\n", ident(),
                *n_conn);
#endif
    blocking_write_sr_errchk(new_fd, (char *) &msg, sizeof(msg),
                             "process_connection");
    blocking_read_sr_errchk(new_fd, (char *) &msg, sizeof(msg),
                            "process_connection");

    if (msg.type == WORKER_CTL_EXEC_FAILED)
    {
        /* Not a system call failure: keep errno text out of the report */
        errno = 0;
        sprintf(errbuf,
                "host-control says worker exec failed: %s",
                msg.buf);
        sr_fatal_error(errbuf);
    }
    else if (msg.type != WORKER_CTL_PORT)
    {
        sprintf(errbuf, "incorrect type in receive: %ld, wanted %d",
                (long) msg.type, WORKER_CTL_PORT);
        sr_fatal_error(errbuf);
    }

#ifdef DEBUG
    if (network_debugging)
        printf("Host got connection for node %d host %s port %d\n",
               *n_conn, msg.buf, msg.i1);
#endif

    strcpy(proc_table[*n_conn].host, msg.buf);
    proc_table[*n_conn].port = msg.i1;
    (*n_conn)++;
}
/*
 * start_worker()
 *
 * Bring up a worker node.  The host's name and port are parsed from
 * parallel_info ("host,port"; the comma is overwritten with a NUL).
 * The worker then
 *
 *   - connects to the host and reads WORKER_CTL_INIT, which carries
 *     this node's id and the number of nodes,
 *   - opens its own listening socket and reports host name and port
 *     to the host with WORKER_CTL_PORT,
 *   - receives the proc table,
 *   - waits for connections from other workers until the host says to
 *     start connecting, makes its own connections, and
 *   - waits for the host's go message.
 *
 * Every failure is fatal.
 */
static void start_worker()
{
    char *s, host[256];
    int port;
    int host_fd;
    worker_ctl_msg_t msg;

    if (parallel_info[0] == '\0')
        sr_fatal_error("Worker doesn't know his host");
    if ((s = strchr(parallel_info, ',')) == (char *) NULL)
    {
        sr_fatal_error("Failed to parse -pi argument");
    }
    /* The host part, plus its terminator, has to fit in host[] */
    if ((s - parallel_info) >= (int) sizeof(host))
    {
        errno = 0;
        sr_fatal_error("Host name in -pi argument is too long");
    }
    *s = 0;
    strcpy(host, parallel_info);
    port = atoi(s + 1);

    host_fd = net_conn_to_listener(host, port);
    if (host_fd < 0)
    {
        sr_fatal_error("Error connecting to host");
    }
    set_nonblocking(host_fd);

    blocking_read_sr_errchk(host_fd, (char *) &msg, sizeof(msg), "start_worker");
    if (msg.type != WORKER_CTL_INIT)
    {
        sr_fatal_error("Incorrect message type from host");
    }
    _p_my_id = msg.i1;
    _p_nodes = msg.i2;
    _p_host_id = 0;

    proc_table[_p_host_id].fd = host_fd;
    proc_table[_p_host_id].port = port;
    strcpy(proc_table[_p_host_id].host, host);
#ifdef DEBUG
    if (network_debugging)
        fprintf(_p_stdout, "(%s) worker got CTL_INIT _p_nodes=%lu \n", ident(),
                (unsigned long) _p_nodes);
#endif

    /* Open the socket the other workers will connect to */
    net_setup_anon_listener(3, &my_port, &listen_fd);
    if (gethostname(msg.buf, sizeof(msg.buf)) == -1)
    {
        sr_fatal_error("gethostname failed");
    }
    msg.type = WORKER_CTL_PORT;
    msg.i1 = my_port;
    blocking_write_sr_errchk(host_fd, (char *) &msg, sizeof(msg), "start_worker");
#ifdef DEBUG
    if (network_debugging)
        fprintf(_p_stdout, "(%s) worker send port %d\n", ident(), my_port);
#endif

    proc_table[_p_my_id].fd = listen_fd;
    proc_table[_p_my_id].port = my_port;
    strcpy(proc_table[_p_my_id].host, msg.buf);

    receive_proc_table();
#ifdef DEBUG
    if (ParDebug(8))
    {
        fprintf(_p_stdout, "Worker %lu proc table:\n",
                (unsigned long) _p_my_id);
        dump_proc_table();
    }
#endif

#ifdef DEBUG
    if (network_debugging)
        fprintf(_p_stdout, "(%s) worker awaiting connections \n", ident());
#endif
    await_connections();

#ifdef DEBUG
    if (network_debugging)
        fprintf(_p_stdout, "(%s) worker making connections \n", ident());
#endif
    make_connections();

#ifdef DEBUG
    if (network_debugging)
        fprintf(_p_stdout, "(%s) worker awaiting go \n", ident());
#endif
    await_go();

#ifdef DEBUG
    if (network_debugging)
        fprintf(_p_stdout, "(%s) worker got go \n", ident());
#endif
}
/* Wait for a go message from the host */
static void await_go()
{
worker_ctl_msg_t msg;
blocking_read_sr_errchk(proc_table[_p_host_id].fd, (char *) &msg, sizeof(msg),
"await_go");
if (msg.type != WORKER_CTL_GO)
{
sr_fatal_error("await_go: didn't get go");
}
msg.type = WORKER_CTL_GO_ACK;
blocking_write_sr_errchk(proc_table[_p_host_id].fd, (char *) &msg, sizeof(msg),
"await_go");
}
static void receive_proc_table()
{
bool_t done;
worker_ctl_msg_t msg;
int host_fd;
/* Just hang in a receive from the host. No one else should be
sending to us yet, so they can just wait. */
host_fd = proc_table[_p_host_id].fd;
for (done = FALSE; !done; )
{
blocking_read_sr_errchk(host_fd, (char *) &msg, sizeof(msg),
"receive_proc_table");
switch (msg.type)
{
case WORKER_CTL_PROCTBL_ENTRY:
proc_table[msg.i1].port = msg.i2;
strcpy(proc_table[msg.i1].host, msg.buf);
break;
case WORKER_CTL_PROCTBL_END:
done = TRUE;
break;
default:
sr_fatal_error("Invalid type in receive_proc_table");
break;
}
}
}
/*
 * await_connections()
 *
 * Worker side: wait for
 *      - other nodes to connect to me
 *      - host to tell me to make connections
 *        (in which case I just return and my caller calls
 *        make_connections)
 *
 * A readable descriptor that does not belong to any node in the proc
 * table is fatal.
 */
static void await_connections()
{
    fd_set read_fds;
    bool_t done;
    int n, fd, node;

    for (done = FALSE; !done; )
    {
        FD_ZERO(&read_fds);
        set_mask_for_all_workers(&read_fds);
        FD_SET(listen_fd, &read_fds);
        FD_SET(proc_table[_p_host_id].fd, &read_fds);

        n = select(NUM_FDS, &read_fds, (fd_set *) NULL, (fd_set *) NULL,
                   (struct timeval *) NULL);
        if (n == 0)
            continue;
        if (n < 0)
        {
            char buf[300];

            /* Interrupted by a signal: just select again */
            if (errno == EINTR)
                continue;
            sprintf(buf, "Fatal error in select: %s\n", sys_errlist[errno]);
            sr_fatal_error(buf);
        }

        for (fd = 0; fd < NUM_FDS && n > 0; fd++)
        {
            if (FD_ISSET(fd, &read_fds))
            {
                n--;
                if (fd == listen_fd)
                    process_connection_from_worker();
                else
                {
                    node = node_from_fd(fd);
                    if (node == -1)
                    {
                        /*
                         * sr_fatal_error_1() takes a string argument,
                         * so the integer is formatted here instead.
                         */
                        char buf[300];

                        sprintf(buf,
                                "await_connections: can't find node for fd %d\n",
                                fd);
                        sr_fatal_error(buf);
                    }
                    if (node == _p_host_id)
                        done = process_mesg_from_host();
                    else
                        process_mesg_from_node(node);
                }
            }
        }
    }
}
static void process_connection_from_worker()
{
int worker_fd;
worker_ctl_msg_t msg;
worker_fd = net_accept(listen_fd);
if (worker_fd < 0)
sr_fatal_error("process_connection_from_worker: accept failed");
set_nonblocking(worker_fd);
msg.type = WORKER_CTL_CONN_TEST;
msg.i1 = _p_my_id;
blocking_write_sr_errchk(worker_fd, (char *) &msg, sizeof(msg),
"process_connection_from_worker");
blocking_read_sr_errchk(worker_fd, (char *) &msg, sizeof(msg),
"process_connection_from_worker");
if (msg.type != WORKER_CTL_CONN_ACK)
sr_fatal_error("process_connection_from_worker: wrong response from worker");
if (msg.i2 != _p_my_id)
{
char buf[300];
sprintf(buf, "process_connection_from_worker: node mismatch: me=%lu i1=%d i2=%d\n",
(unsigned long) _p_my_id, msg.i1, msg.i2);
sr_fatal_error(buf);
}
proc_table[msg.i1].fd = worker_fd;
}
/*
 * process_mesg_from_host()
 *
 * Called from await_connections when the host sends a message.  The
 * only message handled is WORKER_CTL_START_CONNECTING; any other type
 * is fatal.
 *
 * Return: TRUE if await_connections should return.
 */
static bool_t process_mesg_from_host()
{
    worker_ctl_msg_t msg;
    char errbuf[300];

    blocking_read_sr_errchk(proc_table[_p_host_id].fd, (char *) &msg,
                            sizeof(msg), "process_mesg_from_host");

    if (msg.type == WORKER_CTL_START_CONNECTING)
        return TRUE;

    sprintf(errbuf,
            "process_mesg_from_host: don't know how to handle message type %ld",
            (long) msg.type);
    sr_fatal_error(errbuf);
    return FALSE;
}
/*
 * process_mesg_from_node()
 *
 * Called from await_connections when worker 'node' sends a message.
 * No message from another worker is handled at this stage, so the
 * message is read and then reported as a fatal error.
 */
static void process_mesg_from_node(node)
int node;
{
    worker_ctl_msg_t msg;
    char errbuf[300];
    int node_fd;

    node_fd = proc_table[node].fd;
    blocking_read_sr_errchk(node_fd, (char *) &msg, sizeof(msg),
                            "process_mesg_from_node");

    /* Every message type is unexpected here */
    sprintf(errbuf,
            "process_mesg_from_node: don't know how to handle message type %ld",
            (long) msg.type);
    sr_fatal_error(errbuf);
}
/*
 * make_connections()
 *
 * Worker side: connect to every worker with a higher node number than
 * mine.  On each new connection the peer sends WORKER_CTL_CONN_TEST
 * with its node number, which must match the node being connected to;
 * the answer is WORKER_CTL_CONN_ACK carrying my id (i1) and the peer's
 * id (i2).  When all connections exist the host is sent
 * WORKER_CTL_CONNECTIONS_MADE.
 */
static void make_connections()
{
    int peer;
    int peer_fd;
    worker_ctl_msg_t msg;
    char errbuf[300];

#ifdef DEBUG
    if (network_debugging)
        fprintf(_p_stdout, "(%s) making connections\n", ident());
#endif

    for (peer = _p_my_id + 1; peer < _p_nodes; peer++)
    {
        peer_fd = net_conn_to_listener(proc_table[peer].host,
                                       proc_table[peer].port);
        if (peer_fd < 0)
            sr_fatal_error("make_connections: connect failed");
        set_nonblocking(peer_fd);
#ifdef DEBUG
        if (network_debugging)
            fprintf(_p_stdout, "(%s) connected to fd %d\n", ident(), peer_fd);
#endif

        /* The peer identifies itself first */
        blocking_read_errchk(peer_fd, (char *) &msg, sizeof(msg),
                             "make_connections");
        if (msg.type != WORKER_CTL_CONN_TEST)
        {
            sprintf(errbuf, "make_connections: received mesg type %d, wanted %d",
                    msg.type, WORKER_CTL_CONN_TEST);
            sr_fatal_error(errbuf);
        }
        if (peer != msg.i1)
        {
            sprintf(errbuf, "make_connections: node mismatch: node=%d msg.i1=%d",
                    peer, msg.i1);
            sr_fatal_error(errbuf);
        }

        msg.type = WORKER_CTL_CONN_ACK;
        msg.i1 = _p_my_id;
        msg.i2 = peer;
#ifdef DEBUG
        if (network_debugging)
            fprintf(_p_stdout, "(%s) got node %d\n", ident(), peer);
#endif
        blocking_write_errchk(peer_fd, (char *) &msg, sizeof(msg),
                              "make_connections");
        proc_table[peer].fd = peer_fd;
    }

#ifdef DEBUG
    if (network_debugging)
        fprintf(_p_stdout, "(%s) done making connections, sending done\n", ident());
#endif
    msg.type = WORKER_CTL_CONNECTIONS_MADE;
    blocking_write_sr_errchk(proc_table[_p_host_id].fd, (char *) &msg,
                             sizeof(msg), "make_connections");
#ifdef DEBUG
    if (network_debugging)
        fprintf(_p_stdout, "(%s) done making connections, sent\n", ident());
#endif
}
/*
 * set_mask_for_all_workers()
 *
 * Add to 'mask' the descriptor of every node from 1 to _p_nodes-1,
 * except this node itself, nodes without a descriptor (fd == -1) and
 * nodes whose entry has a buffer set while its len is 0.  Nothing is
 * added while destroying_nodes is set.
 */
static void set_mask_for_all_workers(mask)
fd_set *mask;
{
    proc_table_entry *ent;
    int i, fd;

    if (destroying_nodes)
        return;

    for (i = 1; i < _p_nodes; i++)
    {
        ent = proc_table + i;
        fd = ent->fd;

        if (fd == -1 || i == _p_my_id)
            continue;
        if (ent->buf != NULL && ent->len == 0)
            continue;

        FD_SET(fd, mask);
    }
}
#ifdef DEBUG
/*
 * dump_proc_table()
 *
 * Debugging aid: print _p_nodes and every proc table entry (port,
 * descriptor, host name and buffer state) on stdout, each line
 * prefixed with this node's id.
 */
static void dump_proc_table()
{
    int_t node;
    proc_table_entry *ent;

    printf("%lu: Proc table: _p_nodes=%lu\n", (unsigned long) ME,
           (unsigned long) _p_nodes);

    node = 0;
    while (node < _p_nodes)
    {
        ent = proc_table + node;
        printf("%lu: entry %lu: port=%d fd=%d host=%s buf=%lx bufend=%lx len=%d\n",
               (unsigned long) ME, (unsigned long) node,
               ent->port, ent->fd, ent->host,
               (unsigned long) ent->buf, (unsigned long) ent->bufend,
               ent->len);
        node++;
    }
}
#endif /* DEBUG */
/*
 * node_from_fd()
 *
 * Return: the number of the first node whose proc table entry uses
 * descriptor 'fd', or -1 if no node does.
 */
static int node_from_fd(fd)
int fd;
{
    int node;

    node = 0;
    while (node < _p_nodes)
    {
        if (proc_table[node].fd == fd)
            return node;
        node++;
    }

    return -1;
}
/*
 * set_nonblocking()
 *
 * Put descriptor 'fd' into non-blocking mode (FNDELAY through fcntl(),
 * or FIONBIO through ioctl() on hpux) and, where F_SETFD exists, set
 * its descriptor flags to 1.  Any failure is reported with perror()
 * and ends the process with exit status 1.
 */
static void set_nonblocking(fd)
int fd;
{
#ifndef hpux
    int fl;

    fl = fcntl(fd, F_GETFL, 0);
    if (fl < 0)
    {
        perror("fcntl F_GETFL 1");
        exit(1);
    }
    if (fcntl(fd, F_SETFL, fl | FNDELAY) < 0)
    {
        perror("fcntl F_SETFL");
        exit(1);
    }
#else
    int on = 1;

    if (ioctl(fd, FIONBIO, &on) < 0)
    {
        perror("ioctl FIONBIO");
        exit(1);
    }
#endif

#ifdef F_SETFD
    if (fcntl(fd, F_SETFD, 1) < 0)
    {
        perror("fcntl F_SETFD");
        exit(1);
    }
#endif
}
/*
 * Overlay used by the DEBUG traces in blocking_read() and
 * blocking_write() to print the leading integers of a message.
 * Despite the name it has five members.
 */
struct FourInts
{
    int i1;
    int i2;
    int i3;
    int i4;
    int i5;
};
/*
 * blocking_read()
 *
 * Read exactly 'size' bytes from descriptor 'fd' into 'buf', retrying
 * as long as a non-blocking descriptor has no data yet (EWOULDBLOCK or
 * EAGAIN) or the read was interrupted by a signal (EINTR).
 *
 * Return: 'size' when everything was read,
 *         0 on end of file or EPIPE,
 *         the negative read() result on any other error (errno is set).
 */
static int blocking_read(fd, buf, size)
int fd;
char *buf;
int size;
{
    int n, n_left;
    char *msgptr;

    for (n_left = size, msgptr = buf; n_left > 0; )
    {
        n = read(fd, msgptr, n_left);
        if (n == 0)
        {
#ifdef DEBUG
            if (network_debugging)
                printf("(%s) read from %d got eof\n", ident(),
                       node_from_fd(fd));
#endif
            return 0;
        }
        if (n < 0)
        {
#ifdef DEBUGFOO
            if (network_debugging)
                fprintf(_p_stdout, "(%s) read from fd %d node %d returned error: %s\n",
                        ident(), fd, node_from_fd(fd), SYSTEM_ERROR);
#endif
            /* None of these consumed any data, so simply try again */
            if (errno == EWOULDBLOCK || errno == EAGAIN || errno == EINTR)
                continue;
            else if (errno == EPIPE)
                return 0;
            else
                return n;
        }
#ifdef DEBUG
        if (network_debugging)
            fprintf(_p_stdout, "(%s) read %d/%d bytes from node %d\n",
                    ident(), n, n_left, node_from_fd(fd));
#endif
        n_left -= n;
        msgptr += n;
    }
#ifdef DEBUG
    {
        struct FourInts *fi = (struct FourInts *) buf;
        if (network_debugging)
            fprintf(_p_stdout, "(%s) read got header %d %d %d %d %d\n",
                    ident(), fi->i1, fi->i2, fi->i3, fi->i4, fi->i5);
    }
#endif
    return size;
}
/*
 * blocking_write()
 *
 * Write exactly 'size' bytes from 'buf' to descriptor 'fd', retrying
 * as long as a non-blocking descriptor cannot take more data yet
 * (EWOULDBLOCK or EAGAIN) or the write was interrupted by a signal
 * (EINTR).
 *
 * Return: 'size' when everything was written,
 *         0 on EPIPE,
 *         the negative write() result on any other error (errno is set).
 */
static int blocking_write(fd, buf, size)
int fd;
char *buf;
int size;
{
    int n, n_left;
    char *msgptr;

#ifdef DEBUG
    {
        struct FourInts *fi = (struct FourInts *) buf;
        if (network_debugging)
            fprintf(_p_stdout, "(%s) write putting header %d %d %d %d %d\n",
                    ident(), fi->i1, fi->i2, fi->i3, fi->i4, fi->i5);
    }
#endif
    for (n_left = size, msgptr = buf; n_left > 0; )
    {
        n = write(fd, msgptr, n_left);
        if (n < 0)
        {
            /* None of these transferred any data, so simply try again */
            if (errno == EWOULDBLOCK || errno == EAGAIN || errno == EINTR)
                continue;
            else if (errno == EPIPE)
                return 0;
            else
                return n;
        }
#ifdef DEBUG
        if (network_debugging)
            fprintf(_p_stdout, "(%s) wrote %d/%d bytes to node %d\n",
                    ident(), n, n_left, node_from_fd(fd));
#endif
        n_left -= n;
        msgptr += n;
    }
#ifdef DEBUG
    {
        struct FourInts *fi = (struct FourInts *) buf;
        if (network_debugging)
            fprintf(_p_stdout, "(%s) write got header %d %d %d %d %d\n",
                    ident(), fi->i1, fi->i2, fi->i3, fi->i4, fi->i5);
    }
#endif
    return size;
}
/*
 * blocking_write_errchk()
 *
 * blocking_write() with error checking: a failed write or a zero
 * return is reported through _p_fatal_error(), with 'func' naming the
 * caller in the message.
 */
static void blocking_write_errchk(fd, buf, size, func)
int fd;
char *buf;
int size;
char *func;
{
    static char msgbuf[1024];
    int rc;

    rc = blocking_write(fd, buf, size);
    if (rc > 0)
        return;

    if (rc == 0)
        sprintf(msgbuf, "%s: eof on write\n", func);
    else
        sprintf(msgbuf, "%s: write failed: %s", func, SYSTEM_ERROR);
    _p_fatal_error(msgbuf);
}
/*
 * blocking_read_errchk()
 *
 * blocking_read() with error checking: a failed read or end of file
 * is reported through _p_fatal_error(), with 'func' naming the caller
 * in the message.
 */
static void blocking_read_errchk(fd, buf, size, func)
int fd;
char *buf;
int size;
char *func;
{
    static char msgbuf[1024];
    int rc;

    rc = blocking_read(fd, buf, size);
    if (rc > 0)
        return;

    if (rc == 0)
        sprintf(msgbuf, "%s: eof on read -- some other node probably aborted\n", func);
    else
        sprintf(msgbuf, "%s: read failed: %s", func, SYSTEM_ERROR);
    _p_fatal_error(msgbuf);
}
/*
 * blocking_write_sr_errchk()
 *
 * blocking_write() with error checking for the startup code: a failed
 * write or a zero return is reported through sr_fatal_error(), with
 * 'func' naming the caller in the message.
 */
static void blocking_write_sr_errchk(fd, buf, size, func)
int fd;
char *buf;
int size;
char *func;
{
    static char msgbuf[1024];
    int rc;

    rc = blocking_write(fd, buf, size);
    if (rc > 0)
        return;

    if (rc == 0)
        sprintf(msgbuf, "%s: eof on write\n", func);
    else
        sprintf(msgbuf, "%s: write failed: %s", func, SYSTEM_ERROR);
    sr_fatal_error(msgbuf);
}
/*
 * blocking_read_sr_errchk()
 *
 * blocking_read() with error checking for the startup code: a failed
 * read or end of file is reported through sr_fatal_error(), with
 * 'func' naming the caller in the message.
 */
static void blocking_read_sr_errchk(fd, buf, size, func)
int fd;
char *buf;
int size;
char *func;
{
    static char msgbuf[1024];
    int rc;

    rc = blocking_read(fd, buf, size);
    if (rc > 0)
        return;

    if (rc == 0)
        sprintf(msgbuf, "%s: eof on read\n", func);
    else
        sprintf(msgbuf, "%s: read failed: %s", func, SYSTEM_ERROR);
    sr_fatal_error(msgbuf);
}
/*
 * net_setup_anon_listener()
 *
 * Create a TCP socket with TCP_NODELAY set, bind it to a port chosen
 * by the system on any local address and start listening on it with
 * the given backlog.
 *
 * Return: *skt  = the listening socket
 *         *port = the port it was bound to, in host byte order
 *
 * Every failure is fatal.
 */
static void net_setup_anon_listener(backlog, port, skt)
int backlog;
int *port;
int *skt;
{
    int rc,sinlen;
    struct sockaddr_in sin;
    int optval = TRUE;

    *skt = socket(AF_INET, SOCK_STREAM, 0);
    error_check(*skt,"net_setup_anon_listener socket");

    rc = setsockopt(*skt,IPPROTO_TCP,TCP_NODELAY,(char *)&optval,sizeof(optval));
    if (rc < 0)
    {
        sr_fatal_error_1("setsockopt failed: %s", SYSTEM_ERROR);
    }

    /* Clear the members that are not set explicitly below */
    memset((char *) &sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = INADDR_ANY;
    sin.sin_port = htons(0);
    sinlen = sizeof(sin);

    error_check(bind(*skt,(struct sockaddr *) &sin,sizeof(sin)),
                "net_setup_anon_listener bind");
    error_check(listen(*skt, backlog), "net_setup_anon_listener listen");

    /* Without a successful getsockname() the port would be garbage */
    error_check(getsockname(*skt, (struct sockaddr *) &sin, &sinlen),
                "net_setup_anon_listener getsockname");
    *port = ntohs(sin.sin_port);
}
/*
 * net_accept()
 *
 * Accept a connection on socket skt and return fd of new connection.
 * An accept() interrupted by a signal is retried; any other failure
 * is handed to error_check().
 */
static int net_accept(skt)
int skt;
{
    struct sockaddr_in peer;
    int peerlen;
    int conn;

    peerlen = sizeof(peer);
    for (;;)
    {
        conn = accept(skt, (struct sockaddr *) &peer, &peerlen);
        if (conn != -1)
            break;
        if (errno == EINTR)
            continue;
        error_check(conn, "net_accept accept");
    }
    return(conn);
}
/*
 * net_conn_to_listener()
 *
 * Open a TCP connection, with TCP_NODELAY set, to 'port' on the host
 * named 'hostname'.
 *
 * Return: the connected socket.  A failed host lookup, socket(),
 * setsockopt() or connect() is fatal.
 */
static int net_conn_to_listener(hostname, port)
char *hostname;
int port;
{
    struct sockaddr_in addr;
    struct hostent *entry;
    int nodelay = TRUE;
    int sock, status;

    entry = gethostbyname(hostname);
    if (entry == NULL)
    {
        sr_fatal_error_1("connect_to_listener: gethostbyname %s failed",
                         hostname);
    }

    /* Fill in the listener's address from the lookup result */
    ZeroOutMemory((char *) &addr, sizeof(addr));
    memcpy((char *) &addr.sin_addr, (char *) entry->h_addr, entry->h_length);
    addr.sin_family = entry->h_addrtype;
    addr.sin_port = htons(port);

    sock = socket(AF_INET, SOCK_STREAM, 0);
    error_check(sock, "net_conn_to_listener socket");

    status = setsockopt(sock,IPPROTO_TCP,TCP_NODELAY,
                        (char *) &nodelay, sizeof(nodelay));
    error_check(status, "net_conn_to_listener setsockopt");

    status = connect(sock,(struct sockaddr *) &addr, sizeof(addr));
    error_check(status, "net_conn_to_listener connect");

    return(sock);
}
/*
 * error_check()
 *
 * If 'val' (the result of a system call) is negative, report a fatal
 * error made of 'str', the value and the system error text for the
 * current errno.  Otherwise do nothing.
 */
static void error_check(val, str)
int val;
char *str;
{
    char report[200];

    if (val >= 0)
        return;

    sprintf(report, "%s :%d: %s\n", str, val, sys_errlist[errno]);
    sr_fatal_error(report);
}
#ifdef DEBUG
/*
 * ident()
 *
 * Debugging aid: return a string "<node id>-<host>-<pid>" that
 * identifies this process in trace output.  The host name (cut at the
 * first '.') and the pid are looked up on the first call only.
 *
 * Return: a pointer to a static buffer that is overwritten by the
 * next call.
 */
static char *ident()
{
    /* Room for a full host[] name plus two decimal numbers and dashes */
    static char buf[200];
    static char host[128];
    static int gotHost = 0;
    static int pid;

    if (!gotHost)
    {
        char *s;

        if (gethostname(host, sizeof(host)) != 0)
            host[0] = '\0';
        /* gethostname() need not terminate a name that was truncated */
        host[sizeof(host) - 1] = '\0';
        if ((s = strchr(host, '.')) !=NULL)
            *s = 0;
        pid = getpid();
        gotHost = 1;
    }
    sprintf(buf, "%lu-%s-%d", (unsigned long) _p_my_id, host, pid);
    return buf;
}
#endif /* DEBUG */
#ifdef STREAMS
/*
 * void init_stream_table()
 *
 * Initialize the stream table: every entry is marked unused (NULL
 * stream array), disabled, not closed and without queued messages.
 * The unused entries are chained into a free list through their
 * sindex members, starting at first_free_stream_id and ending with
 * -1.  With no entries at all the free list is empty.
 */
static void init_stream_table()
{
    int_t i;

    if (max_streams <= 0)
    {
        /* No table entries: there is nothing to chain or to index */
        first_free_stream_id = -1;
        return;
    }

    for (i = 0; i < max_streams; i++)
    {
        stream_table[i].stream_array = (cell_t *) NULL;
        stream_table[i].sindex = i + 1;
        stream_table[i].enabled = FALSE;
        stream_table[i].closed = FALSE;
        stream_table[i].mq_front = (msg_buf_header_t *) NULL;
        stream_table[i].mq_back = (msg_buf_header_t *) NULL;
    }
    first_free_stream_id = 0;
    /* The last entry ends the free list */
    stream_table[max_streams-1].sindex = -1;
}
/*
 * bool_t _p_alloc_stream(cell_t *stream_array, int_t sindex, int_t *id)
 *
 * Grab the first free table entry, and fill it in with the passed
 * pointer to the header of the stream array (stream_array) and the
 * index into the stream array (sindex).
 *
 * While an entry is free its sindex member holds the id of the next
 * free entry, so that link is saved before sindex is overwritten.
 *
 * Return:      TRUE if a stream was successfully allocated, FALSE
 *              otherwise
 *              *id = the stream id (an integer), or -1 when the table
 *              is full
 */
bool_t _p_alloc_stream(stream_array, sindex, id)
cell_t *stream_array;
int_t sindex;
int_t *id;
{
    int_t next_free;

    *id = first_free_stream_id;
    if (first_free_stream_id == -1)
    {
        return (FALSE);
    }
    else
    {
        /* Unlink the entry from the free list before reusing sindex */
        next_free = stream_table[*id].sindex;
        stream_table[*id].stream_array = stream_array;
        stream_table[*id].sindex = sindex;
        first_free_stream_id = next_free;
        return (TRUE);
    }
} /* _p_alloc_stream() */
/*
 * void _p_free_stream(int_t id)
 *
 * Release the stream table entry 'id': its stream array pointer is
 * cleared and the entry is pushed on the front of the free list.  An
 * id outside the table is reported through _p_fatal_error().
 */
void _p_free_stream(id)
int_t id;
{
    if (!(id >= 0 && id < max_streams))
        _p_fatal_error("_p_free_stream(): Invalid stream id");

#ifdef DEBUG
    if (stream_table[id].stream_array == (cell_t *) NULL)
    {
        fprintf(_p_stdout, "(%lu,%lu) Warning: _p_free_stream(): Freeing a NULL stream: %ld\n",
                (unsigned long) _p_my_id, (unsigned long) _p_reduction,
                (long) id);
    }
#endif /* DEBUG */

    /* Link the entry in front of the current free list */
    stream_table[id].sindex = first_free_stream_id;
    stream_table[id].stream_array = (cell_t *) NULL;
    first_free_stream_id = id;
} /* _p_free_stream() */
/*
 * void _p_update_stream(int_t id, cell_t *stream_array)
 *
 * Make the stream table entry 'id' point to 'stream_array'.  The
 * garbage collector uses this to record the new location of a stream
 * array after moving it on the heap.  An id outside the table is
 * reported through _p_fatal_error().
 */
void _p_update_stream(id, stream_array)
int_t id;
cell_t *stream_array;
{
    if (!(id >= 0 && id < max_streams))
        _p_fatal_error("_p_update_stream(): Invalid stream id");

#ifdef DEBUG
    if (stream_table[id].stream_array == (cell_t *) NULL)
    {
        fprintf(_p_stdout, "(%lu,%lu) Warning: _p_update_stream(): Updating a NULL stream: %ld\n",
                (unsigned long) _p_my_id, (unsigned long) _p_reduction,
                (long) id);
    }
#endif /* DEBUG */

    stream_table[id].stream_array = stream_array;
} /* _p_update_stream() */
/*
 * ValidateStreamData(ID)
 *
 * In DEBUG builds, check that ID is inside the stream table, that its
 * entry has a stream array and that the array carries a STREAM_TAG;
 * each failed check is reported through _p_fatal_error().  In other
 * builds the macro expands to a no-op.  Both forms take the same single
 * argument and must be used as a statement followed by ';'.
 */
#ifdef DEBUG
#define ValidateStreamData(ID) \
do { \
    if((ID) < 0 || (ID) >= max_streams) \
        _p_fatal_error("_p_lookup_stream*(): Invalid stream id"); \
    if(stream_table[(ID)].stream_array == (cell_t *) NULL) \
        _p_fatal_error("_p_lookup_stream*(): Could not find stream"); \
    if(((data_header_t *) (stream_table[(ID)].stream_array))->tag != STREAM_TAG) \
        _p_fatal_error("_p_lookup_stream*(): Expected a STREAM_TAG"); \
} while (0)
#else  /* DEBUG */
#define ValidateStreamData(ID) ((void) 0)
#endif /* DEBUG */
/*
* stream_t *_p_lookup_stream(int_t id);
*
* Lookup the stream id in the stream_table. Return the location on the
* heap of the stream_t data structure that correspondes to the appropriate
* index within the stream array.
*
* NOTE: The return value will not point to a tagged structure. It
* will point to the actual stream structure.
*/
stream_t *_p_lookup_stream(id)
int_t id;
{
ValidateStreamData(id);
return ((stream_t *) (stream_table[id].stream_array
+ stream_table[id].sindex) );
} /* _p_lookup_stream() */
/*
 * cell_t *_p_lookup_stream_header(int_t id);
 *
 * Look up stream 'id' in the stream_table and return the location on
 * the heap of the header of the stream array that holds this stream.
 */
cell_t *_p_lookup_stream_header(id)
int_t id;
{
    cell_t *header;

    ValidateStreamData(id);
    header = stream_table[id].stream_array;
    return (header);
} /* _p_lookup_stream_header() */
/*
 * int_t _p_lookup_stream_index(int_t id);
 *
 * Look up stream 'id' in the stream_table and return the index of
 * this stream within its stream array.
 */
int_t _p_lookup_stream_index(id)
int_t id;
{
    int_t index_in_array;

    ValidateStreamData(id);
    index_in_array = stream_table[id].sindex;
    return (index_in_array);
} /* _p_lookup_stream_index() */
/*
 * bool_t _p_first_stream(int_t *id)
 *
 * Start an iteration over all streams; _p_next_stream() continues it:
 *
 *      bool_t cont_iter;
 *      int_t id;
 *      for (cont_iter = _p_first_stream(&id);
 *           cont_iter;
 *           cont_iter = _p_next_stream(&id)
 *          )
 *      {
 *          Do_Whatever();
 *      }
 *
 * Return:      TRUE if there is a stream, FALSE otherwise
 *              *id = the first stream id
 */
bool_t _p_first_stream(id)
int_t *id;
{
    bool_t found;

    /* Searching from "before the first entry" yields the first stream */
    *id = -1;
    found = _p_next_stream(id);
    return (found);
} /* _p_first_stream() */
/*
 * bool_t _p_next_stream(int_t *id)
 *
 * Continue an iteration started with _p_first_stream(): find the next
 * table entry after *id that has a stream array.  An *id below -1 or
 * beyond the table is reported through _p_fatal_error().
 *
 * Return:      TRUE if another stream was found, FALSE otherwise
 *              *id = the id of that stream, or -1 if there is none
 */
bool_t _p_next_stream(id)
int_t *id;
{
    int_t candidate;

    if (*id < -1 || *id >= max_streams)
        _p_fatal_error("_p_next_stream(): Invalid stream id");

    /* Skip the entries that are not in use */
    candidate = *id + 1;
    while (candidate < max_streams
           && stream_table[candidate].stream_array == (cell_t *) NULL)
    {
        candidate++;
    }

    if (candidate < max_streams)
    {
        *id = candidate;
        return (TRUE);
    }

    *id = -1;
    return (FALSE);
} /* _p_next_stream() */
/*
* void _p_enable_stream_recv(int_t id)
*
* Enable remote stream receives on the passed stream 'id'.
*
* Optional: If there is a pending remote stream message for this
* stream, then this procedure can go ahead and deliver it.
*/
void _p_enable_stream_recv(id)
int_t id;
{
ValidateStreamData(id);
if (stream_table[id].mq_front != (msg_buf_header_t *) NULL)
{
/* A message is available on this stream, so deliver it */
msg_buf_header_t *msg;
msg = stream_table[id].mq_front;
stream_table[id].mq_front = msg->next;
deliver_stream_msg(msg);
FreeMsgBuf(msg);
}
else if (stream_table[id].closed)
{
/*
* There are no messages left to be delivered, and the stream
* has been closed by the sender. So close the receiving side.
*/
_p_close_stream(id);
}
else
{
stream_table[id].enabled = TRUE;
}
} /* _p_enable_stream_recv() */
/*
 * void _p_disable_stream_recv(int_t id)
 *
 * Disable remote stream receives on the passed stream 'id'.
 *
 * Optional: If there is a pending remote stream message for this
 * stream, then this procedure can go ahead and deliver it.
 *
 * This implementation only clears the enabled flag of the stream; it
 * does not look at the stream's message queue.
 */
void _p_disable_stream_recv(id)
int_t id;
{
    ValidateStreamData(id);

    /* Clear the receive-enabled flag for this stream. */
    stream_table[id].enabled = FALSE;
} /* _p_disable_stream_recv() */
/*
 * void _p_send_close_stream(int_t node, int_t id)
 *
 * Close down stream <node,id>.  This procedure is called from the
 * sending side of the stream.  It sends a one cell MSG_CLOSE_STREAM
 * message, whose only content is the stream id, to 'node'.
 *
 * The stream must not be closed on the receiving side until every
 * message on this stream has been delivered, so this close message
 * needs to remain ordered relative to other sends on this stream.
 */
void _p_send_close_stream(node, id)
int_t node;
int_t id;
{
    cell_t *close_msg;
    int_t *payload;

#ifdef DEBUG
    if (ParDebug(5))
    {
        fprintf(_p_stdout,
                "(%lu,%lu) _p_send_close_stream(): to node %lu, id %ld\n",
                (unsigned long) _p_my_id, (unsigned long) _p_reduction,
                (unsigned long) node, (long) id);
        fflush(_p_stdout);
    }
#endif /* DEBUG */

    /* The single cell of the message body carries just the stream id. */
    close_msg = _p_alloc_msg_buffer(1);
    payload = (int_t *) close_msg;
    *payload = id;

    _p_msg_send(close_msg, node, 1, MSG_CLOSE_STREAM);

#ifdef GAUGE
    _p_gauge_stats.ssmallmsgs++;
#endif /* GAUGE */
} /* _p_send_close_stream() */
/*
 * void _p_stream_send(int_t id, int_t node,
 *                     cell_t *array, u_int_t offset, u_int_t size)
 *
 * Send a stream message to stream id, 'id', on node 'node'.  The
 * message should come from the passed 'array', starting 'offset'
 * elements into the array, 'size' elements in size.
 *
 * NOTE(review): the prototype above lists a 'node' argument, but the
 * definition below takes only (id, array, offset, size) -- confirm
 * which of the two is intended.
 *
 * NOTE(review): this routine is a stub.  The body contains only a
 * comment describing the intended steps, so no message is sent.
 */
void _p_stream_send(id, array, offset, size)
int_t id;
cell_t *array;
u_int_t offset;
u_int_t size;
{
/*
 * Intended steps (not implemented here):
 * Allocate a message buffer of the appropriate size, set up the
 * header, copy in the data, and deliver it.
 */
}
/*
* void handle_close_stream(msg_buf_header_t *msg)
*
* When _p_msg_receive() receives a MSG_CLOSE_STREAM message it
* calls this procedure to handle it. This procedure should figure
* out which stream id this message is for, mark the closed flag in
* the stream_table, and deliver the close stream (call _p_close_stream())
* if there are no messages pending delivery.
*/
static void handle_close_stream(msg)
msg_buf_header_t *msg;
{
int id;
id = *((int_t *) (msg->user_buf));
ValidateStreamData(id);
#ifdef DEBUG
if (ParDebug(5))
{
fprintf(_p_stdout, "(%lu,%lu) handle_close_stream(): From node %lu, id %ld\n",
(unsigned long) _p_my_id, (unsigned long) _p_reduction,
(unsigned long) msg->node, (long) id);
}
#endif /* DEBUG */
stream_table[id].closed = TRUE;
if (stream_table[id].mq_front == (msg_buf_header_t *) NULL)
_p_close_stream(id);
#ifdef GAUGE
_p_gauge_stats.rsmallmsgs++;
#endif /* GAUGE */
} /* handle_close_stream() */
/*
 * void handle_stream_receive(msg_buf_header_t *msg)
 *
 * When _p_msg_receive() receives a stream message it calls this procedure
 * to handle it.  This procedure should figure out which stream id
 * this message is for, and do the following:
 *   - If receives are enabled on this id, then deliver the message.
 *   - If receives are not enabled on this id, then queue the message up.
 *
 * NOTE(review): this routine is a stub.  Its body is empty, so neither
 * of the steps described above is carried out here.
 */
static void handle_stream_receive(msg)
msg_buf_header_t *msg;
{
/* Not implemented: 'msg' is neither delivered nor queued. */
} /* handle_stream_receive() */
/*
 * void deliver_stream_msg(msg_buf_header_t *msg)
 *
 * When a message is ready for delivery (there is a pending receive
 * and received data) then this procedure is called.  It should find
 * the stream data structure on the heap and copy the contents of
 * the message buffer as appropriate.
 *
 * NOTE(review): this routine is a stub.  Its body is empty, so the
 * call made from _p_enable_stream_recv() copies nothing before that
 * caller frees the message buffer.
 */
static void deliver_stream_msg(msg)
msg_buf_header_t *msg;
{
/* Not implemented: nothing is copied out of 'msg'. */
} /* deliver_stream_msg() */
#endif /* STREAMS */
/* These are the contents of the former NiCE NeXT User Group NeXTSTEP/OpenStep software archive, currently hosted by Netfuture.ch. */