This is parallel.c in view mode; [Download] [Up]
/*
 * PCN Abstract Machine Emulator
 * Authors:     Steve Tuecke and Ian Foster
 *              Argonne National Laboratory
 *
 * Please see the DISCLAIMER file in the top level directory of the
 * distribution regarding the provisions under which this software
 * is distributed.
 *
 * parallel.c - Code to support parallel PCN
 */

#include "pcn.h"

#ifdef PARALLEL

/*
 * Explicit work stack used by copy_term() while it walks a tuple.
 * Entry k pairs a (not yet dereferenced) term argument, ct_stack_cp[k],
 * with the index in the message buffer, ct_stack_loc[k], of the cell that
 * must later be backfilled with the offset to that argument's copy.
 * Being file-static, the stack makes copy_term() non-reentrant.
 */
static cell_t *ct_stack_cp[CT_STACK_SIZE];
static int_t ct_stack_loc[CT_STACK_SIZE];
static int_t ct_stack_size = 0;

/* These are used by copy_term() and scan_term() */

/* Pack integer I above the tag bits and mark the cell with RREF_TAG. */
#define CTAddRREF_TAG(I)    ((((int_t) (I)) << TAG_SIZE) | RREF_TAG)

/*
 * Inverse of CTAddRREF_TAG(): drop the tag bits with a logical shift.
 * The unsigned cast replaces a hard-coded 0xFFFFFFF0 mask, which silently
 * discarded the upper half of the word whenever int_t is wider than 32 bits.
 * Assumes u_int_t is the unsigned counterpart of int_t (same width).
 */
#define CTRemoveRREF_TAG(I) (((u_int_t) (I)) >> TAG_SIZE)

/*
 * Write a complete remote reference at buf[bufloc] and advance bufloc past
 * it.  Expects 'rref', 'buf' and 'bufloc' to be in scope at the point of
 * use.  Wrapped in do { } while (0) so that it behaves as one statement.
 */
#define CTBuildRref(Weight, Location, Node) \
do { \
    rref = (rref_t *) &buf[bufloc]; \
    rref->tag = RREF_TAG; \
    rref->mark = 0; \
    rref->suspensions = 0; \
    rref->node = (Node); \
    rref->location = (Location); \
    rref->weight = (Weight); \
    rref->value_return_irt = RREF_NOT_READ_IRT; \
    rref->trailer_tag = RREF_TRAILER_TAG; \
    bufloc += RrefSizeWithTrailer(); \
} while (0)

/*
 * True when 'Needed' more cells do not fit in the buffer once one rref has
 * been reserved for every entry still on the ct_stack.  Expects 'bufloc'
 * and 'buflen' to be in scope.  The whole expansion is parenthesized so the
 * macro can be combined safely with other operators.
 */
#define CTNoSpace(Needed) \
    ((bufloc + (RrefSizeWithTrailer() * ct_stack_size) + (Needed)) > buflen)


/*
 * copy_term()
 *
 * Allocate a message buffer and copy the passed term into it.  'extra_cells'
 * cells are left free at the end of the message buffer.  'termlen'
 * will be set to the length (in cells) of the term in the returned buffer.
 * (termlen does not include the extra cells)
 *
 * All references are translated into relative addresses (offset).  The
 * offsets are in bytes, not Cells, so that offset can be detected by
 * scan_term() using IsRef().
 *
 * If PCN_ALIGN_DOUBLES is defined, then all data structures are double
 * word aligned.
*/
/*
 * Additional notes on copy_term(), derived from the code below:
 *
 *   term         Term to copy; it is dereferenced before being inspected.
 *   termlen      Out parameter: number of cells of the buffer used by the term.
 *   extra_cells  Cells the caller wants left free after the term.
 *
 * Returns the buffer obtained from _p_alloc_msg_buffer().
 *
 * Side effects:
 *   - Every undefined variable, and every sub-term that does not fit, is
 *     replaced in the buffer by an rref to a fresh IRT entry
 *     (_p_alloc_irt_entry) on this node.
 *   - An rref in the source term with weight > 1 is split: its own weight
 *     is reduced in place and the difference travels with the copy.
 *   - Uses the file-static ct_stack_* arrays, so it is not reentrant.
 */
static cell_t *copy_term(term, termlen, extra_cells)
cell_t *term;
int_t *termlen;
int_t extra_cells;
{
    cell_t *buf, *bufptr;
    int_t bufloc = 0;           /* next free cell index in buf */
    int_t buflen;               /* usable cells in buf (extra_cells excluded) */
    cell_t *cp1, *cp2;
    int_t i1, size, irt_index, size_with_trailer, loc;
    int_t tag;
    rref_t *rref;               /* scratch pointer used by CTBuildRref() */
#ifdef PCN_ALIGN_DOUBLES
    int_t align_it;
#endif /* PCN_ALIGN_DOUBLES */

#ifdef DEBUG
    if (ParDebug(8))
    {
        fprintf(_p_stdout, "(%lu,%lu) copy_term: term=",
                (unsigned long) _p_my_id, (unsigned long) _p_reduction);
        _p_print_term(_p_stdout, term);
        fprintf(_p_stdout, "\n");
    }
#endif /* DEBUG */

    /*
     * Figure out how big a buffer to allocate, and copy over the first
     * level of the term.
     * If 'term' is a tuple, then allocate the default size.
     * Otherwise, allocate just enough room for the term.
     */
    Dereference((cell_t *), term, cp1);
    switch(tag = DHCellTag(cp1))
    {
    case DOUBLE_TAG:
    case INT_TAG:
    case STRING_TAG:
        /* Plain data: one exact-size buffer, copied verbatim. */
        *termlen = _p_size_with_trailer(tag, DHCellSize(cp1));
        buf = _p_alloc_msg_buffer(*termlen + extra_cells);
        memcpy(buf, cp1, (*termlen * CELL_SIZE));
        return (buf);
        break;

    case UNDEF_TAG:
        /* Undefined variable: send an rref to a new IRT entry for it. */
        *termlen = RrefSizeWithTrailer();
        buf = _p_alloc_msg_buffer(*termlen + extra_cells);
        irt_index = _p_alloc_irt_entry(cp1, INIT_WEIGHT);
        CTBuildRref(INIT_WEIGHT, irt_index, _p_my_id);
        return (buf);
        break;

    case RREF_TAG:
        *termlen = RrefSizeWithTrailer();
        buf = _p_alloc_msg_buffer(*termlen + extra_cells);
        if (((rref_t *)cp1)->weight > 1)
        {
            /* Split this remote reference */
            i1 = (((rref_t *)cp1)->weight + 2) / 3; /* Make sure i1 >= 1 */
            ((rref_t *)cp1)->weight -= i1;
            CTBuildRref(i1, ((rref_t *)cp1)->location,
                        ((rref_t *)cp1)->node);
        }
        else /* ((rref_t *)cp1)->weight == 1 */
        {
            /* Chain the remote reference */
            irt_index = _p_alloc_irt_entry(cp1, INIT_WEIGHT);
            CTBuildRref(INIT_WEIGHT, irt_index, _p_my_id);
        }
        return (buf);
        break;

    case TUPLE_TAG:
        size = DHCellSize(cp1);
        if (size == 0)  /* NilCell */
        {
            *termlen = EmptyListSizeWithTrailer();
            buf = _p_alloc_msg_buffer(*termlen + extra_cells);
            memcpy(buf, cp1, (*termlen * CELL_SIZE));
            return (buf);
        }

        size_with_trailer = TupleSizeWithTrailer(size);

        if (size > CT_STACK_SIZE)
        {
            /*
             * The tuple is larger than our ct_stack (and each tuple
             * argument requires one ct_stack entry).  Therefore,
             * we need to copy over just the tuple (no data) and
             * avoid using the ct_stack so as not to blow off the
             * top of it.
             */
            *termlen = size_with_trailer + (size * RrefSizeWithTrailer());
            buf = _p_alloc_msg_buffer(*termlen + extra_cells);
            buf[bufloc++] = *cp1++;     /* Copy over header word */
            loc = bufloc;               /* first argument slot */
            bufloc += size_with_trailer - 1;    /* rrefs go after the tuple */
            for (i1 = 0; i1 < size; i1++)
            {
                /* Setup offset for tuple argument to rref */
                buf[loc] = (bufloc - loc) * CHARS_PER_CELL;
                loc++;

                /* Create the rref */
                Dereference((cell_t *), *cp1, cp2);
                irt_index = _p_alloc_irt_entry(cp2, INIT_WEIGHT);
                CTBuildRref(INIT_WEIGHT, irt_index, _p_my_id);

                cp1++;
            }
            /* Copy over the optional pad and trailer cell */
            memcpy(&(buf[loc]), cp1,
                   ((size_with_trailer - size - 1) * CELL_SIZE));
            return (buf);
        }

        /*
         * Else:
         *   - allocate MAX(DEFAULT_SIZE, <minimum required size>)
         *   - copy first level of the tuple
         *   - setup ct_stack with the tuple's arguments
         */
        i1 = size_with_trailer + (size * RrefSizeWithTrailer()) + extra_cells;
        buflen = MAX(i1, _p_default_msg_buffer_size);
        buf = _p_alloc_msg_buffer(buflen);
        buflen -= extra_cells;  /* keep the caller's extra cells out of reach */
        buf[bufloc++] = *cp1++; /* Copy over the header word */
        ct_stack_size = 0;
        /* Push in reverse so that argument 0 is popped (and copied) first. */
        for (i1 = size - 1; i1 >= 0; i1--)
        {
            ct_stack_cp[ct_stack_size] = (cell_t *) *(cp1 + i1);
            ct_stack_loc[ct_stack_size++] = bufloc + i1;
        }
        bufloc += size;
        /* Copy over the optional pad and trailer cell */
        i1 = size_with_trailer - size - 1;
        memcpy(&(buf[bufloc]), (cp1 + size), (i1 * CELL_SIZE));
        bufloc += i1;
        break;

#ifdef STREAMS
    case STREAM_TAG:
        _p_fatal_error("copy_term: Cannot send stream between nodes");
        break;
#endif /* STREAMS */

    default:
        /*
         * NOTE(review): presumably _p_fatal_error() does not return;
         * otherwise buf and buflen would be used uninitialized below.
         */
        _p_fatal_error("copy_term: Bad tag found");
        break;
    }

    /*
     * If we get to here then we're dealing with a tuple.  The
     * first level of the tuple will already have been copied over,
     * with the arguments pushed onto the ct_stack.
     *
     * At this point, we are guaranteed that there is enough
     * room in the buffer to hold rrefs for each thing on the ct_stack.
     */
    while (ct_stack_size > 0)
    {
        /*
         * NOTE(review): the pop (--ct_stack_size) is passed as a macro
         * argument; confirm Dereference() evaluates it exactly once.
         */
        Dereference((cell_t *), ct_stack_cp[--ct_stack_size], cp1);

        /*
         * Backfill the location in buf for this ct_stack entry with
         * an offset to the current location in buf
         */
        loc = ct_stack_loc[ct_stack_size];
        buf[loc] = (bufloc - loc) * CHARS_PER_CELL;

        switch (tag = DHCellTag(cp1))
        {
        case TUPLE_TAG:
            size = DHCellSize(cp1);
            if (size == 0)      /* NilCell */
            {
                /*
                 * It is assumed that an empty list takes no more
                 * cells than a rref, so no space check is made here.
                 */
                i1 = EmptyListSizeWithTrailer();
                memcpy(&(buf[bufloc]), cp1, (i1 * CELL_SIZE));
                bufloc += i1;
                break;
            }

            size_with_trailer = TupleSizeWithTrailer(size);

            /*
             * We either have to copy the tuple itself into the message
             * buffer, or put in a remote reference (rref) to the tuple.
             *
             * We copy just a rref if:
             *   1) there is not enough room for the entire tuple
             *      (plus one rref per argument) in the message buffer, or
             *   2) there is not enough space on the ct_stack to
             *      handle the arguments of this tuple
             */
            if ((CTNoSpace(size_with_trailer + (RrefSizeWithTrailer() * size)))
                || ((CT_STACK_SIZE - ct_stack_size) < size) )
            {
                /* We only have room for the rref of this */
                irt_index = _p_alloc_irt_entry(cp1, INIT_WEIGHT);
                CTBuildRref(INIT_WEIGHT, irt_index, _p_my_id);
            }
            else
            {
                /* We have room for each argument of the tuple */
                buf[bufloc++] = *cp1++;
                for (i1 = size - 1; i1 >= 0; i1--)
                {
                    ct_stack_cp[ct_stack_size] = (cell_t *) *(cp1 + i1);
                    ct_stack_loc[ct_stack_size++] = bufloc + i1;
                }
                bufloc += size;
                /* Copy over the optional pad and trailer cell */
                i1 = size_with_trailer - size - 1;
                memcpy(&(buf[bufloc]), (cp1 + size), (i1 * CELL_SIZE));
                bufloc += i1;
            }
            break;

        case DOUBLE_TAG:
        case INT_TAG:
        case STRING_TAG:
            size = _p_size_with_trailer(tag, DHCellSize(cp1));
            if (CTNoSpace(size))
            {
                /* We only have room for the rref of this */
                irt_index = _p_alloc_irt_entry(cp1, INIT_WEIGHT);
                CTBuildRref(INIT_WEIGHT, irt_index, _p_my_id);
            }
            else
            {
                /* We have room for the data */
                bufptr = &buf[bufloc];
                memcpy(bufptr, cp1, (size * CELL_SIZE));
                bufloc += size;
            }
            break;

        case UNDEF_TAG:
            /* Undefined variable: always shipped as an rref back to us. */
            irt_index = _p_alloc_irt_entry(cp1, INIT_WEIGHT);
            CTBuildRref(INIT_WEIGHT, irt_index, _p_my_id);
            break;

        case RREF_TAG:
            if (((rref_t *)cp1)->weight > 1)
            {
                /* Split this remote reference */
                i1 = (((rref_t *)cp1)->weight + 2) / 3; /* Make sure i1 >= 1 */
                ((rref_t *)cp1)->weight -= i1;
                CTBuildRref(i1, ((rref_t *)cp1)->location,
                            ((rref_t *)cp1)->node);
            }
            else /* ((rref_t *)cp1)->weight == 1 */
            {
                /* Chain the remote reference */
                irt_index = _p_alloc_irt_entry(cp1, INIT_WEIGHT);
                CTBuildRref(INIT_WEIGHT, irt_index, _p_my_id);
            }
            break;

#ifdef STREAMS
        case STREAM_TAG:
            _p_fatal_error("copy_term: Cannot send stream between nodes");
            break;
#endif /* STREAMS */

        default:
            _p_fatal_error("Bad tag found during copy_term()");
            break;
        }
    }

    /* Everything written so far is the term; bufloc is its length in cells. */
    *termlen = bufloc;

#ifdef DEBUG
    if (ParDebug(8))
    {
        fprintf(_p_stdout, "(%lu,%lu) copy_term: buf=:",
                (unsigned long) _p_my_id, (unsigned long) _p_reduction);
        for (i1 = 0; i1 < *termlen; i1++)
            fprintf(_p_stdout, "%lx:", (unsigned long) buf[i1]);
        fprintf(_p_stdout, "\n");
    }
#endif /* DEBUG */

    return (buf);
} /* copy_term() */


/*
 * scan_term()
 *
 * Scan a term to convert relative offsets to pointers.  All offsets are
 * in bytes.
*/ static void scan_term(term, termlen) cell_t *term; int termlen; { cell_t *end; cell_t *cp1; int_t tag; int_t i, size, size_with_trailer; u_int_t weight, location; irt_t *irt_entry; #ifdef DEBUG cell_t *save_term = term; if (ParDebug(8)) { int i1; fprintf(_p_stdout, "(%lu,%lu) scan_term: buf=:", (unsigned long) _p_my_id, (unsigned long) _p_reduction); for (i1 = 0; i1 < termlen; i1++) fprintf(_p_stdout, "%lx:", (unsigned long) term[i1]); fprintf(_p_stdout, "\n"); fflush(_p_stdout); } #endif /* DEBUG */ for (end = term + termlen; term < end; ) { if (IsRef(term)) { if (*term != 0) { *((cell_t **) term) = (cell_t *) ( ((char_t *) term) + *((int_t *) term) ); term++; } else { _p_fatal_error("scan_term(): Encountered a 0 term"); } } else { switch(tag = DHCellTag(term)) { case INT_TAG: case DOUBLE_TAG: case STRING_TAG: term += _p_size_with_trailer(tag, DHCellSize(term)); break; case TRAILER_TAG: term++; break; case TUPLE_TAG: size = DHCellSize(term); if (size == 0) { term += EmptyListSizeWithTrailer(); } else { size_with_trailer = TupleSizeWithTrailer(size); for (i = size, term++; i > 0; i--, term++) { *((cell_t **) term) = (cell_t *) ( ((char_t *) term) + *((int_t *) term) ); } term += (size_with_trailer - size - 1); } break; case RREF_TAG: if (_p_my_id == ((rref_t *) term)->node) { /* If this rref points to my node, then get rid of it */ weight = ((rref_t *) term)->weight; location = ((rref_t *) term)->location; irt_entry = IrtAddress(location); Dereference((cell_t *), irt_entry->u.ptr, cp1); *((cell_t **) term) = cp1; _p_cancel_irt_entry(location, weight); ZeroOutMemory((term + 1), ((RrefSizeWithTrailer() - 1) * CELL_SIZE)); } term += RrefSizeWithTrailer(); break; default: _p_fatal_error("scan_term(): Bad tag found"); break; } } } #ifdef DEBUG if (ParDebug(8)) { fprintf(_p_stdout, "(%lu,%lu) scan_term: term=", (unsigned long) _p_my_id, (unsigned long) _p_reduction); fflush(_p_stdout); _p_print_term(_p_stdout, save_term); fprintf(_p_stdout, "\n"); fflush(_p_stdout); } 
#endif /* DEBUG */ } /* scan_term() */ /* * _p_send_define() * * Send a MSG_DEFINE message for the passed 'rref' containing 'term'. * * A define message is sent when a term is defined to a remote reference. */ void _p_send_define(rref, term) rref_t *rref; cell_t *term; { cell_t *msg_buf; int_t term_len; int_t msg_length; msg_buf = copy_term(term, &term_len, 3); *((u_int_t *) (msg_buf + term_len)) = rref->location; *((u_int_t *) (msg_buf + term_len + 1)) = rref->weight; *((u_int_t *) (msg_buf + term_len + 2)) = rref->value_return_irt; #ifdef DEBUG if (ParDebug(5)) { fprintf(_p_stdout, "(%lu,%lu) _p_send_define(): to n %lu, l %lu, w %lu, r %lu\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, (unsigned long) rref->node, (unsigned long) rref->location, (unsigned long) rref->weight, (unsigned long) rref->value_return_irt); fflush(_p_stdout); } #endif /* DEBUG */ msg_length = term_len + 3; #ifdef GAUGE if (msg_length > SMALL_MSG_THRESHOLD) { _p_gauge_stats.sbigmsgs++; _p_gauge_stats.sbigmsgslen += msg_length; } else _p_gauge_stats.ssmallmsgs++; #endif /* GAUGE */ _p_msg_send(msg_buf, rref->node, msg_length, MSG_DEFINE); #ifdef DEBUG if (ParDebug(7)) { fprintf(_p_stdout, "(%lu,%lu) _p_send_define(): Sent\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction); fflush(_p_stdout); } #endif /* DEBUG */ } /* _p_send_define() */ /* * process_define() * * Process the MSG_DEFINE message that has been received onto the heap. * This entails looking up the irt entry for this data, and * defining the resulting undef or rref with the value. 
*/ static void process_define(size, node) int size; int node; { u_int_t location, weight, value_return_irt; irt_t *irt_entry; cell_t *cp1; cell_t *term = _p_heap_ptr; #ifdef GAUGE if (size > SMALL_MSG_THRESHOLD) { _p_gauge_stats.rbigmsgs++; _p_gauge_stats.rbigmsgslen += size; } else _p_gauge_stats.rsmallmsgs++; #endif /* GAUGE */ _p_heap_ptr += size - 3; location = *((u_int_t *) (_p_heap_ptr)); weight = *((u_int_t *) (_p_heap_ptr + 1)); value_return_irt = *((u_int_t *) (_p_heap_ptr + 2)); #ifdef DEBUG if (ParDebug(5)) { fprintf(_p_stdout, "(%lu,%lu) process_define(): from n %lu, l %lu, w %lu\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, (unsigned long) node, (unsigned long) location, (unsigned long) weight); fflush(_p_stdout); } #endif /* DEBUG */ irt_entry = IrtAddress(location); Dereference((cell_t *), irt_entry->u.ptr, cp1); _p_cancel_irt_entry(location, weight); scan_term(term, size - 3); #ifdef DEBUG if (ParDebug(6)) { fprintf(_p_stdout, "(%lu,%lu) process_define(): ", (unsigned long) _p_my_id, (unsigned long) _p_reduction); _p_print_term(_p_stdout, term); fprintf(_p_stdout, "\n"); fflush(_p_stdout); } #endif /* DEBUG */ if (!_p_define(cp1, term, TRUE, TRUE, value_return_irt, node)) { fprintf(_p_stdout, "(%lu,%lu) Warning: Node %lu: Received bad define message -- definition is already defined\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, (unsigned long) _p_my_id); } #ifdef DEBUG if (ParDebug(7)) { fprintf(_p_stdout, "(%lu,%lu) process_define(): Processed\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction); fflush(_p_stdout); } #endif /* DEBUG */ } /* process_define() */ /* * _p_send_value() * * Send a MSG_VALUE message. This is the return message from a * previously received MSG_READ message. 
*/ void _p_send_value(node, location, term) int_t node; int_t location; cell_t *term; { cell_t *msg_buf; int_t term_len; int_t msg_length; #ifdef DEBUG if (ParDebug(5)) { fprintf(_p_stdout, "(%lu,%lu) _p_send_value(): to n %lu, l %lu\t", (unsigned long) _p_my_id, (unsigned long) _p_reduction, (unsigned long) node, (unsigned long) location); _p_print_term(_p_stdout, term); fprintf(_p_stdout, "\n"); fflush(_p_stdout); } #endif /* DEBUG */ if (node == _p_my_id) { /* * When (node == _p_my_id) we need to find unknown value on the * other end of this remote reference and define it ourself, * instead of sending off a message to ourself. * * This case is needed when you get a remote reference chain * that ends up back at the same node. For example, suppose * you have two nodes where node 0 has a rref (R0) to * an undef (U1) on node 1. And also, node 1 has a rref (R1) * to an undef (U0) on node 0. Then suppose U1 is defined to be * the value of R1 (U1 = R1) on node 1. This will cause a chain * where R0 is a rref to R1 which is a rref to U0. * Now, suppose a process on node 0 tried to read from R0. This * will generate a MSG_READ to node 1, which will forward that * back to node 0 for U0. Then U0 will get a value note for the * R0 request. When U0 gets a value, it will generate a * send_value to fulfill R0's MSG_READ request. But this * send_value will be to the same node. Thus, we need this case. 
*/ cell_t *cp1; irt_t *irt_entry = IrtAddress(location); Dereference((cell_t *), irt_entry->u.ptr, cp1); _p_cancel_irt_entry(location, 1); _p_define(cp1, term, TRUE, FALSE, 0, 0); } else { msg_buf = copy_term(term, &term_len, 1); *((u_int_t *) (msg_buf + term_len)) = location; msg_length = term_len + 1; #ifdef GAUGE if (msg_length > SMALL_MSG_THRESHOLD) { _p_gauge_stats.sbigmsgs++; _p_gauge_stats.sbigmsgslen += msg_length; } else _p_gauge_stats.ssmallmsgs++; #endif /* GAUGE */ _p_msg_send(msg_buf, node, msg_length, MSG_VALUE); #ifdef DEBUG if (ParDebug(7)) { fprintf(_p_stdout, "(%lu,%lu) _p_send_value(): Sent\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction); fflush(_p_stdout); } #endif /* DEBUG */ } } /* _p_send_value() */ /* * process_value() * * Process the MSG_VALUE message that has been received onto the heap. * This entails looking up the irt entry for this data, and * defining the resulting undef or rref with the value. */ static void process_value(size, node) int size; int node; { int_t location; irt_t *irt_entry; cell_t *cp1; cell_t *term = _p_heap_ptr; #ifdef GAUGE if (size > SMALL_MSG_THRESHOLD) { _p_gauge_stats.rbigmsgs++; _p_gauge_stats.rbigmsgslen += size; } else _p_gauge_stats.rsmallmsgs++; #endif /* GAUGE */ _p_heap_ptr += size - 1; location = *((int_t *) (_p_heap_ptr)); #ifdef DEBUG if (ParDebug(5)) { fprintf(_p_stdout, "(%lu,%lu) process_value(): from n %lu, l %lu\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, (unsigned long) node, (unsigned long) location); fflush(_p_stdout); } #endif /* DEBUG */ irt_entry = IrtAddress(location); Dereference((cell_t *), irt_entry->u.ptr, cp1); _p_cancel_irt_entry(location, 1); scan_term(term, size - 2); #ifdef DEBUG if (ParDebug(6)) { fprintf(_p_stdout, "(%lu,%lu) process_value(): ", (unsigned long) _p_my_id, (unsigned long) _p_reduction); _p_print_term(_p_stdout, term); fprintf(_p_stdout, "\n"); fflush(_p_stdout); } #endif /* DEBUG */ /* * Do not generate a MSG_DEFINE message when 
overwriting this rref. * That is because the value was supplied to us by the node that * contains the other end of this rref. Since that other node * has given us this value, there is no need for us to bounce * it right back at that other node. */ if (!_p_define(cp1, term, FALSE, FALSE, 0, 0)) { fprintf(_p_stdout, "(%lu,%lu) Warning: Node %lu: Received bad value message -- definition is already defined\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, (unsigned long) _p_my_id); } #ifdef DEBUG if (ParDebug(7)) { fprintf(_p_stdout, "(%lu,%lu) process_value(): Processed\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction); fflush(_p_stdout); } #endif /* DEBUG */ } /* process_value() */ /* * _p_send_read() * * Send a MSG_READ message for the given 'rref'. */ void _p_send_read(rref) rref_t *rref; { cell_t *msg_buf; int_t value_return_irt; value_return_irt = _p_alloc_irt_entry(rref, 1); rref->value_return_irt = value_return_irt; msg_buf = _p_alloc_msg_buffer(3); /* msg_buf = {his_irt_entry, my_node_id, my_irt_entry} */ *((u_int_t *)(msg_buf)) = rref->location; *((u_int_t *)(msg_buf + 1)) = _p_my_id; *((u_int_t *)(msg_buf + 2)) = value_return_irt; #ifdef DEBUG if (ParDebug(5)) { fprintf(_p_stdout, "(%lu,%lu) _p_send_read(): to n %lu, l %lu: return to l %lu\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, (unsigned long) rref->node, (unsigned long) rref->location, (unsigned long) *((u_int_t *)(msg_buf + 2))); fflush(_p_stdout); } #endif /* DEBUG */ #ifdef GAUGE _p_gauge_stats.ssmallmsgs++; #endif /* GAUGE */ _p_msg_send(msg_buf, rref->node, 3, MSG_READ); #ifdef DEBUG if (ParDebug(7)) { fprintf(_p_stdout, "(%lu,%lu) _p_send_read(): Sent\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction); fflush(_p_stdout); } #endif /* DEBUG */ } /* _p_send_read() */ /* * process_read() * * Process the MSG_READ message that has been received onto the heap. * This entails looking up the irt entry for this data. 
If the
 * resulting data is an undef, then queue up a value note.  If it is an
 * rref, then forward the MSG_READ to the node that the rref points to.
 * If it is real data, then generate a MSG_VALUE message.
 *
 * The message is three cells at _p_heap_ptr:
 *   {my_location, return_node_id, return_location}
 * The 'size' and 'node' arguments are not used by the body.
 *
 * GC_ALERT: This procedure may induce garbage collection
 */
static void process_read(size, node)
int size;
int node;
{
    cell_t *term;
    cell_t *msg_buf;
    value_note_t *value_note;
    u_int_t my_location, return_node, return_location;
    irt_t *irt_entry;

#ifdef GAUGE
    _p_gauge_stats.rsmallmsgs++;
#endif /* GAUGE */

    /* _p_heap_ptr = {my_location, return_node_id, return_location} */
    my_location = *((u_int_t *)(_p_heap_ptr));
    return_node = *((u_int_t *)(_p_heap_ptr + 1));
    return_location = *((u_int_t *)(_p_heap_ptr + 2));

#ifdef DEBUG
    if (ParDebug(5))
    {
        fprintf(_p_stdout,
                "(%lu,%lu) process_read(): for l %lu: return n %lu, l %lu:\t",
                (unsigned long) _p_my_id, (unsigned long) _p_reduction,
                (unsigned long) my_location, (unsigned long) return_node,
                (unsigned long) return_location);
        fflush(_p_stdout);
    }
#endif /* DEBUG */

    /* Find the data that the requested IRT entry currently refers to. */
    irt_entry = IrtAddress(my_location);
    Dereference((cell_t *), irt_entry->u.ptr, term);

    switch (DHCellTag(term))
    {
    case UNDEF_TAG:
        /* Drop a value note onto the suspension queue */
#ifdef DEBUG
        if (ParDebug(5))
        {
            fprintf(_p_stdout, "enqueueing value note\n");
            fflush(_p_stdout);
        }
#endif /* DEBUG */
        /*
         * GC_ALERT: 'term' is registered as a GC reference for the
         * duration of the allocation (presumably so that a collection
         * triggered by it can update the pointer -- confirm).
         */
        PushGCReference(&term);
        value_note = _p_alloc_value_note(return_location, return_node);
        PopGCReference(1);
        EnqueueSuspension(((proc_record_t *) value_note), term);
        break;

    case RREF_TAG:
        /*
         * Forward the read message to the appropriate node.  The
         * original requester's node and location are passed through
         * unchanged, so the answer goes directly back to it.
         */
        msg_buf = _p_alloc_msg_buffer(3);
        *((u_int_t *)(msg_buf)) = ((rref_t *) term)->location;
        *((u_int_t *)(msg_buf + 1)) = return_node;
        *((u_int_t *)(msg_buf + 2)) = return_location;
#ifdef DEBUG
        if (ParDebug(5))
        {
            fprintf(_p_stdout, "forwarding to node %lu\n",
                    (unsigned long) ((rref_t *) term)->node);
            fflush(_p_stdout);
        }
#endif /* DEBUG */
#ifdef GAUGE
        _p_gauge_stats.ssmallmsgs++;
#endif /* GAUGE */
        _p_msg_send(msg_buf, ((rref_t *) term)->node, 3, MSG_READ);
        break;

    default:
        /* Answer the read message with a value message */
#ifdef DEBUG
        if (ParDebug(5))
        {
            fprintf(_p_stdout, "returning with value message\n");
            fflush(_p_stdout);
        }
#endif /* DEBUG */
        _p_send_value(return_node, return_location, term);
        break;
    }

#ifdef DEBUG
    if (ParDebug(7))
    {
        fprintf(_p_stdout, "(%lu,%lu) process_read(): Processed\n",
                (unsigned long) _p_my_id, (unsigned long) _p_reduction);
        fflush(_p_stdout);
    }
#endif /* DEBUG */
} /* process_read() */


/*
 * _p_send_cancels()
 *
 * Send a MSG_CANCEL message for all remote references that need
 * to be canceled.
 *
 * _p_cancel_lists is an array with _p_nodes elements, where each
 * element is a pointer to a list of data structure, where each
 * data structure is either a rref or a value_note.
 * For each node (i.e., each list starting at a particular array element),
 * send cancel messages for the items on that list.
 *
 * In rrefs, the 'node' field is used to hold the linked list pointer.
 * In value_notes, the 'next' field is used to hold the linked list pointer.
 *
 * Message layout: cell 0 holds the number of (location, weight) pairs,
 * followed by that many two-cell pairs.  A list that does not fit in one
 * CANCEL_SIZE buffer is spread over several messages.  _p_cancels is
 * decremented once per item sent, and the outer loop stops early once it
 * reaches zero.
 */
void _p_send_cancels()
{
    cell_t **cancel_list_entry;
    int_t node, i, j;
    int_t msg_length;
    cell_t *this_ds, *next_ds;
    cell_t *msg_buf, *mb, *end;

    for (node = 0, cancel_list_entry = _p_cancel_lists;
         node < _p_nodes && _p_cancels > 0;
         node++, cancel_list_entry++)
    {
        if (*cancel_list_entry != (cell_t *) NULL)
        {
            this_ds = *cancel_list_entry;
            while (this_ds != (cell_t *) NULL)
            {
                mb = msg_buf = _p_alloc_msg_buffer(CANCEL_SIZE);
                /*
                 * mb starts at cell 1 (cell 0 is the pair count).  Each
                 * pass writes two cells, so stopping when mb reaches the
                 * last cell keeps both writes inside the buffer.
                 */
                for (i = 0, mb++, end = msg_buf + CANCEL_SIZE - 1;
                     (mb < end) && (this_ds != (cell_t *) NULL);
                     i++)
                {
                    if (*this_ds == 0)
                    {
                        /*
                         * This is a rref that was defined a value
                         * during emulation.
                         *
                         * Note: This relies on the specific layout of rref_t.
                         * Cells 1 and 2 are sent as the (location, weight)
                         * pair and cell 3 is taken as the list link
                         * (presumably the 'node' field -- confirm against
                         * the rref_t declaration).
                         */
                        *((int_t *) mb++) = (int_t) *((u_int_t *)(this_ds+1));
                        *((int_t *) mb++) = (int_t) *((u_int_t *)(this_ds+2));
                        next_ds = *((cell_t **)(this_ds+3));
                        /*
                         * Zero cells 1 .. RrefSizeWithTrailer()-2; cell 0
                         * is already 0 and the final cell is left alone.
                         */
                        for (j = RrefSizeWithTrailer() - 2; j > 0; j--)
                            *(++this_ds) = (cell_t) 0;
                    }
                    else if (IsRref(this_ds))
                    {
                        /*
                         * This is an orphaned rref that was found
                         * during garbage collection.
                         */
                        *((int_t *) mb++) = ((rref_t *)this_ds)->location;
                        *((int_t *) mb++) = ((rref_t *)this_ds)->weight;
                        next_ds = (cell_t *) ((rref_t *)this_ds)->node;
                        /* Wipe the whole rref, trailer included. */
                        for (j = RrefSizeWithTrailer(); j > 0; j--)
                            *this_ds++ = (cell_t) 0;
                    }
                    else /* IsValueNote(this_ds) */
                    {
                        /*
                         * This is either:
                         *   - an orphaned value_note that was found
                         *     during garbage collection
                         *   - a value_note that was discarded by
                         *     utils.c:process_susp_queue()
                         *
                         * A value note always cancels a weight of 1.
                         */
                        *((int_t *) mb++) =((value_note_t *)this_ds)->location;
                        *((int_t *) mb++) = 1;
                        next_ds = (cell_t *) ((value_note_t *)this_ds)->next;
                        _p_free_value_note((value_note_t *) this_ds);
                    }
                    this_ds = next_ds;
                    _p_cancels--;
                }

                /* Fill in the pair count and compute the used length. */
                *((int_t *) msg_buf) = i;
                msg_length = mb - msg_buf;

#ifdef GAUGE
                if (msg_length > SMALL_MSG_THRESHOLD)
                {
                    _p_gauge_stats.sbigmsgs++;
                    _p_gauge_stats.sbigmsgslen += msg_length;
                }
                else
                    _p_gauge_stats.ssmallmsgs++;
#endif /* GAUGE */

#ifdef DEBUG
                if (ParDebug(5))
                {
                    fprintf(_p_stdout,
                            "(%lu,%lu) _p_send_cancels(): Sending to node %lu\n",
                            (unsigned long) _p_my_id,
                            (unsigned long) _p_reduction,
                            (unsigned long) node);
                }
#endif /* DEBUG */

                _p_msg_send(msg_buf, node, msg_length, MSG_CANCEL);
            }
            /* The whole list for this node has been sent. */
            *cancel_list_entry = (cell_t *) NULL;
        }
    }

#ifdef DEBUG
    /* Consistency check */
    if (_p_cancels != 0)
    {
        _p_fatal_error("_p_send_cancels(): Failed consistency check. Lost some cancels somewhere...");
    }
#endif /* DEBUG */
} /* _p_send_cancels() */


/*
 * process_cancel()
 *
 * Process the MSG_CANCEL message that has been received onto the heap.
 * This entails looking up the irt entry for this data, and cancelling
 * the weight from that entry.
*/ static void process_cancel(size, node) int size; int node; { cell_t *cancels = _p_heap_ptr; int_t n_cancels; #ifdef DEBUG if (ParDebug(5)) { fprintf(_p_stdout, "(%lu,%lu) process_cancel(): From node %lu\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, (unsigned long) node); } #endif /* DEBUG */ #ifdef GAUGE if (size > SMALL_MSG_THRESHOLD) { _p_gauge_stats.rbigmsgs++; _p_gauge_stats.rbigmsgslen += size; } else _p_gauge_stats.rsmallmsgs++; #endif /* GAUGE */ for (n_cancels = *((u_int_t *) cancels++); n_cancels > 0; n_cancels--, cancels += 2) { _p_cancel_irt_entry(*((u_int_t *) cancels), *((u_int_t *) (cancels + 1))); } } /* process_cancel() */ /* * _p_send_collect() * * Send a MSG_COLLECT message to all "appropriate" nodes. This is * called when a garbage collection on a node of a multiprocessor * run has failed to collect enough space. */ void _p_send_collect() { int i; for (i = _p_nodes - 1; i >= 0; i--) { if (i != _p_my_id) { #ifdef GAUGE _p_gauge_stats.ssmallmsgs++; #endif /* GAUGE */ _p_msg_send((cell_t *) NULL, i, 0, MSG_COLLECT); } } } /* _p_send_collect() */ /* * process_collect() * * Process the MSG_COLLECT message that has been received onto the heap. * This entails calling the garbage collector, with the hope of generating * some MSG_CANCEL messages to free up space on other nodes. 
*/ static void process_collect(size, node) int size; int node; { #ifdef GAUGE _p_gauge_stats.rsmallmsgs++; #endif /* GAUGE */ DoGC(); } /* process_collect() */ /* * _p_send_exit() * * Send a MSG_EXIT message to the given 'node' with the given 'exit_code' */ void _p_send_exit(node, msg_type, exit_code) int_t node; int_t msg_type; int_t exit_code; { cell_t *msg_buf; msg_buf = _p_alloc_msg_buffer(1); *((int_t *)(msg_buf)) = exit_code; #ifdef DEBUG if (ParDebug(5)) { fprintf(_p_stdout, "(%lu,%lu) _p_send_exit(): to node %lu, exit_code %lu, msg_type %lu\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, (unsigned long) node, (unsigned long) exit_code, (unsigned long) msg_type); fflush(_p_stdout); } #endif /* DEBUG */ #ifdef GAUGE _p_gauge_stats.ssmallmsgs++; #endif /* GAUGE */ _p_msg_send(msg_buf, node, 1, msg_type); #ifdef DEBUG if (ParDebug(7)) { fprintf(_p_stdout, "(%lu,%lu) _p_send_exit(): Sent\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction); fflush(_p_stdout); } #endif /* DEBUG */ } /* _p_send_exit() */ static void process_exit(size, node) int size; int node; { int_t exit_code; #ifdef GAUGE _p_gauge_stats.rsmallmsgs++; #endif /* GAUGE */ exit_code = *((int_t *)(_p_heap_ptr)); if (_p_host) _p_host_handle_exit(exit_code); else _p_node_handle_exit(exit_code, FALSE); } /* process_exit() */ /* * _p_process_messages() * * Process any waiting messages from other nodes. * * Valid 'rcv_type' arguments are: * RCV_NOBLOCK Do not block if no messges are waiting * RCV_BLOCK Block until at least one MSG_DEFINE or MSG_VALUE * message has been processed. * RCV_COLLECT When called from _p_garbage_collect(). Ignore * MSG_COLLECT messages, and queue up * MSG_DEFINE and MSG_VALUE. Block until * we get a MSG_CANCEL. 
*/ void _p_process_messages(rcv_type) int rcv_type; { char error_msg[1024]; int_t type, size, node; bool_t done = FALSE; while (!done) { if (!_p_msg_receive(&node, &size, &type, rcv_type)) done = TRUE; else { switch(type) { case MSG_DEFINE: process_define(size, node); rcv_type = RCV_NOBLOCK; break; case MSG_VALUE: process_value(size, node); rcv_type = RCV_NOBLOCK; break; case MSG_READ: process_read(size, node); break; case MSG_CANCEL: process_cancel(size, node); done = TRUE; break; case MSG_COLLECT: process_collect(size, node); break; #ifdef GAUGE case MSG_GAUGE: _p_process_gauge_msg(size, node); break; #endif /* GAUGE */ case MSG_INITIATE_EXIT: if (_p_host) process_exit(size, node); else _p_fatal_error("Node received unexpected MSG_INITIATE_EXIT message"); done = TRUE; break; case MSG_EXIT: if (!_p_host) process_exit(size, node); else _p_fatal_error("Host received unexpected MSG_EXIT message"); done = TRUE; break; default: sprintf(error_msg, "Invalid message received on node %lu: type=%lu, size=%lu, from node=%lu", (unsigned long) _p_my_id, (unsigned long) type, (unsigned long) size, (unsigned long) node); _p_fatal_error(error_msg); break; } } } } /* _p_process_messages() */ /* * _p_host_handle_exit() */ void _p_host_handle_exit(exit_code) int_t exit_code; { int type, size, node; int_t i; int_t exit_count; /* * (Reset _p_heap_ptr so that we can't possibly receive a * message that goes off the top of the heap.) */ _p_heap_ptr = _p_heap_bottom; /* Send a MSG_EXIT to each node */ for (i = 1; i < _p_nodes; i++) _p_send_exit(i, MSG_EXIT, exit_code); /* * Now wait for each node to return a MSG_EXIT message, and discard * any other messages that come in in the mean time. * (Reset _p_heap_ptr so that we can't possibly receive a * message that goes off the top of the heap.) 
*/ for (exit_count = 1; exit_count < _p_nodes; ) { _p_msg_receive(&node, &size, &type, RCV_BLOCK); if (type == MSG_EXIT) exit_count++; } #ifdef GAUGE /* Take a final profile snapshot, and coalesce the node profiles */ _p_host_final_profile(); #endif /* GAUGE */ #ifdef UPSHOT /* Dump the upshot log */ _p_write_upshot_log(); #endif #if defined(GAUGE) || defined(UPSHOT) /* Do another sync up with the nodes */ for (i = 1; i < _p_nodes; i++) _p_send_exit(i, MSG_EXIT, exit_code); for (exit_count = 1; exit_count < _p_nodes; ) { _p_msg_receive(&node, &size, &type, RCV_BLOCK); if (type == MSG_EXIT) exit_count++; } #endif /* GAUGE || UPSHOT */ /* * If the send/receive module requires some special method of * shutting down all of the nodes, then do it here. * * At this point we know that all of the nodes are at least * into the _p_destroy_nodes() procedure, so _p_destroy_nodes() * can forcably kill all the nodes if it wishes. */ _p_destroy_nodes(); /* Send a MSG_EXIT to each node */ for (i = 1; i < _p_nodes; i++) _p_send_exit(i, MSG_EXIT, exit_code); _p_shutdown_pcn(); exit((int) exit_code); } /* _p_host_handle_exit() */ /* * _p_node_handle_exit() */ void _p_node_handle_exit(exit_code, initiate_it) int_t exit_code; bool_t initiate_it; { int type, size, node; bool_t done; /* * (Reset _p_heap_ptr so that we can't possibly receive a * message that goes off the top of the heap.) */ _p_heap_ptr = _p_heap_bottom; if (initiate_it) { /* * This node is initiating the exit. So send a MSG_INITIATE_EXIT * message to the host, and then wait for the normal MSG_EXIT * message that the host will broadcast. 
*/ _p_send_exit(_p_host_id, MSG_INITIATE_EXIT, exit_code); for (done = FALSE; !done; ) { _p_msg_receive(&node, &size, &type, RCV_BLOCK); if (type == MSG_EXIT) { exit_code = *((int_t *)(_p_heap_ptr)); done = TRUE; } } } /* Send a MSG_EXIT message to the host */ _p_send_exit(_p_host_id, MSG_EXIT, exit_code); #ifdef GAUGE /* Take a final profile snapshot, and coalesce the node profiles */ _p_node_final_profile(); #endif /* GAUGE */ #ifdef UPSHOT /* Dump the upshot log */ _p_write_upshot_log(); #endif /* UPSHOT */ #if defined(GAUGE) || defined(UPSHOT) /* Do another sync up with the host */ for (done = FALSE; !done; ) { _p_msg_receive(&node, &size, &type, RCV_BLOCK); if (type == MSG_EXIT) done = TRUE; } _p_send_exit(_p_host_id, MSG_EXIT, exit_code); #endif /* GAUGE || UPSHOT */ /* * If the send/receive module requires some special method of * shutting down all of the nodes, then do it here. * * However, a node should not do something that will cause * the destruction of other nodes or the host, since the * other nodes may not have completed previous operations yet. */ _p_destroy_nodes(); /* * Now wait to either get killed or receive a final MSG_EXIT message. */ for (done = FALSE; !done; ) { _p_msg_receive(&node, &size, &type, RCV_BLOCK); if (type == MSG_EXIT) done = TRUE; } _p_shutdown_pcn(); exit((int) exit_code); } /* _p_node_handle_exit() */ #endif /* PARALLEL */
These are the contents of the former NiCE NeXT User Group NeXTSTEP/OpenStep software archive, currently hosted by Netfuture.ch.