This is streams.c in view mode; [Download] [Up]
/* * PCN Abstract Machine Emulator * Authors: Steve Tuecke and Ian Foster * Argonne National Laboratory * * Please see the DISCLAIMER file in the top level directory of the * distribution regarding the provisions under which this software * is distributed. * * streams.c - Procedures for handling various stream related tasks */ #include "pcn.h" #ifdef STREAMS static void get_num_bytes_and_location(); /* * stream_t *_p_get_stream_record(cell_t *stream_array, cell_t *indexp) * * Use the passed pointer to stream array (not dereferenced) and * the passed pointer to an integer index into the array (derferenced) * to find a particular stream_t record. * * Return: The stream_t record. */ stream_t *_p_get_stream_record(stream_array, indexp) cell_t *stream_array, *indexp; { cell_t *cp; stream_t *streamp; int i; /* * cp = stream array * streamp = first element of stream array */ Dereference((cell_t *), stream_array, cp); #ifdef DEBUG if (((data_header_t *) cp)->tag != STREAM_TAG) _p_fatal_error("_p_get_stream_pointer: Expected a STREAM_TAG for stream"); #endif /* DEBUG */ streamp = (stream_t *) (cp + 1); /* * i = index into stream array */ #ifdef DEBUG if (((data_header_t *) indexp)->tag != INT_TAG) _p_fatal_error("_p_get_stream_pointer: Expected an INT_TAG for index"); #endif /* DEBUG */ i = *((int_t *) (indexp + 1)); streamp += i; /* streamp = stream_array[index] */ return (streamp); } /* _p_get_stream_record() */ /* * void _p_close_stream(int_t id) * * Mark the stream with the given stream id as closed. This should only * be called on the receiving side of the stream. * * We are also guaranteed that this procedure is not called until all * messages on this stream have been delivered. */ void _p_close_stream(id) int_t id; { stream_t *streamp1; #ifdef DEBUG if (ParDebug(5)) { fprintf(_p_stdout, "(%lu,%lu) _p_close_stream(): id %ld\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, (long) id); } #endif /* DEBUG */ streamp1 = _p_lookup_stream(id); _p_free_stream(id); streamp1->open = 0; if (streamp1->state == 2) /* There is a pending receive */ { /* Set up return status = 1 to indicate a closed stream */ cell_t *cp; BuildInt((cell_t *), cp, 1); if (!_p_define(streamp1->u.r.status, cp, TRUE, FALSE, 0, 0)) { fprintf(_p_stdout, "(%lu,%lu) Warning: Node %lu: Bad define while closing stream -- status definition variable is already defined\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, (unsigned long) _p_my_id); } } } /* _p_close_stream() */ /* * void _p_enqueue_local_stream_msg(stream_t *streamp, cell_t *array, * u_int_t offset, u_int_t size) * * Enqueue a local stream message onto the receive queue for * 'streamp'. */ void _p_enqueue_local_stream_msg(streamp, array, offset, size) stream_t *streamp; cell_t *array; u_int_t offset; u_int_t size; { u_int_t tag = ((data_header_t *) array)->tag; u_int_t buf_size = (_p_size_with_trailer(tag, size) + LocalStreamMsgBaseSize); u_int_t num_bytes; char *d; local_stream_msg_t *msg_buf; if ((msg_buf = (local_stream_msg_t *) malloc (buf_size * CELL_SIZE)) == (local_stream_msg_t *) NULL) _p_malloc_error(); msg_buf->next = (local_stream_msg_t *) NULL; if (streamp->u.lq.queue_head == (local_stream_msg_t *) NULL) { streamp->u.lq.queue_head = streamp->u.lq.queue_tail = msg_buf; } else { local_stream_msg_t *m = streamp->u.lq.queue_tail; m->next = streamp->u.lq.queue_tail = msg_buf; } msg_buf->tag = tag; msg_buf->offset = offset; msg_buf->size = size; get_num_bytes_and_location((char *) (array + 1), tag, offset, size, &num_bytes, &d); memcpy(msg_buf->data, d, num_bytes); } /* _p_enqueue_local_stream_msg() */ /* * local_stream_msg_t * _p_dequeue_local_stream_msg(stream_t *streamp) * * Dequeue a local stream message from the receive queue for * 'streamp'. */ local_stream_msg_t *_p_dequeue_local_stream_msg(streamp) stream_t *streamp; { local_stream_msg_t *msg_buf; if (streamp->u.lq.queue_head != (local_stream_msg_t *) NULL) { msg_buf = streamp->u.lq.queue_head; streamp->u.lq.queue_head = msg_buf->next; } else { _p_fatal_error("_p_dequeue_local_stream_msg(): NULL head of local stream queue"); } return (msg_buf); } /* _p_dequeue_local_stream_msg() */ /* * void _p_free_local_stream_msg(local_stream_msg_t *msg) * * Free the memory for the local stream message, 'msg', that was created * with _p_enqueue_local_stream_msg(), and retrieved with * _p_dequeue_local_stream_msg(). */ void _p_free_local_stream_msg(msg) local_stream_msg_t *msg; { free(msg); } /* _p_dequeue_local_stream_msg() */ /* * void _p_set_stream_recv_status(cell_t *status_undef, int_t status_code, * char *where) */ void _p_set_stream_recv_status(status_undef, status_code, where) cell_t *status_undef; int_t status_code; char *where; { cell_t *cp1, *cp2; Dereference((cell_t *), status_undef, cp1); BuildInt((cell_t *), cp2, status_code); if (!_p_define(cp1, cp2, TRUE, FALSE, 0, 0)) fprintf(_p_stdout, "(%lu,%lu) Warning: Node %lu: %s: Return status variable for stream receive already defined in %s:%s\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, (unsigned long) _p_my_id, where, #ifdef PDB _pdb_get_module_name(_p_current_proc->program), #else /* PDB */ "", #endif /* PDB */ ProgramName(_p_current_proc->program)); } /* _p_set_stream_recv_status() */ void _p_copy_stream_data(to_array, to_offset, from_array, from_offset, tag, size, copy_type) cell_t *to_array; u_int_t to_offset; cell_t *from_array; u_int_t from_offset; u_int_t tag; u_int_t size; int copy_type; { u_int_t n_bytes; char *to; char *from; n_bytes = _p_size_with_trailer(tag,size); if (tag == STRING_TAG) { } else if (tag == DOUBLE_TAG) { } else if (tag == INT_TAG) { } else { _p_fatal_error("_p_copy_stream_data(): Illegal tag"); } } /* _p_copy_stream_data() */ /* * void get_num_bytes_and_location(data, tag, offset, size, num_bytes, loc) * * Given a pointer to the 'data', its 'tag', an 'offset', and a 'size', * compute the number of bytes that need to be copied (set 'num_bytes' to * this value) and the location of the copy (set 'loc' to this value). */ static void get_num_bytes_and_location(data, tag, offset, size, num_bytes, loc) char *data; u_int_t tag; u_int_t offset; u_int_t size; u_int_t *num_bytes; char **loc; { if (tag == STRING_TAG) { *loc = data + offset; *num_bytes = size; } else if (tag == DOUBLE_TAG) { *loc = data + (offset * CELLS_PER_DOUBLE * CELL_SIZE); *num_bytes = (size * CELLS_PER_DOUBLE * CELL_SIZE); } else if (tag == INT_TAG) { *loc = data + (offset * CELL_SIZE); *num_bytes = (size * CELL_SIZE); } else { _p_fatal_error("get_num_bytes_and_location(): Illegal tag"); } } /* get_num_bytes_and_location() */ #endif /* STREAMS */
These are the contents of the former NiCE NeXT User Group NeXTSTEP/OpenStep software archive, currently hosted by Netfuture.ch.