This is parallel.c in view mode; [Download] [Up]
/*
* PCN Abstract Machine Emulator
* Authors: Steve Tuecke and Ian Foster
* Argonne National Laboratory
*
* Please see the DISCLAIMER file in the top level directory of the
* distribution regarding the provisions under which this software
* is distributed.
*
* parallel.c - Code to support parallel PCN
*/
#include "pcn.h"
#ifdef PARALLEL
static cell_t *ct_stack_cp[CT_STACK_SIZE];
static int_t ct_stack_loc[CT_STACK_SIZE];
static int_t ct_stack_size = 0;
/* These are used by copy_term() and scan_term() */
#define CTAddRREF_TAG(I) ((((int_t) (I)) << TAG_SIZE) | RREF_TAG)
#define CTRemoveRREF_TAG(I) ((((int_t )(I)) & 0xFFFFFFF0) >> TAG_SIZE)
#define CTBuildRref(Weight, Location, Node) \
{ \
rref = (rref_t *) &buf[bufloc]; \
rref->tag = RREF_TAG; \
rref->mark = 0; \
rref->suspensions = 0; \
rref->node = (Node); \
rref->location = (Location); \
rref->weight = (Weight); \
rref->value_return_irt = RREF_NOT_READ_IRT; \
rref->trailer_tag = RREF_TRAILER_TAG; \
bufloc += RrefSizeWithTrailer(); \
}
#define CTNoSpace(Needed) \
(bufloc + (RrefSizeWithTrailer() * ct_stack_size) + (Needed)) > buflen
/*
* copy_term()
*
* Allocate a message buffer and copy the passed term into it. 'extra_cells'
* cells are left free at the end of the message buffer. 'termlen'
* will be set to the length (in cells) of the term in the returned buffer.
* (termlen does not include the extra cells)
*
* All references are translated into relative addresses (offset). The
* offsets are in bytes, not Cells, so that offset can be detected by
* scan_term() using IsRef().
*
* If PCN_ALIGN_DOUBLES is defined, then all data structures are double
* word aligned.
*/
static cell_t *copy_term(term, termlen, extra_cells)
cell_t *term;
int_t *termlen;
int_t extra_cells;
{
cell_t *buf, *bufptr;
int_t bufloc = 0;
int_t buflen;
cell_t *cp1, *cp2;
int_t i1, size, irt_index, size_with_trailer, loc;
int_t tag;
rref_t *rref;
#ifdef PCN_ALIGN_DOUBLES
int_t align_it;
#endif /* PCN_ALIGN_DOUBLES */
#ifdef DEBUG
if (ParDebug(8))
{
fprintf(_p_stdout, "(%lu,%lu) copy_term: term=",
(unsigned long) _p_my_id, (unsigned long) _p_reduction);
_p_print_term(_p_stdout, term);
fprintf(_p_stdout, "\n");
}
#endif /* DEBUG */
/*
* Figure out how big a buffer to allocate, and copy over the first
* level of the term.
* If 'term' is a tuple, then allocate the default size.
* Otherwise, allocate just enough room for the term.
*/
Dereference((cell_t *), term, cp1);
switch(tag = DHCellTag(cp1))
{
case DOUBLE_TAG:
case INT_TAG:
case STRING_TAG:
*termlen = _p_size_with_trailer(tag, DHCellSize(cp1));
buf = _p_alloc_msg_buffer(*termlen + extra_cells);
memcpy(buf, cp1, (*termlen * CELL_SIZE));
return (buf);
break;
case UNDEF_TAG:
*termlen = RrefSizeWithTrailer();
buf = _p_alloc_msg_buffer(*termlen + extra_cells);
irt_index = _p_alloc_irt_entry(cp1, INIT_WEIGHT);
CTBuildRref(INIT_WEIGHT, irt_index, _p_my_id);
return (buf);
break;
case RREF_TAG:
*termlen = RrefSizeWithTrailer();
buf = _p_alloc_msg_buffer(*termlen + extra_cells);
if (((rref_t *)cp1)->weight > 1)
{
/* Split this remote reference */
i1 = (((rref_t *)cp1)->weight + 2) / 3; /* Make sure i1 >= 1 */
((rref_t *)cp1)->weight -= i1;
CTBuildRref(i1, ((rref_t *)cp1)->location, ((rref_t *)cp1)->node);
}
else /* ((rref_t *)cp1)->weight == 1 */
{
/* Chain the remote reference */
irt_index = _p_alloc_irt_entry(cp1, INIT_WEIGHT);
CTBuildRref(INIT_WEIGHT, irt_index, _p_my_id);
}
return (buf);
break;
case TUPLE_TAG:
size = DHCellSize(cp1);
if (size == 0) /* NilCell */
{
*termlen = EmptyListSizeWithTrailer();
buf = _p_alloc_msg_buffer(*termlen + extra_cells);
memcpy(buf, cp1, (*termlen * CELL_SIZE));
return (buf);
}
size_with_trailer = TupleSizeWithTrailer(size);
if (size > CT_STACK_SIZE)
{
/*
* The tuple is larger than our ct_stack (and each tuple
* argument requires one ct_stack entry). Therefore,
* we need to copy over just the tuple (no data) and
* avoid using the ct_stack so as not to blow off the
* top of it.
*/
*termlen = size_with_trailer + (size * RrefSizeWithTrailer());
buf = _p_alloc_msg_buffer(*termlen + extra_cells);
buf[bufloc++] = *cp1++; /* Copy over header word */
loc = bufloc;
bufloc += size_with_trailer - 1;
for (i1 = 0; i1 < size; i1++)
{
/* Setup offset for tuple argument to rref */
buf[loc] = (bufloc - loc) * CHARS_PER_CELL;
loc++;
/* Create the rref */
Dereference((cell_t *), *cp1, cp2);
irt_index = _p_alloc_irt_entry(cp2, INIT_WEIGHT);
CTBuildRref(INIT_WEIGHT, irt_index, _p_my_id);
cp1++;
}
/* Copy over the optional pad and trailer cell */
memcpy(&(buf[loc]), cp1,
((size_with_trailer - size - 1) * CELL_SIZE));
return (buf);
}
/*
* Else:
* - allocate MAX(DEFAULT_SIZE, <minimum required size>)
* - copy first level of the tuple
* - setup ct_stack with the tuple's arguments
*/
i1 = size_with_trailer + (size * RrefSizeWithTrailer()) + extra_cells;
buflen = MAX(i1, _p_default_msg_buffer_size);
buf = _p_alloc_msg_buffer(buflen);
buflen -= extra_cells;
buf[bufloc++] = *cp1++; /* Copy over the header word */
ct_stack_size = 0;
for (i1 = size - 1; i1 >= 0; i1--)
{
ct_stack_cp[ct_stack_size] = (cell_t *) *(cp1 + i1);
ct_stack_loc[ct_stack_size++] = bufloc + i1;
}
bufloc += size;
/* Copy over the optional pad and trailer cell */
i1 = size_with_trailer - size - 1;
memcpy(&(buf[bufloc]), (cp1 + size), (i1 * CELL_SIZE));
bufloc += i1;
break;
#ifdef STREAMS
case STREAM_TAG:
_p_fatal_error("copy_term: Cannot send stream between nodes");
break;
#endif /* STREAMS */
default:
_p_fatal_error("copy_term: Bad tag found");
break;
}
/*
* If we get to here then we're dealing with a tuple. The
* first level * of the tuple will already have been copied over,
* with the arguments pushed onto the ct_stack.
*
* At this point, we are guaranteed that there is enough
* room in the buffer to hold rrefs for each thing on the ct_stack.
*/
while (ct_stack_size > 0)
{
Dereference((cell_t *), ct_stack_cp[--ct_stack_size], cp1);
/*
* Backfill the location in buf for this ct_stack entry with
* an offset to the current location in buf
*/
loc = ct_stack_loc[ct_stack_size];
buf[loc] = (bufloc - loc) * CHARS_PER_CELL;
switch (tag = DHCellTag(cp1))
{
case TUPLE_TAG:
size = DHCellSize(cp1);
if (size == 0) /* NilCell */
{
/* Its assumed that an empty list takes <= cells than a rref */
i1 = EmptyListSizeWithTrailer();
memcpy(&(buf[bufloc]), cp1, (i1 * CELL_SIZE));
bufloc += i1;
break;
}
size_with_trailer = TupleSizeWithTrailer(size);
/*
* We either have to copy the tuple itself into the message
* buffer, or put in a remote reference (rref) to the tuple.
*
* We copy just a rref if:
* 1) there is not enough room for the entire tuple
* in the message buffer, or
* 2) there is not enough space on the ct_stack to
* handle the arguments of this tuple, or
*/
if ((CTNoSpace(size_with_trailer + (RrefSizeWithTrailer() * size)))
|| ((CT_STACK_SIZE - ct_stack_size) < size) )
{
/* We only have room for the rref of this */
irt_index = _p_alloc_irt_entry(cp1, INIT_WEIGHT);
CTBuildRref(INIT_WEIGHT, irt_index, _p_my_id);
}
else
{
/* We have room for each argument of the tuple */
buf[bufloc++] = *cp1++;
for (i1 = size - 1; i1 >= 0; i1--)
{
ct_stack_cp[ct_stack_size] = (cell_t *) *(cp1 + i1);
ct_stack_loc[ct_stack_size++] = bufloc + i1;
}
bufloc += size;
/* Copy over the optional pad and trailer cell */
i1 = size_with_trailer - size - 1;
memcpy(&(buf[bufloc]), (cp1 + size), (i1 * CELL_SIZE));
bufloc += i1;
}
break;
case DOUBLE_TAG:
case INT_TAG:
case STRING_TAG:
size = _p_size_with_trailer(tag, DHCellSize(cp1));
if (CTNoSpace(size))
{
/* We only have room for the rref of this */
irt_index = _p_alloc_irt_entry(cp1, INIT_WEIGHT);
CTBuildRref(INIT_WEIGHT, irt_index, _p_my_id);
}
else
{
/* We have room for the data */
bufptr = &buf[bufloc];
memcpy(bufptr, cp1, (size * CELL_SIZE));
bufloc += size;
}
break;
case UNDEF_TAG:
irt_index = _p_alloc_irt_entry(cp1, INIT_WEIGHT);
CTBuildRref(INIT_WEIGHT, irt_index, _p_my_id);
break;
case RREF_TAG:
if (((rref_t *)cp1)->weight > 1)
{
/* Split this remote reference */
i1 = (((rref_t *)cp1)->weight + 2) / 3; /* Make sure i1 >= 1 */
((rref_t *)cp1)->weight -= i1;
CTBuildRref(i1, ((rref_t *)cp1)->location,
((rref_t *)cp1)->node);
}
else /* ((rref_t *)cp1)->weight == 1 */
{
/* Chain the remote reference */
irt_index = _p_alloc_irt_entry(cp1, INIT_WEIGHT);
CTBuildRref(INIT_WEIGHT, irt_index, _p_my_id);
}
break;
#ifdef STREAMS
case STREAM_TAG:
_p_fatal_error("copy_term: Cannot send stream between nodes");
break;
#endif /* STREAMS */
default:
_p_fatal_error("Bad tag found during copy_term()");
break;
}
}
*termlen = bufloc;
#ifdef DEBUG
if (ParDebug(8))
{
fprintf(_p_stdout, "(%lu,%lu) copy_term: buf=:",
(unsigned long) _p_my_id, (unsigned long) _p_reduction);
for (i1 = 0; i1 < *termlen; i1++)
fprintf(_p_stdout, "%lx:", (unsigned long) buf[i1]);
fprintf(_p_stdout, "\n");
}
#endif /* DEBUG */
return (buf);
} /* copy_term() */
/*
* scan_term()
*
* Scan a term to convert relative offsets to pointers. All offsets are
* in bytes.
*/
static void scan_term(term, termlen)
cell_t *term;
int termlen;
{
cell_t *end;
cell_t *cp1;
int_t tag;
int_t i, size, size_with_trailer;
u_int_t weight, location;
irt_t *irt_entry;
#ifdef DEBUG
cell_t *save_term = term;
if (ParDebug(8))
{
int i1;
fprintf(_p_stdout, "(%lu,%lu) scan_term: buf=:",
(unsigned long) _p_my_id, (unsigned long) _p_reduction);
for (i1 = 0; i1 < termlen; i1++)
fprintf(_p_stdout, "%lx:", (unsigned long) term[i1]);
fprintf(_p_stdout, "\n");
fflush(_p_stdout);
}
#endif /* DEBUG */
for (end = term + termlen; term < end; )
{
if (IsRef(term))
{
if (*term != 0)
{
*((cell_t **) term)
= (cell_t *) ( ((char_t *) term) + *((int_t *) term) );
term++;
}
else
{
_p_fatal_error("scan_term(): Encountered a 0 term");
}
}
else
{
switch(tag = DHCellTag(term))
{
case INT_TAG:
case DOUBLE_TAG:
case STRING_TAG:
term += _p_size_with_trailer(tag, DHCellSize(term));
break;
case TRAILER_TAG:
term++;
break;
case TUPLE_TAG:
size = DHCellSize(term);
if (size == 0)
{
term += EmptyListSizeWithTrailer();
}
else
{
size_with_trailer = TupleSizeWithTrailer(size);
for (i = size, term++; i > 0; i--, term++)
{
*((cell_t **) term)
= (cell_t *) ( ((char_t *) term)
+ *((int_t *) term) );
}
term += (size_with_trailer - size - 1);
}
break;
case RREF_TAG:
if (_p_my_id == ((rref_t *) term)->node)
{
/* If this rref points to my node, then get rid of it */
weight = ((rref_t *) term)->weight;
location = ((rref_t *) term)->location;
irt_entry = IrtAddress(location);
Dereference((cell_t *), irt_entry->u.ptr, cp1);
*((cell_t **) term) = cp1;
_p_cancel_irt_entry(location, weight);
ZeroOutMemory((term + 1),
((RrefSizeWithTrailer() - 1) * CELL_SIZE));
}
term += RrefSizeWithTrailer();
break;
default:
_p_fatal_error("scan_term(): Bad tag found");
break;
}
}
}
#ifdef DEBUG
if (ParDebug(8))
{
fprintf(_p_stdout, "(%lu,%lu) scan_term: term=",
(unsigned long) _p_my_id, (unsigned long) _p_reduction);
fflush(_p_stdout);
_p_print_term(_p_stdout, save_term);
fprintf(_p_stdout, "\n");
fflush(_p_stdout);
}
#endif /* DEBUG */
} /* scan_term() */
/*
* _p_send_define()
*
* Send a MSG_DEFINE message for the passed 'rref' containing 'term'.
*
* A define message is sent when a term is defined to a remote reference.
*/
void _p_send_define(rref, term)
rref_t *rref;
cell_t *term;
{
cell_t *msg_buf;
int_t term_len;
int_t msg_length;
msg_buf = copy_term(term, &term_len, 3);
*((u_int_t *) (msg_buf + term_len)) = rref->location;
*((u_int_t *) (msg_buf + term_len + 1)) = rref->weight;
*((u_int_t *) (msg_buf + term_len + 2)) = rref->value_return_irt;
#ifdef DEBUG
if (ParDebug(5))
{
fprintf(_p_stdout,
"(%lu,%lu) _p_send_define(): to n %lu, l %lu, w %lu, r %lu\n",
(unsigned long) _p_my_id, (unsigned long) _p_reduction,
(unsigned long) rref->node, (unsigned long) rref->location,
(unsigned long) rref->weight,
(unsigned long) rref->value_return_irt);
fflush(_p_stdout);
}
#endif /* DEBUG */
msg_length = term_len + 3;
#ifdef GAUGE
if (msg_length > SMALL_MSG_THRESHOLD)
{
_p_gauge_stats.sbigmsgs++;
_p_gauge_stats.sbigmsgslen += msg_length;
}
else
_p_gauge_stats.ssmallmsgs++;
#endif /* GAUGE */
_p_msg_send(msg_buf, rref->node, msg_length, MSG_DEFINE);
#ifdef DEBUG
if (ParDebug(7))
{
fprintf(_p_stdout, "(%lu,%lu) _p_send_define(): Sent\n",
(unsigned long) _p_my_id, (unsigned long) _p_reduction);
fflush(_p_stdout);
}
#endif /* DEBUG */
} /* _p_send_define() */
/*
* process_define()
*
* Process the MSG_DEFINE message that has been received onto the heap.
* This entails looking up the irt entry for this data, and
* defining the resulting undef or rref with the value.
*/
static void process_define(size, node)
int size;
int node;
{
u_int_t location, weight, value_return_irt;
irt_t *irt_entry;
cell_t *cp1;
cell_t *term = _p_heap_ptr;
#ifdef GAUGE
if (size > SMALL_MSG_THRESHOLD)
{
_p_gauge_stats.rbigmsgs++;
_p_gauge_stats.rbigmsgslen += size;
}
else
_p_gauge_stats.rsmallmsgs++;
#endif /* GAUGE */
_p_heap_ptr += size - 3;
location = *((u_int_t *) (_p_heap_ptr));
weight = *((u_int_t *) (_p_heap_ptr + 1));
value_return_irt = *((u_int_t *) (_p_heap_ptr + 2));
#ifdef DEBUG
if (ParDebug(5))
{
fprintf(_p_stdout,
"(%lu,%lu) process_define(): from n %lu, l %lu, w %lu\n",
(unsigned long) _p_my_id, (unsigned long) _p_reduction,
(unsigned long) node, (unsigned long) location,
(unsigned long) weight);
fflush(_p_stdout);
}
#endif /* DEBUG */
irt_entry = IrtAddress(location);
Dereference((cell_t *), irt_entry->u.ptr, cp1);
_p_cancel_irt_entry(location, weight);
scan_term(term, size - 3);
#ifdef DEBUG
if (ParDebug(6))
{
fprintf(_p_stdout, "(%lu,%lu) process_define(): ",
(unsigned long) _p_my_id, (unsigned long) _p_reduction);
_p_print_term(_p_stdout, term);
fprintf(_p_stdout, "\n");
fflush(_p_stdout);
}
#endif /* DEBUG */
if (!_p_define(cp1, term, TRUE, TRUE, value_return_irt, node))
{
fprintf(_p_stdout,
"(%lu,%lu) Warning: Node %lu: Received bad define message -- definition is already defined\n",
(unsigned long) _p_my_id, (unsigned long) _p_reduction,
(unsigned long) _p_my_id);
}
#ifdef DEBUG
if (ParDebug(7))
{
fprintf(_p_stdout, "(%lu,%lu) process_define(): Processed\n",
(unsigned long) _p_my_id, (unsigned long) _p_reduction);
fflush(_p_stdout);
}
#endif /* DEBUG */
} /* process_define() */
/*
* _p_send_value()
*
* Send a MSG_VALUE message. This is the return message from a
* previously received MSG_READ message.
*/
void _p_send_value(node, location, term)
int_t node;
int_t location;
cell_t *term;
{
cell_t *msg_buf;
int_t term_len;
int_t msg_length;
#ifdef DEBUG
if (ParDebug(5))
{
fprintf(_p_stdout, "(%lu,%lu) _p_send_value(): to n %lu, l %lu\t",
(unsigned long) _p_my_id, (unsigned long) _p_reduction,
(unsigned long) node, (unsigned long) location);
_p_print_term(_p_stdout, term);
fprintf(_p_stdout, "\n");
fflush(_p_stdout);
}
#endif /* DEBUG */
if (node == _p_my_id)
{
/*
* When (node == _p_my_id) we need to find unknown value on the
* other end of this remote reference and define it ourself,
* instead of sending off a message to ourself.
*
* This case is needed when you get a remote reference chain
* that ends up back at the same node. For example, suppose
* you have two nodes where node 0 has a rref (R0) to
* an undef (U1) on node 1. And also, node 1 has a rref (R1)
* to an undef (U0) on node 0. Then suppose U1 is defined to be
* the value of R1 (U1 = R1) on node 1. This will cause a chain
* where R0 is a rref to R1 which is a rref to U0.
* Now, suppose a process on node 0 tried to read from R0. This
* will generate a MSG_READ to node 1, which will forward that
* back to node 0 for U0. Then U0 will get a value note for the
* R0 request. When U0 gets a value, it will generate a
* send_value to fulfill R0's MSG_READ request. But this
* send_value will be to the same node. Thus, we need this case.
*/
cell_t *cp1;
irt_t *irt_entry = IrtAddress(location);
Dereference((cell_t *), irt_entry->u.ptr, cp1);
_p_cancel_irt_entry(location, 1);
_p_define(cp1, term, TRUE, FALSE, 0, 0);
}
else
{
msg_buf = copy_term(term, &term_len, 1);
*((u_int_t *) (msg_buf + term_len)) = location;
msg_length = term_len + 1;
#ifdef GAUGE
if (msg_length > SMALL_MSG_THRESHOLD)
{
_p_gauge_stats.sbigmsgs++;
_p_gauge_stats.sbigmsgslen += msg_length;
}
else
_p_gauge_stats.ssmallmsgs++;
#endif /* GAUGE */
_p_msg_send(msg_buf, node, msg_length, MSG_VALUE);
#ifdef DEBUG
if (ParDebug(7))
{
fprintf(_p_stdout, "(%lu,%lu) _p_send_value(): Sent\n",
(unsigned long) _p_my_id, (unsigned long) _p_reduction);
fflush(_p_stdout);
}
#endif /* DEBUG */
}
} /* _p_send_value() */
/*
* process_value()
*
* Process the MSG_VALUE message that has been received onto the heap.
* This entails looking up the irt entry for this data, and
* defining the resulting undef or rref with the value.
*/
static void process_value(size, node)
int size;
int node;
{
int_t location;
irt_t *irt_entry;
cell_t *cp1;
cell_t *term = _p_heap_ptr;
#ifdef GAUGE
if (size > SMALL_MSG_THRESHOLD)
{
_p_gauge_stats.rbigmsgs++;
_p_gauge_stats.rbigmsgslen += size;
}
else
_p_gauge_stats.rsmallmsgs++;
#endif /* GAUGE */
_p_heap_ptr += size - 1;
location = *((int_t *) (_p_heap_ptr));
#ifdef DEBUG
if (ParDebug(5))
{
fprintf(_p_stdout, "(%lu,%lu) process_value(): from n %lu, l %lu\n",
(unsigned long) _p_my_id, (unsigned long) _p_reduction,
(unsigned long) node, (unsigned long) location);
fflush(_p_stdout);
}
#endif /* DEBUG */
irt_entry = IrtAddress(location);
Dereference((cell_t *), irt_entry->u.ptr, cp1);
_p_cancel_irt_entry(location, 1);
scan_term(term, size - 2);
#ifdef DEBUG
if (ParDebug(6))
{
fprintf(_p_stdout, "(%lu,%lu) process_value(): ",
(unsigned long) _p_my_id, (unsigned long) _p_reduction);
_p_print_term(_p_stdout, term);
fprintf(_p_stdout, "\n");
fflush(_p_stdout);
}
#endif /* DEBUG */
/*
* Do not generate a MSG_DEFINE message when overwriting this rref.
* That is because the value was supplied to us by the node that
* contains the other end of this rref. Since that other node
* has given us this value, there is no need for us to bounce
* it right back at that other node.
*/
if (!_p_define(cp1, term, FALSE, FALSE, 0, 0))
{
fprintf(_p_stdout,
"(%lu,%lu) Warning: Node %lu: Received bad value message -- definition is already defined\n",
(unsigned long) _p_my_id, (unsigned long) _p_reduction,
(unsigned long) _p_my_id);
}
#ifdef DEBUG
if (ParDebug(7))
{
fprintf(_p_stdout, "(%lu,%lu) process_value(): Processed\n",
(unsigned long) _p_my_id, (unsigned long) _p_reduction);
fflush(_p_stdout);
}
#endif /* DEBUG */
} /* process_value() */
/*
* _p_send_read()
*
* Send a MSG_READ message for the given 'rref'.
*/
void _p_send_read(rref)
rref_t *rref;
{
cell_t *msg_buf;
int_t value_return_irt;
value_return_irt = _p_alloc_irt_entry(rref, 1);
rref->value_return_irt = value_return_irt;
msg_buf = _p_alloc_msg_buffer(3);
/* msg_buf = {his_irt_entry, my_node_id, my_irt_entry} */
*((u_int_t *)(msg_buf)) = rref->location;
*((u_int_t *)(msg_buf + 1)) = _p_my_id;
*((u_int_t *)(msg_buf + 2)) = value_return_irt;
#ifdef DEBUG
if (ParDebug(5))
{
fprintf(_p_stdout,
"(%lu,%lu) _p_send_read(): to n %lu, l %lu: return to l %lu\n",
(unsigned long) _p_my_id, (unsigned long) _p_reduction,
(unsigned long) rref->node, (unsigned long) rref->location,
(unsigned long) *((u_int_t *)(msg_buf + 2)));
fflush(_p_stdout);
}
#endif /* DEBUG */
#ifdef GAUGE
_p_gauge_stats.ssmallmsgs++;
#endif /* GAUGE */
_p_msg_send(msg_buf, rref->node, 3, MSG_READ);
#ifdef DEBUG
if (ParDebug(7))
{
fprintf(_p_stdout, "(%lu,%lu) _p_send_read(): Sent\n",
(unsigned long) _p_my_id, (unsigned long) _p_reduction);
fflush(_p_stdout);
}
#endif /* DEBUG */
} /* _p_send_read() */
/*
* process_read()
*
* Process the MSG_READ message that has been received onto the heap.
* This entails looking up the irt entry for this data. If the
* resulting data is an undef or rref, then queue up a value note.
* If it is real data, then generate a MSG_VALUE message.
*
* GC_ALERT: This procedure may induce garbage collection
*/
static void process_read(size, node)
int size;
int node;
{
cell_t *term;
cell_t *msg_buf;
value_note_t *value_note;
u_int_t my_location, return_node, return_location;
irt_t *irt_entry;
#ifdef GAUGE
_p_gauge_stats.rsmallmsgs++;
#endif /* GAUGE */
/* _p_heap_ptr = {my_location, return_node_id, return_location} */
my_location = *((u_int_t *)(_p_heap_ptr));
return_node = *((u_int_t *)(_p_heap_ptr + 1));
return_location = *((u_int_t *)(_p_heap_ptr + 2));
#ifdef DEBUG
if (ParDebug(5))
{
fprintf(_p_stdout,
"(%lu,%lu) process_read(): for l %lu: return n %lu, l %lu:\t",
(unsigned long) _p_my_id, (unsigned long) _p_reduction,
(unsigned long) my_location, (unsigned long) return_node,
(unsigned long) return_location);
fflush(_p_stdout);
}
#endif /* DEBUG */
irt_entry = IrtAddress(my_location);
Dereference((cell_t *), irt_entry->u.ptr, term);
switch (DHCellTag(term))
{
case UNDEF_TAG:
/* Drop a value note onto the suspension queue */
#ifdef DEBUG
if (ParDebug(5))
{
fprintf(_p_stdout, "enqueueing value note\n");
fflush(_p_stdout);
}
#endif /* DEBUG */
/* GC_ALERT */
PushGCReference(&term);
value_note = _p_alloc_value_note(return_location, return_node);
PopGCReference(1);
EnqueueSuspension(((proc_record_t *) value_note), term);
break;
case RREF_TAG:
/* Forward the read message to the appropriate node */
msg_buf = _p_alloc_msg_buffer(3);
*((u_int_t *)(msg_buf)) = ((rref_t *) term)->location;
*((u_int_t *)(msg_buf + 1)) = return_node;
*((u_int_t *)(msg_buf + 2)) = return_location;
#ifdef DEBUG
if (ParDebug(5))
{
fprintf(_p_stdout, "forwarding to node %lu\n",
(unsigned long) ((rref_t *) term)->node);
fflush(_p_stdout);
}
#endif /* DEBUG */
#ifdef GAUGE
_p_gauge_stats.ssmallmsgs++;
#endif /* GAUGE */
_p_msg_send(msg_buf, ((rref_t *) term)->node, 3, MSG_READ);
break;
default:
/* Answer the read message with a value message */
#ifdef DEBUG
if (ParDebug(5))
{
fprintf(_p_stdout, "returning with value message\n");
fflush(_p_stdout);
}
#endif /* DEBUG */
_p_send_value(return_node,
return_location,
term);
break;
}
#ifdef DEBUG
if (ParDebug(7))
{
fprintf(_p_stdout, "(%lu,%lu) process_read(): Processed\n",
(unsigned long) _p_my_id, (unsigned long) _p_reduction);
fflush(_p_stdout);
}
#endif /* DEBUG */
} /* process_read() */
/*
* _p_send_cancels()
*
* Send a MSG_CANCEL message for all remote references that need
* to be canceled.
*
* _p_cancel_lists is an array with _p_nodes elements, where each
* element is a pointer to a list of data structure, where each
* data structure is either a rref or a value_note.
* For each node (i.e., each list starting at a particular array element),
* send cancel messages for the items on that list.
*
* In rrefs, the 'node' field is used to hold the linked list pointer.
* In value_notes, the 'next' field is used to hold the linked list pointer.
*/
void _p_send_cancels()
{
cell_t **cancel_list_entry;
int_t node, i, j;
int_t msg_length;
cell_t *this_ds, *next_ds;
cell_t *msg_buf, *mb, *end;
for (node = 0, cancel_list_entry = _p_cancel_lists;
node < _p_nodes && _p_cancels > 0;
node++, cancel_list_entry++)
{
if (*cancel_list_entry != (cell_t *) NULL)
{
this_ds = *cancel_list_entry;
while (this_ds != (cell_t *) NULL)
{
mb = msg_buf = _p_alloc_msg_buffer(CANCEL_SIZE);
for (i = 0, mb++, end = msg_buf + CANCEL_SIZE - 1;
(mb < end) && (this_ds != (cell_t *) NULL);
i++)
{
if (*this_ds == 0)
{
/*
* This is a rref that was defined a value
* during emulation.
*
* Note: This relies on the specific layout of rref_t.
*/
*((int_t *) mb++) = (int_t) *((u_int_t *)(this_ds+1));
*((int_t *) mb++) = (int_t) *((u_int_t *)(this_ds+2));
next_ds = *((cell_t **)(this_ds+3));
for (j = RrefSizeWithTrailer() - 2; j > 0; j--)
*(++this_ds) = (cell_t) 0;
}
else if (IsRref(this_ds))
{
/*
* This is an orphaned rref that was found
* during garbage collection.
*/
*((int_t *) mb++) = ((rref_t *)this_ds)->location;
*((int_t *) mb++) = ((rref_t *)this_ds)->weight;
next_ds = (cell_t *) ((rref_t *)this_ds)->node;
for (j = RrefSizeWithTrailer(); j > 0; j--)
*this_ds++ = (cell_t) 0;
}
else /* IsValueNote(this_ds) */
{
/*
* This is either:
* - an orphaned value_note that was found
* during garbage collection
* - a value_note that was discarded by
* utils.c:process_susp_queue()
*/
*((int_t *) mb++) =((value_note_t *)this_ds)->location;
*((int_t *) mb++) = 1;
next_ds = (cell_t *) ((value_note_t *)this_ds)->next;
_p_free_value_note((value_note_t *) this_ds);
}
this_ds = next_ds;
_p_cancels--;
}
*((int_t *) msg_buf) = i;
msg_length = mb - msg_buf;
#ifdef GAUGE
if (msg_length > SMALL_MSG_THRESHOLD)
{
_p_gauge_stats.sbigmsgs++;
_p_gauge_stats.sbigmsgslen += msg_length;
}
else
_p_gauge_stats.ssmallmsgs++;
#endif /* GAUGE */
#ifdef DEBUG
if (ParDebug(5))
{
fprintf(_p_stdout,
"(%lu,%lu) _p_send_cancels(): Sending to node %lu\n",
(unsigned long) _p_my_id,
(unsigned long) _p_reduction,
(unsigned long) node);
}
#endif /* DEBUG */
_p_msg_send(msg_buf, node, msg_length, MSG_CANCEL);
}
*cancel_list_entry = (cell_t *) NULL;
}
}
#ifdef DEBUG
/* Consistency check */
if (_p_cancels != 0)
{
_p_fatal_error("_p_send_cancels(): Failed consistency check. Lost some cancels somewhere...");
}
#endif /* DEBUG */
} /* _p_send_cancels() */
/*
* process_cancel()
*
* Process the MSG_CANCEL message that has been received onto the heap.
* This entails looking up the irt entry for this data, and cancelling
* the weight from that entry.
*/
static void process_cancel(size, node)
int size;
int node;
{
cell_t *cancels = _p_heap_ptr;
int_t n_cancels;
#ifdef DEBUG
if (ParDebug(5))
{
fprintf(_p_stdout, "(%lu,%lu) process_cancel(): From node %lu\n",
(unsigned long) _p_my_id, (unsigned long) _p_reduction,
(unsigned long) node);
}
#endif /* DEBUG */
#ifdef GAUGE
if (size > SMALL_MSG_THRESHOLD)
{
_p_gauge_stats.rbigmsgs++;
_p_gauge_stats.rbigmsgslen += size;
}
else
_p_gauge_stats.rsmallmsgs++;
#endif /* GAUGE */
for (n_cancels = *((u_int_t *) cancels++);
n_cancels > 0;
n_cancels--, cancels += 2)
{
_p_cancel_irt_entry(*((u_int_t *) cancels),
*((u_int_t *) (cancels + 1)));
}
} /* process_cancel() */
/*
* _p_send_collect()
*
* Send a MSG_COLLECT message to all "appropriate" nodes. This is
* called when a garbage collection on a node of a multiprocessor
* run has failed to collect enough space.
*/
void _p_send_collect()
{
int i;
for (i = _p_nodes - 1; i >= 0; i--)
{
if (i != _p_my_id)
{
#ifdef GAUGE
_p_gauge_stats.ssmallmsgs++;
#endif /* GAUGE */
_p_msg_send((cell_t *) NULL, i, 0, MSG_COLLECT);
}
}
} /* _p_send_collect() */
/*
* process_collect()
*
* Process the MSG_COLLECT message that has been received onto the heap.
* This entails calling the garbage collector, with the hope of generating
* some MSG_CANCEL messages to free up space on other nodes.
*/
static void process_collect(size, node)
int size;
int node;
{
#ifdef GAUGE
_p_gauge_stats.rsmallmsgs++;
#endif /* GAUGE */
DoGC();
} /* process_collect() */
/*
* _p_send_exit()
*
* Send a MSG_EXIT message to the given 'node' with the given 'exit_code'
*/
void _p_send_exit(node, msg_type, exit_code)
int_t node;
int_t msg_type;
int_t exit_code;
{
cell_t *msg_buf;
msg_buf = _p_alloc_msg_buffer(1);
*((int_t *)(msg_buf)) = exit_code;
#ifdef DEBUG
if (ParDebug(5))
{
fprintf(_p_stdout,
"(%lu,%lu) _p_send_exit(): to node %lu, exit_code %lu, msg_type %lu\n",
(unsigned long) _p_my_id, (unsigned long) _p_reduction,
(unsigned long) node, (unsigned long) exit_code,
(unsigned long) msg_type);
fflush(_p_stdout);
}
#endif /* DEBUG */
#ifdef GAUGE
_p_gauge_stats.ssmallmsgs++;
#endif /* GAUGE */
_p_msg_send(msg_buf, node, 1, msg_type);
#ifdef DEBUG
if (ParDebug(7))
{
fprintf(_p_stdout, "(%lu,%lu) _p_send_exit(): Sent\n",
(unsigned long) _p_my_id, (unsigned long) _p_reduction);
fflush(_p_stdout);
}
#endif /* DEBUG */
} /* _p_send_exit() */
static void process_exit(size, node)
int size;
int node;
{
int_t exit_code;
#ifdef GAUGE
_p_gauge_stats.rsmallmsgs++;
#endif /* GAUGE */
exit_code = *((int_t *)(_p_heap_ptr));
if (_p_host)
_p_host_handle_exit(exit_code);
else
_p_node_handle_exit(exit_code, FALSE);
} /* process_exit() */
/*
* _p_process_messages()
*
* Process any waiting messages from other nodes.
*
* Valid 'rcv_type' arguments are:
* RCV_NOBLOCK Do not block if no messges are waiting
* RCV_BLOCK Block until at least one MSG_DEFINE or MSG_VALUE
* message has been processed.
* RCV_COLLECT When called from _p_garbage_collect(). Ignore
* MSG_COLLECT messages, and queue up
* MSG_DEFINE and MSG_VALUE. Block until
* we get a MSG_CANCEL.
*/
void _p_process_messages(rcv_type)
int rcv_type;
{
char error_msg[1024];
int_t type, size, node;
bool_t done = FALSE;
while (!done)
{
if (!_p_msg_receive(&node, &size, &type, rcv_type))
done = TRUE;
else
{
switch(type)
{
case MSG_DEFINE:
process_define(size, node);
rcv_type = RCV_NOBLOCK;
break;
case MSG_VALUE:
process_value(size, node);
rcv_type = RCV_NOBLOCK;
break;
case MSG_READ:
process_read(size, node);
break;
case MSG_CANCEL:
process_cancel(size, node);
done = TRUE;
break;
case MSG_COLLECT:
process_collect(size, node);
break;
#ifdef GAUGE
case MSG_GAUGE:
_p_process_gauge_msg(size, node);
break;
#endif /* GAUGE */
case MSG_INITIATE_EXIT:
if (_p_host)
process_exit(size, node);
else
_p_fatal_error("Node received unexpected MSG_INITIATE_EXIT message");
done = TRUE;
break;
case MSG_EXIT:
if (!_p_host)
process_exit(size, node);
else
_p_fatal_error("Host received unexpected MSG_EXIT message");
done = TRUE;
break;
default:
sprintf(error_msg, "Invalid message received on node %lu: type=%lu, size=%lu, from node=%lu",
(unsigned long) _p_my_id, (unsigned long) type,
(unsigned long) size, (unsigned long) node);
_p_fatal_error(error_msg);
break;
}
}
}
} /* _p_process_messages() */
/*
* _p_host_handle_exit()
*/
void _p_host_handle_exit(exit_code)
int_t exit_code;
{
int type, size, node;
int_t i;
int_t exit_count;
/*
* (Reset _p_heap_ptr so that we can't possibly receive a
* message that goes off the top of the heap.)
*/
_p_heap_ptr = _p_heap_bottom;
/* Send a MSG_EXIT to each node */
for (i = 1; i < _p_nodes; i++)
_p_send_exit(i, MSG_EXIT, exit_code);
/*
* Now wait for each node to return a MSG_EXIT message, and discard
* any other messages that come in in the mean time.
* (Reset _p_heap_ptr so that we can't possibly receive a
* message that goes off the top of the heap.)
*/
for (exit_count = 1; exit_count < _p_nodes; )
{
_p_msg_receive(&node, &size, &type, RCV_BLOCK);
if (type == MSG_EXIT)
exit_count++;
}
#ifdef GAUGE
/* Take a final profile snapshot, and coalesce the node profiles */
_p_host_final_profile();
#endif /* GAUGE */
#ifdef UPSHOT
/* Dump the upshot log */
_p_write_upshot_log();
#endif
#if defined(GAUGE) || defined(UPSHOT)
/* Do another sync up with the nodes */
for (i = 1; i < _p_nodes; i++)
_p_send_exit(i, MSG_EXIT, exit_code);
for (exit_count = 1; exit_count < _p_nodes; )
{
_p_msg_receive(&node, &size, &type, RCV_BLOCK);
if (type == MSG_EXIT)
exit_count++;
}
#endif /* GAUGE || UPSHOT */
/*
* If the send/receive module requires some special method of
* shutting down all of the nodes, then do it here.
*
* At this point we know that all of the nodes are at least
* into the _p_destroy_nodes() procedure, so _p_destroy_nodes()
* can forcably kill all the nodes if it wishes.
*/
_p_destroy_nodes();
/* Send a MSG_EXIT to each node */
for (i = 1; i < _p_nodes; i++)
_p_send_exit(i, MSG_EXIT, exit_code);
_p_shutdown_pcn();
exit((int) exit_code);
} /* _p_host_handle_exit() */
/*
* _p_node_handle_exit()
*/
void _p_node_handle_exit(exit_code, initiate_it)
int_t exit_code;
bool_t initiate_it;
{
int type, size, node;
bool_t done;
/*
* (Reset _p_heap_ptr so that we can't possibly receive a
* message that goes off the top of the heap.)
*/
_p_heap_ptr = _p_heap_bottom;
if (initiate_it)
{
/*
* This node is initiating the exit. So send a MSG_INITIATE_EXIT
* message to the host, and then wait for the normal MSG_EXIT
* message that the host will broadcast.
*/
_p_send_exit(_p_host_id, MSG_INITIATE_EXIT, exit_code);
for (done = FALSE; !done; )
{
_p_msg_receive(&node, &size, &type, RCV_BLOCK);
if (type == MSG_EXIT)
{
exit_code = *((int_t *)(_p_heap_ptr));
done = TRUE;
}
}
}
/* Send a MSG_EXIT message to the host */
_p_send_exit(_p_host_id, MSG_EXIT, exit_code);
#ifdef GAUGE
/* Take a final profile snapshot, and coalesce the node profiles */
_p_node_final_profile();
#endif /* GAUGE */
#ifdef UPSHOT
/* Dump the upshot log */
_p_write_upshot_log();
#endif /* UPSHOT */
#if defined(GAUGE) || defined(UPSHOT)
/* Do another sync up with the host */
for (done = FALSE; !done; )
{
_p_msg_receive(&node, &size, &type, RCV_BLOCK);
if (type == MSG_EXIT)
done = TRUE;
}
_p_send_exit(_p_host_id, MSG_EXIT, exit_code);
#endif /* GAUGE || UPSHOT */
/*
* If the send/receive module requires some special method of
* shutting down all of the nodes, then do it here.
*
* However, a node should not do something that will cause
* the destruction of other nodes or the host, since the
* other nodes may not have completed previous operations yet.
*/
_p_destroy_nodes();
/*
* Now wait to either get killed or receive a final MSG_EXIT message.
*/
for (done = FALSE; !done; )
{
_p_msg_receive(&node, &size, &type, RCV_BLOCK);
if (type == MSG_EXIT)
done = TRUE;
}
_p_shutdown_pcn();
exit((int) exit_code);
} /* _p_node_handle_exit() */
#endif /* PARALLEL */
These are the contents of the former NiCE NeXT User Group NeXTSTEP/OpenStep software archive, currently hosted by Netfuture.ch.