This is sr_machipc.c in view mode; [Download] [Up]
/*
* PCN Abstract Machine Emulator
* Authors: Steve Tuecke and Ian Foster
* Argonne National Laboratory
*
* Please see the DISCLAIMER file in the top level directory of the
* distribution regarding the provisions under which this software
* is distributed.
*
* sr_machipc.c - Send/Receive routines built on Mach IPC
*
* #include "sr_doc.h"
* See sr_doc.h for detailed documentation on Send/Receive modules.
*
* This sr module has not been kept up to date...
*/
#include "pcn.h"
#include <mach.h>
#include <servers/netname.h>
#define MSG_BACKLOG PORT_BACKLOG_MAX /* Size of message backlog queue */
#define TIMEOUT 60000 /* Timeout on receives during startup (ms) */
#define SEND_TIMEOUT_VAL 5000 /* Timeout on each msg_send() attempt in _p_msg_send() (ms) */
/*
 * Header placed in front of every message buffer. _p_alloc_msg_buffer()
 * hands callers a pointer to 'user_buf'; User2MsgBuf() steps back from that
 * pointer to the header. _p_msg_send() transmits header + user data
 * out-of-line as one region.
 */
typedef struct msg_buf_header_struct
{
struct msg_buf_header_struct * next; /* link in the local receive queue (mq_front/mq_back) */
int_t size; /* size of the user portion, in cells */
int_t type; /* PCN message type (MSG_PARAMS, MSG_EXIT, MSG_READ, ...) */
int_t node; /* id of the sending node */
cell_t user_buf[1]; /* start of the user data */
} msg_buf_header_t;
/*
 * Number of cells occupied by the header fields that precede user_buf.
 * NOTE(review): assumes 'next' and each int_t occupy exactly one cell
 * (pointer size == int_t size == CELL_SIZE) -- confirm for each port.
 */
#define HEADER_SIZE 4
/* Bytes needed for a buffer holding Size user cells plus the header */
#define MsgSize2Bytes(Size) (((Size) + HEADER_SIZE) * CELL_SIZE)
/* Convert a user_buf pointer back to its msg_buf_header_t */
#define User2MsgBuf(Buf) ((msg_buf_header_t *) (((cell_t *) (Buf)) \
- HEADER_SIZE))
/*
 * Mach message used by _p_msg_send() and _p_abort_nodes(): a header, a
 * long-form type descriptor, and a pointer to the msg_buf_header_t that is
 * sent out-of-line (msg_type_inline = FALSE). Abort messages carry buf = NULL.
 */
typedef struct
{
msg_header_t h;
msg_type_long_t t;
msg_buf_header_t * buf;
} pcn_msg_t;
/*
 * Startup check-in message: header only. The sender's port travels in
 * h.msg_local_port.
 */
typedef struct
{
msg_header_t h;
} m0_t;
/*
 * Startup message carrying one 32-bit integer (a node id in the
 * port-distribution and final check-in messages).
 */
typedef struct
{
msg_header_t h;
msg_type_t t;
int i1;
} m1_t;
/*
 * Startup message carrying two 32-bit integers: the worker's id (i1) and
 * the total number of nodes (i2).
 */
typedef struct
{
msg_header_t h;
msg_type_t t;
int i1;
int i2;
} m2_t;
/* MSG_TYPE_NORMAL is defined in <sys/message.h> */
/* msg_type value that marks an abort message sent by _p_abort_nodes() */
#define MSG_TYPE_ABORT 2
/* Receive port of every node, indexed by node id (host is _p_nodes-1) */
static port_t worker_port[MAX_NODES];
/* This node's own receive port */
static port_t my_port;
/* Set once an abort message has been received; _p_abort_nodes() then sends nothing */
static bool_t aborted = FALSE;
/* FIFO of received buffers not yet consumed, linked through 'next' */
static msg_buf_header_t *mq_front, *mq_back;
/* -s option: parallel startup file name */
static char parallel_startup[MAX_PATH_LENGTH];
/* -pi option: "host_name,netname" passed to exec'd workers */
static char parallel_info[MAX_PATH_LENGTH];
/* -nostart option: skip the system() call for "exec" startup lines */
static bool_t dont_start_remotes;
/*
 * void _p_sr_get_argdesc(int argc, char **argv,
 *                        argdesc_t **argdescp, int *n_argdescp)
 *
 * Return this module's table of command line option descriptors
 * (-n, -s, -pi, -nostart) through *argdescp, and the number of entries
 * through *n_argdescp. Resets the -s/-pi strings and the -nostart flag
 * to their empty/FALSE defaults first. argc and argv are not examined.
 * NOTE(review): presumably the caller parses argv against this table.
 */
void _p_sr_get_argdesc(argc, argv, argdescp, n_argdescp)
int argc;
char **argv;
argdesc_t **argdescp;
int *n_argdescp;
{
    static argdesc_t argdesc[] = {
        INTEGER_ARG("n", &_p_nodes, "number of nodes"),
        STRING_ARG("s", parallel_startup, "system specific parallel startup file"),
        STRING_ARG("pi", parallel_info, "parallel info for node processes"),
        BOOL_ARG("nostart", &dont_start_remotes, "don't actually start remote nodes"),
    };
    int n_entries = sizeof(argdesc) / sizeof(argdesc[0]);

    /* Defaults for the option targets owned by this module */
    dont_start_remotes = FALSE;
    parallel_info[0] = '\0';
    parallel_startup[0] = '\0';

    *n_argdescp = n_entries;
    *argdescp = argdesc;
} /* _p_sr_get_argdesc() */
/*
 * void sr_fatal_error(char *msg)
 *
 * Print a fatal node-creation error on stderr and exit(1). Used instead of
 * _p_fatal_error() while workers are being created. A node that has not
 * yet been given an id (_p_my_id == -1) is reported as "Node ???".
 */
static void sr_fatal_error(msg)
char *msg;
{
    if (_p_my_id != -1)
    {
        fprintf(stderr,
                "Fatal error: Node %d: Failed to create workers: %s\n",
                _p_my_id, msg);
    }
    else
    {
        fprintf(stderr,
                "Fatal error: Node ???: Failed to create workers: %s\n",
                msg);
    }
    exit(1);
} /* sr_fatal_error() */
/*
 * void setup_host_port(netname_name_t netname)
 *
 * Host-side setup. Marks this process as the host (_p_host = TRUE),
 * allocates the port that workers check into, and stores it in both
 * 'my_port' and worker_port[0]. The port is then registered with the net
 * name server under "pcn<pid>"; that name is written into 'netname' so the
 * caller can pass it on to workers. Any failure is fatal (sr_fatal_error()).
 */
static void setup_host_port(netname)
netname_name_t netname;
{
    extern int getpid();

    _p_host = TRUE;

    /* The check-in port, with the largest allowed backlog */
    if (port_allocate(task_self(), &my_port) != KERN_SUCCESS)
    {
        sr_fatal_error("Failed port_allocate()");
    }
    port_set_backlog(task_self(), my_port, MSG_BACKLOG);
    worker_port[0] = my_port;

    /* Publish it under a per-process name so workers can look it up */
    sprintf(netname, "pcn%d", getpid());
    if (netname_check_in(name_server_port, netname, PORT_NULL, my_port)
        != NETNAME_SUCCESS)
    {
        sr_fatal_error("Failed netname_check_in()");
    }
} /* setup_host_port() */
/*
 * int fork_nodes(int n)
 *
 * Fork off n worker processes. In each child, _p_host is set to FALSE and
 * 0 is returned immediately (children do not fork further). The parent
 * returns the pid of the last child it forked, or 1 if n <= 0. A failed
 * fork() is fatal.
 */
static int fork_nodes(n)
int n;
{
    int child_pid = 1;
    int remaining;

    for (remaining = n; remaining > 0; remaining--)
    {
#ifdef DEBUG
        if (ParDebug(9))
            printf("Forking off a worker node\n");
#endif /* DEBUG */
        child_pid = fork();
        if (child_pid < 0)
            sr_fatal_error("Failed fork()");
        if (child_pid == 0)
        {
            /* We are the new worker */
            _p_host = FALSE;
            return (0);
        }
    }
    return (child_pid);
} /* fork_nodes() */
/*
 * void _p_sr_init_node()
 *
 * Do the parallel initialization, and call _p_init_node() before returning.
 *
 * The parallel_startup string comes from the command line -s option. This
 * is what the user would set to specify a startup file (what machines to
 * start workers up on).
 *
 * The parallel_info string comes from the command line -pi option. This
 * is used by the host to pass arguments to the workers. The user should
 * never set this option.
 *
 * If both parallel_startup and parallel_info are empty strings,
 * then we know we are the host and that we should just fork off
 * _p_nodes-1 workers.
 *
 * Node ids: workers get ids 0 .. _p_nodes-2 in the order in which they
 * check in with the host; the host takes id _p_nodes-1.
 *
 * Startup protocol:
 *   1. Each worker allocates a port and sends an m0_t to the host's
 *      registered port, with its own port as msg_local_port.
 *   2. The host answers each with an m2_t holding the worker's id and
 *      the total number of nodes.
 *   3. The host sends every worker one m1_t per other worker, carrying
 *      that worker's port (as msg_local_port) and its id.
 *
 * Error handling: if we fail to create any worker, startup receives give
 * up after TIMEOUT milliseconds and the nodes exit through
 * sr_fatal_error(). In other words, much of the icky error handling that
 * deals with the creation of workers is handled by default!
 *
 * Note: _p_sr_init_node() should not use _p_fatal_error(). That function
 * relies on _p_stdout being set, which doesn't happen until after
 * _p_sr_init_node() is done. Instead it should use its own function,
 * sr_fatal_error() to handle fatal errors when creating the workers.
 */
void _p_sr_init_node()
{
    char buf1[MAX_PATH_LENGTH], buf2[MAX_PATH_LENGTH], buf3[512], *b1, *b2;
    netname_name_t netname;
    char host_name[256];
    port_t host_port;
    int i, j, prefix_len;
    FILE *fp;
    m0_t m0;
    m1_t m1;
    m2_t m2;

    /* Return if this is not a multiprocessing run */
    if (_p_nodes == 1 && parallel_startup[0] == '\0'
        && parallel_info[0] == '\0')
    {
        _p_my_id = _p_host_id = 0;
        _p_nodes = 1;
        _p_host = TRUE;
        _p_usehost = TRUE;
        /* Finish node initialization and run emulator */
        _p_init_node();
        return;
    }

    if (_p_nodes > MAX_NODES)
    {
        char buf[256];
        sprintf(buf, "Too many nodes: MAX_NODES = %d", MAX_NODES);
        sr_fatal_error(buf);
    }

    /*
     * Get the host's host name. gethostname() need not NUL-terminate a
     * truncated name, so terminate it ourselves.
     */
    if (gethostname(host_name, sizeof(host_name)) == -1)
        sr_fatal_error("Failed to get host's machine name");
    host_name[sizeof(host_name) - 1] = '\0';

    if (parallel_startup[0] == '\0' && parallel_info[0] == '\0')
    {
        /*
         * This is the host. It should fork off _p_nodes-1 workers.
         */
        setup_host_port(netname);
        fork_nodes(_p_nodes-1);
    }
    else if (parallel_startup[0] != '\0')
    {
        /*
         * This is the host. It should read the startup file (specified
         * by the string in parallel_startup), and use that information to
         * system() off workers.
         *
         * In the startup file, each line represents one startup command,
         * where a startup command is either:
         *     fork <n>                      - fork off this many workers locally
         *     exec <n>:<command> $ARGS$ ... - a command to execute a pcn worker
         *
         * The exec form is for issuing a command (such as rsh) to start
         * up workers (often on another machine):
         *     The <n> argument specifies how many workers will be forked
         *         by that execution.
         *     The first $ARGS$ of the command will be replaced with the -pi
         *         argument, which contains the host's host name and the host
         *         port's netname, and the -n argument, which specifies how
         *         many workers that pcn should fork off.
         *     The <command> argument is the command that should be used
         *         to execute the worker. It can use all flags except
         *         -h, -s, -pi and -n when starting up the node. The $ARGS$
         *         string should be embedded somewhere in this command.
         *
         * The command should NOT use the -s argument!!!
         *
         * Lines starting with '%', '#', a blank or a newline are ignored.
         */
        setup_host_port(netname);
        if ((fp = fopen(parallel_startup, "r")) == (FILE *) NULL)
            sr_fatal_error("Unable to open parallel startup file");
        _p_nodes = 1;           /* Count the # of nodes that we start up */

        /* Read in parallel startup file, one line at a time */
        for (i = 1; fgets(buf1, MAX_PATH_LENGTH - 1, fp) != (char *) NULL; i++)
        {
            if (buf1[0] == '%' || buf1[0] == '\n' || buf1[0] == ' '
                || buf1[0] == '\t' || buf1[0] == '#')
            {
                continue;
            }
            else if (strncmp(buf1, "fork", 4) == 0)
            {
                /* Fork off the specified number of nodes */
                if (sscanf(buf1+4, "%d", &j) != 1 || j < 1)
                {
                    sprintf(buf1, "Bad startup command, line %d", i);
                    sr_fatal_error(buf1);
                }
                _p_nodes += j;
                if (_p_nodes > MAX_NODES)
                {
                    char buf[256];
                    sprintf(buf, "Too many nodes: MAX_NODES = %d", MAX_NODES);
                    sr_fatal_error(buf);
                }
                if (fork_nodes(j) == 0)         /* child */
                    break;
            }
            else if (strncmp(buf1, "exec", 4) == 0)     /* use system() */
            {
                j = strlen(buf1) - 1;
                if (buf1[j] == '\n')
                    buf1[j] = '\0';

                /* Get the number of nodes to fork */
                if (sscanf(buf1+4, "%d", &j) != 1 || j < 1)
                {
                    sprintf(buf1, "Bad startup command, line %d", i);
                    sr_fatal_error(buf1);
                }
                _p_nodes += j;
                if (_p_nodes > MAX_NODES)
                {
                    char buf[256];
                    sprintf(buf, "Too many nodes: MAX_NODES = %d", MAX_NODES);
                    sr_fatal_error(buf);
                }

                /* Find the start of the command (just past the ':') */
                if ((b1 = strchr(buf1, ':')) == (char *) NULL)
                {
                    sprintf(buf1, "Bad startup command, line %d", i);
                    sr_fatal_error(buf1);
                }
                b1++;

                /* Construct the $ARGS$ argument */
                sprintf(buf3, " -n %d -pi %s,%s ",
                        j, host_name, netname);

                /* Substitute the first $ARGS$ in the command */
                if ((b2 = strstr(b1, "$ARGS$")) == (char *) NULL)
                {
                    sprintf(buf1, "Bad startup command, line %d -- no $ARGS$",
                            i);
                    sr_fatal_error(buf1);
                }
                prefix_len = b2 - b1;
                /*
                 * Command prefix + expanded args + rest of command + " &"
                 * + NUL must fit in buf2; the old strcat() chain could
                 * overrun it.
                 */
                if (prefix_len + strlen(buf3) + strlen(b2 + 6) + 3
                    > sizeof(buf2))
                {
                    sprintf(buf1,
                            "Bad startup command, line %d -- command too long",
                            i);
                    sr_fatal_error(buf1);
                }
                memcpy(buf2, b1, prefix_len);
                buf2[prefix_len] = '\0';
                strcat(buf2, buf3);
                strcat(buf2, b2 + 6);
                strcat(buf2, " &");

                /* Now system() it off */
#ifdef DEBUG
                if (ParDebug(9))
                    printf("Execing worker with: %s\n", buf2);
#endif /* DEBUG */
                if (!dont_start_remotes && system(buf2) != 0)
                    sr_fatal_error("Failed system()");
            }
            else
            {
                sprintf(buf1, "Bad startup command, line %d", i);
                sr_fatal_error(buf1);
            }
        }
        fclose(fp);
    }
    else /* parallel_info[0] != '\0' */
    {
        /*
         * This is a worker that was started up by a system() call.
         *
         * parallel_info contains the -pi argument, which contains the
         *     host_name and the netname, separated by a comma.
         * _p_nodes contains the number of nodes to fork off locally.
         */
        if ((b1 = strchr(parallel_info, ',')) == (char *) NULL)
            sr_fatal_error("Failed to parse -pi argument");
        *b1 = '\0';
        /*
         * Both parts come from the command line; refuse anything that
         * would overflow the fixed-size destinations (netname_name_t is a
         * char array, so sizeof gives its capacity).
         */
        if (strlen(parallel_info) >= sizeof(host_name)
            || strlen(b1 + 1) >= sizeof(netname))
            sr_fatal_error("Failed to parse -pi argument");
        strcpy(host_name, parallel_info);
        strcpy(netname, b1+1);
        fork_nodes(_p_nodes - 1);
    }

    /*
     * Each worker will check into the host through the
     * host's registered port.
     *   - The checkin message will contain the worker's port.
     *   - The host will return a message to the worker with his id
     *     and the number of nodes.
     */
    if (_p_host)
    {
        _p_my_id = _p_host_id = _p_nodes - 1;
        worker_port[_p_host_id] = my_port;
#ifdef DEBUG
        if (ParDebug(9))
            printf("Host: Waiting for workers to check in\n");
#endif /* DEBUG */
        for (i = 0; i < _p_nodes-1; i++)
        {
            m0.h.msg_size = sizeof(m0);
            m0.h.msg_local_port = my_port;
            if (msg_receive((msg_header_t *) &m0, RCV_TIMEOUT, TIMEOUT)
                != RCV_SUCCESS)
                sr_fatal_error("Some workers did not check in");
#ifdef DEBUG
            if (ParDebug(9))
                printf("Host: received port %d for node %d\n",
                       m0.h.msg_remote_port, i);
#endif /* DEBUG */
            /* The worker's port arrives as the reply port of its check-in */
            worker_port[i] = m0.h.msg_remote_port;

            /*
             * Now send a message back to worker with that node's id,
             * and the total number of nodes.
             */
            m2.h.msg_simple = TRUE;
            m2.h.msg_size = sizeof(m2);
            m2.h.msg_type = MSG_TYPE_NORMAL;
            m2.h.msg_local_port = PORT_NULL;
            m2.h.msg_remote_port = worker_port[i];
            m2.t.msg_type_name = MSG_TYPE_INTEGER_32;
            m2.t.msg_type_size = 32;
            m2.t.msg_type_number = 2;
            m2.t.msg_type_inline = TRUE;
            m2.t.msg_type_longform = FALSE;
            m2.t.msg_type_deallocate = FALSE;
            m2.i1 = i;
            m2.i2 = _p_nodes;
            if (msg_send((msg_header_t *) &m2, MSG_OPTION_NONE, 0)
                != SEND_SUCCESS)
                sr_fatal_error("Failed to send id and # of nodes to worker");
        }
    }
    else /* worker */
    {
        /* Allocate this worker's port */
        if (port_allocate(task_self(), &my_port) != KERN_SUCCESS)
            sr_fatal_error("Failed port_allocate() on worker");
        port_set_backlog(task_self(), my_port, MSG_BACKLOG);
#ifdef DEBUG
        if (ParDebug(9))
            printf("Node ???: Allocated port %d\n", my_port);
#endif /* DEBUG */

        /* Lookup the host's port on the host's machine */
        if (netname_look_up(name_server_port, host_name, netname,
                            &host_port) != NETNAME_SUCCESS)
            sr_fatal_error("Failed worker (failed netname_look_up())");
#ifdef DEBUG
        if (ParDebug(9))
            printf("Node ???: Got host's port %d\n", host_port);
#endif /* DEBUG */

        /* Send a message to the host with our port */
        m0.h.msg_simple = TRUE;
        m0.h.msg_size = sizeof(m0);
        m0.h.msg_type = MSG_TYPE_NORMAL;
        m0.h.msg_local_port = my_port;
        m0.h.msg_remote_port = host_port;
        if (msg_send((msg_header_t *) &m0, MSG_OPTION_NONE, 0) != SEND_SUCCESS)
            sr_fatal_error("Failed to send port to host");
#ifdef DEBUG
        if (ParDebug(9))
            printf("Node ???: Sent port to host\n");
#endif /* DEBUG */

        /* Receive message from host with id and # of nodes */
        m2.h.msg_size = sizeof(m2);
        m2.h.msg_local_port = my_port;
        if (msg_receive((msg_header_t *) &m2, RCV_TIMEOUT, TIMEOUT)
            != RCV_SUCCESS)
            sr_fatal_error("Failed to get id and # of nodes from host");
        _p_my_id = m2.i1;
        _p_nodes = m2.i2;
        _p_host_id = _p_nodes - 1;
        worker_port[_p_my_id] = my_port;
        worker_port[_p_host_id] = host_port;
#ifdef DEBUG
        if (ParDebug(9))
            printf("(%d) Received id from host -- %d nodes\n",
                   _p_my_id, _p_nodes);
#endif /* DEBUG */
    }

    /* Every worker has checked in; the registered name is no longer needed */
    if (_p_host)
        netname_check_out(name_server_port, netname, PORT_NULL);

    /*
     * Now the host has the ports for every worker. So distribute all
     * of the ports to all of the workers. (A worker already knows his port
     * and that of the host, so there is no need to send them.)
     */
    if (_p_host)
    {
        for (j = 0; j < _p_nodes-1; j++)
        {
            for (i = 0; i < _p_nodes-1; i++)
            {
                if (i != j)
                {
                    /* Send a message to worker i with port for worker j */
                    m1.h.msg_simple = TRUE;
                    m1.h.msg_size = sizeof(m1);
                    m1.h.msg_type = MSG_TYPE_NORMAL;
                    m1.h.msg_local_port = worker_port[j];
                    m1.h.msg_remote_port = worker_port[i];
                    m1.t.msg_type_name = MSG_TYPE_INTEGER_32;
                    m1.t.msg_type_size = 32;
                    m1.t.msg_type_number = 1;
                    m1.t.msg_type_inline = TRUE;
                    m1.t.msg_type_longform = FALSE;
                    m1.t.msg_type_deallocate = FALSE;
                    m1.i1 = j;
#ifdef DEBUG
                    if (ParDebug(9))
                        printf("Host: Sending port for node %d to node %d\n",
                               j, i);
#endif /* DEBUG */
                    if (msg_send((msg_header_t *) &m1, MSG_OPTION_NONE, 0)
                        != SEND_SUCCESS)
                    {
                        sprintf(buf1,
                                "Failed to send port for node %d to node %d",
                                j, i);
                        sr_fatal_error(buf1);
                    }
#ifdef DEBUG
                    if (ParDebug(9))
                        printf("Host: Sent port for node %d to node %d\n",
                               j, i);
#endif /* DEBUG */
                }
            }
        }
    }
    else /* worker */
    {
        for (i = 0; i < _p_nodes-2; i++)
        {
            /* Wait for _p_nodes-2 messages with port to other nodes */
            m1.h.msg_size = sizeof(m1);
            m1.h.msg_local_port = my_port;
            if (msg_receive((msg_header_t *) &m1, RCV_TIMEOUT, TIMEOUT)
                != RCV_SUCCESS)
                sr_fatal_error("Failed to receive ports to other nodes");
#ifdef DEBUG
            if (ParDebug(9))
                printf("(%d) Received port %d for node %d\n",
                       _p_my_id, m1.h.msg_remote_port, m1.i1);
#endif /* DEBUG */
            worker_port[m1.i1] = m1.h.msg_remote_port;
        }
    }

    /*
     * Do any other send/receive initialization stuff here...
     * The default message buffer (user cells + header) fills one VM page.
     */
    _p_default_msg_buffer_size = vm_page_size / CELL_SIZE - HEADER_SIZE;
    mq_front = mq_back = (msg_buf_header_t *) NULL;
    _p_usehost = TRUE;

    /* Finish node initialization and run emulator */
    _p_init_node();
} /* _p_sr_init_node() */
/*
* void _p_sr_node_initialized()
*/
void _p_sr_node_initialized()
{
char buf[256];
m1_t m1;
int i;
if (!_p_multiprocessing)
return;
/*
* Final checkin. If one of the
* nodes fails to initialize (and dies), then this final checkin will
* cause all of the other workers to shut down automaticly, due
* to the timeout on the receives.
*/
if (_p_host)
{
/*
* Host: Wait for each worker to checkin, then send message to
* each worker.
*/
for (i = 0; i < _p_nodes-1; i++)
{
m1.h.msg_size = sizeof(m1);
m1.h.msg_local_port = my_port;
if (msg_receive((msg_header_t *) &m1, RCV_TIMEOUT, TIMEOUT)
!= RCV_SUCCESS)
_p_fatal_error("A node did not checkin after initialization");
#ifdef DEBUG
if (ParDebug(9))
printf("Host: Final checkin received from node %d\n", m1.i1);
#endif DEBUG
}
m1.h.msg_simple = TRUE;
m1.h.msg_size = sizeof(m1);
m1.h.msg_type = MSG_TYPE_NORMAL;
m1.h.msg_local_port = PORT_NULL;
m1.t.msg_type_name = MSG_TYPE_INTEGER_32;
m1.t.msg_type_size = 32;
m1.t.msg_type_number = 1;
m1.t.msg_type_inline = TRUE;
m1.t.msg_type_longform = FALSE;
m1.t.msg_type_deallocate = FALSE;
m1.i1 = 0;
for (i = 0; i < _p_nodes-1; i++)
{
/* Send a message to worker i */
m1.h.msg_remote_port = worker_port[i];
if (msg_send((msg_header_t *) &m1, MSG_OPTION_NONE, 0)
!= SEND_SUCCESS)
{
sprintf(buf, "Failed to send final initialization to node %d",
i);
_p_fatal_error(buf);
}
#ifdef DEBUG
if (ParDebug(9))
printf("Host: Final checkin sent to node %d\n", i);
#endif DEBUG
}
}
else
{
/* Worker: Send message to host, then receive message */
m1.h.msg_simple = TRUE;
m1.h.msg_size = sizeof(m1);
m1.h.msg_type = MSG_TYPE_NORMAL;
m1.h.msg_local_port = PORT_NULL;
m1.h.msg_remote_port = worker_port[_p_host_id];
m1.t.msg_type_name = MSG_TYPE_INTEGER_32;
m1.t.msg_type_size = 32;
m1.t.msg_type_number = 1;
m1.t.msg_type_inline = TRUE;
m1.t.msg_type_longform = FALSE;
m1.t.msg_type_deallocate = FALSE;
m1.i1 = _p_my_id;
if (msg_send((msg_header_t *) &m1, MSG_OPTION_NONE, 0) != SEND_SUCCESS)
_p_fatal_error("Failed to send final checkin to host");
#ifdef DEBUG
if (ParDebug(9))
printf("(%d) Sent final checkin\n", _p_my_id);
#endif DEBUG
m1.h.msg_local_port = my_port;
if (msg_receive((msg_header_t *) &m1, RCV_TIMEOUT, TIMEOUT)
!= RCV_SUCCESS)
_p_fatal_error("Failed to receive final checkin");
#ifdef DEBUG
if (ParDebug(9))
printf("(%d) Received final checkin\n", _p_my_id);
#endif DEBUG
}
#ifdef DONT_INCLUDE
/* The following is test stuff */
sleep(1);
printf("\n");
for (i = 0; i < _p_nodes; i++)
{
printf ("(%d)%s: Summary: node %d has port %d\n", _p_my_id,
(_p_host ? " (host)" : ""), i, worker_port[i]);
}
sleep(1);
/* Test by sending something around the ring */
if (_p_host)
{
printf("\nHost: Sending test\n");
i = (_p_my_id+1) % _p_nodes;
m1.h.msg_simple = TRUE;
m1.h.msg_size = sizeof(m1);
m1.h.msg_type = MSG_TYPE_NORMAL;
m1.h.msg_local_port = PORT_NULL;
m1.h.msg_remote_port = worker_port[i];
m1.t.msg_type_name = MSG_TYPE_INTEGER_32;
m1.t.msg_type_size = 32;
m1.t.msg_type_number = 1;
m1.t.msg_type_inline = TRUE;
m1.t.msg_type_longform = FALSE;
m1.t.msg_type_deallocate = FALSE;
m1.i1 = 42;
printf("Host: Sending test to node %d on port %d\n", i,
m1.h.msg_remote_port);
if (msg_send(&m1, MSG_OPTION_NONE, 0) != SEND_SUCCESS)
_p_fatal_error("Failed to send test to node 0.");
printf("Host: Sent test to node %d\n", i);
m1.h.msg_size = sizeof(m1);
m1.h.msg_local_port = my_port;
if (msg_receive(&m1, RCV_TIMEOUT, TIMEOUT) != RCV_SUCCESS)
_p_fatal_error("Host failed to receive test.");
printf("Host: Received test (%d)\n", m1.i1);
}
else
{
m1.h.msg_size = sizeof(m1);
m1.h.msg_local_port = my_port;
if (msg_receive(&m1, RCV_TIMEOUT, TIMEOUT) != RCV_SUCCESS)
_p_fatal_error("Worker failed to receive test.");
printf("(%d) Received test (%d)\n", _p_my_id, m1.i1);
i = (_p_my_id+1) % _p_nodes;
m1.h.msg_local_port = PORT_NULL;
m1.h.msg_remote_port = worker_port[i];
printf("(%d) Sending test to node %d on port %d\n", _p_my_id,
i, m1.h.msg_remote_port);
if (msg_send(&m1, MSG_OPTION_NONE, 0) != SEND_SUCCESS)
_p_fatal_error("Failed to send test to next node");
printf("(%d) Sent test to node %d\n", _p_my_id, i);
}
exit(1);
#endif DONT_INCLUDE
} /* _p_sr_node_initialized() */
/*
 * void _p_destroy_nodes()
 *
 * No-op in this module: there is nothing to tear down here.
 * NOTE(review): presumably kept because the common Send/Receive
 * interface (sr_doc.h) requires every module to provide it.
 */
void _p_destroy_nodes()
{
    /* intentionally empty */
} /* _p_destroy_nodes() */
/*
 * void _p_abort_nodes()
 *
 * Tell every other node to abort. This node's port is deallocated first so
 * that any further sends to it fail. If 'aborted' is already set (this node
 * received an abort message in _p_msg_receive()), nothing is sent because
 * another node initiated the abort. Each abort is a pcn_msg_t with
 * msg_type MSG_TYPE_ABORT and no buffer. Sends are best effort: each is
 * bounded by SEND_TIMEOUT_VAL ms and its result is ignored, since we are
 * shutting down anyway.
 */
void _p_abort_nodes()
{
    int i;
    pcn_msg_t msg;

    /* Deallocate my port so that sends to it will fail */
    port_deallocate(task_self(), my_port);

    if (aborted)                /* Someone else initiated the abort */
        return;

    for (i = 0; i < _p_nodes; i++)
    {
        if (i == _p_my_id)
            continue;
        msg.h.msg_simple = FALSE;
        msg.h.msg_size = sizeof(pcn_msg_t);
        msg.h.msg_type = MSG_TYPE_ABORT;
        msg.h.msg_local_port = PORT_NULL;
        msg.h.msg_remote_port = worker_port[i];
        msg.t.msg_type_long_name = MSG_TYPE_UNSTRUCTURED;
        msg.t.msg_type_long_size = 32;
        msg.t.msg_type_long_number = 0;
        msg.t.msg_type_header.msg_type_inline = FALSE;
        msg.t.msg_type_header.msg_type_longform = TRUE;
        msg.t.msg_type_header.msg_type_deallocate = FALSE;
        msg.buf = (msg_buf_header_t *) NULL;
        /* Use the module-wide send timeout rather than a duplicated literal */
        msg_send((msg_header_t *) &msg, SEND_TIMEOUT, SEND_TIMEOUT_VAL);
    }
} /* _p_abort_nodes() */
/*
 * cell_t *_p_alloc_msg_buffer(int size)
 *
 * Allocate, with vm_allocate(), a message buffer that holds 'size' cells of
 * user data preceded by a msg_buf_header_t. Records 'size' in the header
 * and returns a pointer to the user portion. Allocation failure is fatal
 * (_p_fatal_error()).
 */
cell_t *_p_alloc_msg_buffer(size)
int size;                       /* In cells */
{
    msg_buf_header_t *header;
    int n_bytes = MsgSize2Bytes(size);

#ifdef DEBUG
    if (ParDebug(8))
    {
        fprintf(_p_stdout,
                "(%d,%d) _p_alloc_msg_buffer: Allocating %d cells\n",
                _p_my_id, _p_reduction, size);
        fflush(_p_stdout);
    }
#endif /* DEBUG */

    if (vm_allocate(task_self(), (vm_address_t *) &header, n_bytes, TRUE)
        != KERN_SUCCESS)
        _p_fatal_error("Failed to allocate a message buffer");
    header->size = size;

#ifdef DEBUG
    if (ParDebug(8))
    {
        fprintf(_p_stdout,
                "(%d,%d) _p_alloc_msg_buffer: Allocated %d cells\n",
                _p_my_id, _p_reduction, size);
        fflush(_p_stdout);
    }
#endif /* DEBUG */

    return (header->user_buf);
} /* _p_alloc_msg_buffer() */
/*
 * FreeMsgBuf(Buf, Size)
 *
 * Return a message buffer to the VM system with vm_deallocate(); a failure
 * is fatal (_p_fatal_error()).
 *
 * Buf points to the beginning of the message buffer (not the user's buffer,
 * but the msg_buf_header_t in front of it).
 * Size is the size of the user portion of the buffer, in cells; the header
 * is accounted for by MsgSize2Bytes().
 *
 * Written as do { ... } while (0) so that "FreeMsgBuf(b, s);" is a single
 * statement (safe inside an unbraced if/else), and Buf is parenthesized so
 * any pointer expression is converted as a whole.
 */
#define FreeMsgBuf(Buf, Size) \
    do \
    { \
        if (vm_deallocate(task_self(), (vm_address_t) (Buf), \
                          MsgSize2Bytes(Size)) != KERN_SUCCESS) \
        { \
            _p_fatal_error("Failed to free a message buffer"); \
        } \
    } while (0) /* FreeMsgBuf() */
/*
* void _p_msg_send(cell_t *buf, int node, int size, int type)
*/
void _p_msg_send(buf, node, size, type)
cell_t *buf;
int node;
int size; /* in cells */
int type;
{
msg_buf_header_t *send_buf, *rcv_buf;
char error_buf[256];
int_t allocated_size;
pcn_msg_t msg, rcv_msg;
msg_return_t rc;
#ifdef DEBUG
if (ParDebug(8))
{
fprintf(_p_stdout,
"(%d,%d) _p_msg_send: Sending to n %d, s %d, t %d\n",
_p_my_id, _p_reduction, node, size, type);
fflush(_p_stdout);
}
#endif /* DEBUG */
#ifdef DEBUG
if (node == _p_my_id)
_p_fatal_error("Internal error: Cannot send message to myself!");
#endif /* DEBUG */
/*
* If buf == NULL, then we have an empty message. So just allocate
* enough space for the message headers.
*/
if (buf == (cell_t *) NULL)
{
if (vm_allocate(task_self(), (vm_address_t *) &send_buf,
MsgSize2Bytes(0), TRUE) != KERN_SUCCESS)
{
_p_fatal_error("Failed to allocate a message buffer");
}
allocated_size = 0;
size = 0;
}
else
{
send_buf = User2MsgBuf(buf);
allocated_size = send_buf->size;
}
send_buf->size = size;
send_buf->type = type;
send_buf->node = _p_my_id;
msg.h.msg_simple = FALSE;
msg.h.msg_size = sizeof(pcn_msg_t);
msg.h.msg_type = MSG_TYPE_NORMAL;
msg.h.msg_local_port = PORT_NULL;
msg.h.msg_remote_port = worker_port[node];
msg.t.msg_type_long_name = MSG_TYPE_UNSTRUCTURED;
msg.t.msg_type_long_size = 32;
msg.t.msg_type_long_number = size + HEADER_SIZE;
msg.t.msg_type_header.msg_type_inline = FALSE;
msg.t.msg_type_header.msg_type_longform = TRUE;
msg.t.msg_type_header.msg_type_deallocate = FALSE;
msg.buf = send_buf;
while(1)
{
while(1)
{
rcv_msg.h.msg_size = sizeof(pcn_msg_t);
rcv_msg.h.msg_local_port = my_port;
if ((rc = msg_receive((msg_header_t *) &rcv_msg, RCV_TIMEOUT, 0))
== RCV_SUCCESS)
{
rcv_buf = rcv_msg.buf;
if (mq_front == (msg_buf_header_t *) NULL)
{
mq_front = mq_back = rcv_buf;
rcv_buf->next = (msg_buf_header_t *) NULL;
}
else
{
mq_back->next = rcv_buf;
mq_back = rcv_buf;
rcv_buf->next = (msg_buf_header_t *) NULL;
}
}
else if (rc == RCV_TIMED_OUT)
break;
else if (rc != RCV_SUCCESS)
_p_fatal_error("Failed on non-blocking receive");
}
if ((rc = msg_send((msg_header_t *) &msg,
SEND_TIMEOUT, SEND_TIMEOUT_VAL))
== SEND_SUCCESS)
{
break;
}
else if (rc == SEND_TIMED_OUT)
{
fprintf(_p_stdout,
"(%d,%d) Warning: _p_msg_send() timed out -- retrying\n",
_p_my_id, _p_reduction);
fflush(_p_stdout);
}
else
{
sprintf(error_buf,
"Failed to send message -- return from msg_send(): %d",
rc);
_p_fatal_error(error_buf);
}
}
FreeMsgBuf(send_buf, allocated_size);
#ifdef DEBUG
if (ParDebug(9))
{
fprintf(_p_stdout, "(%d,%d) _p_msg_send: Sent\n",
_p_my_id, _p_reduction);
fflush(_p_stdout);
}
#endif /* DEBUG */
} /* _p_send_message() */
/*
* bool_t _p_msg_receive(int *node, int *size, int *type, int rcv_type)
*/
bool_t _p_msg_receive(node, size, type, rcv_type)
int *node;
int *size;
int *type;
int rcv_type;
{
msg_buf_header_t *rcv_buf;
pcn_msg_t msg;
msg_return_t rc;
bool_t done = FALSE;
#ifdef DEBUG
if (ParDebug(9))
{
fprintf(_p_stdout,
"(%d,%d) _p_msg_receive: Receiving message, type %d\n",
_p_my_id, _p_reduction, rcv_type);
fflush(_p_stdout);
}
#endif /* DEBUG */
/* Take message off queue if appropriate */
if (mq_front != (msg_buf_header_t *) NULL
&& (rcv_type == RCV_NOBLOCK || rcv_type == RCV_BLOCK))
{
rcv_buf = mq_front;
mq_front = mq_front->next;
}
/* else, get next message from port */
else
{
msg.h.msg_size = sizeof(pcn_msg_t);
msg.h.msg_local_port = my_port;
switch (rcv_type)
{
case RCV_NOBLOCK:
if ((rc = msg_receive((msg_header_t *) &msg, RCV_TIMEOUT, 0))
== RCV_TIMED_OUT)
{
#ifdef DEBUG
if (ParDebug(9))
{
fprintf(_p_stdout,
"(%d,%d) _p_msg_receive: Non-blocking return\n",
_p_my_id, _p_reduction);
fflush(_p_stdout);
}
#endif /* DEBUG */
return (FALSE);
}
else if (rc != RCV_SUCCESS)
_p_fatal_error("Failed on non-blocking receive");
rcv_buf = msg.buf;
break;
case RCV_BLOCK:
if (msg_receive((msg_header_t *) &msg, MSG_OPTION_NONE, 0)
!= RCV_SUCCESS)
_p_fatal_error("Failed on blocking receive");
rcv_buf = msg.buf;
break;
case RCV_PARAMS:
while (!done)
{
if (msg_receive((msg_header_t *) &msg, MSG_OPTION_NONE, 0)
!= RCV_SUCCESS)
_p_fatal_error("Failed on blocking receive");
if (msg.h.msg_type == MSG_TYPE_ABORT)
done = TRUE;
else
{
rcv_buf = msg.buf;
if ( rcv_buf->type == MSG_PARAMS
|| rcv_buf->type == MSG_EXIT )
{
done = TRUE;
}
else
{
if (mq_front == (msg_buf_header_t *) NULL)
{
mq_front = mq_back = rcv_buf;
rcv_buf->next = (msg_buf_header_t *) NULL;
}
else
{
mq_back->next = rcv_buf;
mq_back = rcv_buf;
rcv_buf->next = (msg_buf_header_t *) NULL;
}
}
}
}
break;
case RCV_COLLECT:
while (!done)
{
if (msg_receive((msg_header_t *) &msg, MSG_OPTION_NONE, 0)
!= RCV_SUCCESS)
_p_fatal_error("Failed on blocking receive");
if (msg.h.msg_type == MSG_TYPE_ABORT)
done = TRUE;
else
{
rcv_buf = msg.buf;
switch (rcv_buf->type)
{
case MSG_READ:
case MSG_CANCEL:
case MSG_EXIT:
done = TRUE;
break;
case MSG_COLLECT:
FreeMsgBuf(rcv_buf, rcv_buf->size);
break;
case MSG_DEFINE:
case MSG_VALUE:
if (mq_front == (msg_buf_header_t *) NULL)
{
mq_front = mq_back = rcv_buf;
rcv_buf->next = (msg_buf_header_t *) NULL;
}
else
{
mq_back->next = rcv_buf;
mq_back = rcv_buf;
rcv_buf->next = (msg_buf_header_t *) NULL;
}
break;
default:
_p_fatal_error("_p_msg_receive(): Illegal message type");
break;
}
}
}
break;
}
if (msg.h.msg_type == MSG_TYPE_ABORT)
{
aborted = TRUE;
_p_fatal_error("Received an abort message");
}
}
*size = rcv_buf->size;
*node = rcv_buf->node;
*type = rcv_buf->type;
/*
* Now copy the message onto the heap. If there is not enough room
* for it, then garbage collect.
* Assume that there will always be enough room for MSG_CANCEL message.
*/
if (*type != MSG_CANCEL)
{
TryGCWithSize(*size);
}
#ifdef PCN_ALIGN_DOUBLES
if (DoubleAligned(_p_heap_ptr))
_p_heap_ptr++;
#endif /* PCN_ALIGN_DOUBLES */
memcpy(_p_heap_ptr, (char *) rcv_buf->user_buf, *size * CELL_SIZE);
FreeMsgBuf(rcv_buf, *size);
#ifdef DEBUG
if (ParDebug(8))
{
fprintf(_p_stdout,
"(%d,%d) _p_msg_receive: Received from n %d, s %d, t %d\n",
_p_my_id, _p_reduction, *node, *size, *type);
fflush(_p_stdout);
}
#endif /* DEBUG */
return (TRUE);
} /* _p_msg_receive() */
These are the contents of the former NiCE NeXT User Group NeXTSTEP/OpenStep software archive, currently hosted by Netfuture.ch.