/* This is streams.c in view mode; [Download] [Up] */
/*
* PCN Abstract Machine Emulator
* Authors: Steve Tuecke and Ian Foster
* Argonne National Laboratory
*
* Please see the DISCLAIMER file in the top level directory of the
* distribution regarding the provisions under which this software
* is distributed.
*
* streams.c - Procedures for handling various stream related tasks
*/
#include "pcn.h"
#ifdef STREAMS
/*
 * Forward declaration of the file-local helper that is defined near the
 * end of this file.  It is declared without a parameter list, matching
 * the old-style (K&R) function definitions used throughout this file.
 */
static void get_num_bytes_and_location();
/*
* stream_t *_p_get_stream_record(cell_t *stream_array, cell_t *indexp)
*
* Use the passed pointer to stream array (not dereferenced) and
* the passed pointer to an integer index into the array (derferenced)
* to find a particular stream_t record.
*
* Return: The stream_t record.
*/
stream_t *_p_get_stream_record(stream_array, indexp)
cell_t *stream_array, *indexp;
{
cell_t *cp;
stream_t *streamp;
int i;
/*
* cp = stream array
* streamp = first element of stream array
*/
Dereference((cell_t *), stream_array, cp);
#ifdef DEBUG
if (((data_header_t *) cp)->tag != STREAM_TAG)
_p_fatal_error("_p_get_stream_pointer: Expected a STREAM_TAG for stream");
#endif /* DEBUG */
streamp = (stream_t *) (cp + 1);
/*
* i = index into stream array
*/
#ifdef DEBUG
if (((data_header_t *) indexp)->tag != INT_TAG)
_p_fatal_error("_p_get_stream_pointer: Expected an INT_TAG for index");
#endif /* DEBUG */
i = *((int_t *) (indexp + 1));
streamp += i; /* streamp = stream_array[index] */
return (streamp);
} /* _p_get_stream_record() */
/*
 * void _p_close_stream(int_t id)
 *
 * Close the stream identified by the stream id 'id'.  This is meant to
 * be called only on the receiving side of the stream, and only after
 * every message on the stream has been delivered.
 *
 * The stream record is looked up, the id is handed to _p_free_stream(),
 * and the record's 'open' flag is cleared.  If the record is in state 2
 * (a receive is pending), the receive's status variable is defined to
 * the integer 1, which signals a closed stream.  A warning is printed
 * when that definition fails.
 */
void _p_close_stream(id)
int_t id;
{
    stream_t *sp;
    cell_t *closed_status;

#ifdef DEBUG
    if (ParDebug(5))
    {
	fprintf(_p_stdout, "(%lu,%lu) _p_close_stream(): id %ld\n",
		(unsigned long) _p_my_id, (unsigned long) _p_reduction,
		(long) id);
    }
#endif /* DEBUG */

    sp = _p_lookup_stream(id);
    _p_free_stream(id);
    sp->open = 0;

    /* Nothing more to do unless a receive is pending on this stream */
    if (sp->state != 2)
	return;

    /* Return status 1 tells the pending receive the stream is closed */
    BuildInt((cell_t *), closed_status, 1);
    if (_p_define(sp->u.r.status, closed_status, TRUE, FALSE, 0, 0))
	return;

    fprintf(_p_stdout,
	    "(%lu,%lu) Warning: Node %lu: Bad define while closing stream -- status definition variable is already defined\n",
	    (unsigned long) _p_my_id, (unsigned long) _p_reduction,
	    (unsigned long) _p_my_id);
} /* _p_close_stream() */
/*
 * void _p_enqueue_local_stream_msg(stream_t *streamp, cell_t *array,
 *				    u_int_t offset, u_int_t size)
 *
 * Append a new local stream message to the tail of the receive queue
 * of 'streamp'.
 *
 * The message buffer is obtained from malloc(); its size in cells is
 * _p_size_with_trailer(tag, size) plus LocalStreamMsgBaseSize, where
 * 'tag' is read from the data header of 'array'.  The message records
 * the tag, 'offset' and 'size', and receives a copy of the selected
 * part of the data that follows the first cell of 'array'.
 *
 * _p_malloc_error() is called if the allocation fails.
 */
void _p_enqueue_local_stream_msg(streamp, array, offset, size)
stream_t *streamp;
cell_t *array;
u_int_t offset;
u_int_t size;
{
    u_int_t tag = ((data_header_t *) array)->tag;
    u_int_t n_cells = (_p_size_with_trailer(tag, size)
		       + LocalStreamMsgBaseSize);
    u_int_t copy_len;
    char *src;
    local_stream_msg_t *node;

    node = (local_stream_msg_t *) malloc (n_cells * CELL_SIZE);
    if (node == (local_stream_msg_t *) NULL)
	_p_malloc_error();

    /* Fill in the message header */
    node->next = (local_stream_msg_t *) NULL;
    node->tag = tag;
    node->offset = offset;
    node->size = size;

    /* Link the message in as the new tail of the queue */
    if (streamp->u.lq.queue_head != (local_stream_msg_t *) NULL)
    {
	streamp->u.lq.queue_tail->next = node;
	streamp->u.lq.queue_tail = node;
    }
    else
    {
	/* Empty queue: the new message is both head and tail */
	streamp->u.lq.queue_head = node;
	streamp->u.lq.queue_tail = node;
    }

    /* Copy the payload; the data starts one cell past the header */
    get_num_bytes_and_location((char *) (array + 1), tag, offset, size,
			       &copy_len, &src);
    memcpy(node->data, src, copy_len);
} /* _p_enqueue_local_stream_msg() */
/*
 * local_stream_msg_t * _p_dequeue_local_stream_msg(stream_t *streamp)
 *
 * Remove the message at the head of the receive queue of 'streamp'.
 *
 * It is an error to call this on an empty queue: _p_fatal_error() is
 * called in that case.
 *
 * Return: The message that was removed.  The caller is expected to
 *	   release it with _p_free_local_stream_msg().  Should
 *	   _p_fatal_error() ever return on the empty-queue path, NULL
 *	   is returned rather than an indeterminate pointer.
 */
local_stream_msg_t *_p_dequeue_local_stream_msg(streamp)
stream_t *streamp;
{
    local_stream_msg_t *msg_buf = (local_stream_msg_t *) NULL;

    if (streamp->u.lq.queue_head == (local_stream_msg_t *) NULL)
    {
	_p_fatal_error("_p_dequeue_local_stream_msg(): NULL head of local stream queue");
	return (msg_buf);
    }

    msg_buf = streamp->u.lq.queue_head;
    streamp->u.lq.queue_head = msg_buf->next;

    /*
     * When the last message is removed, do not leave the tail pointing
     * at a message that the caller is about to free.
     */
    if (streamp->u.lq.queue_head == (local_stream_msg_t *) NULL)
	streamp->u.lq.queue_tail = (local_stream_msg_t *) NULL;

    return (msg_buf);
} /* _p_dequeue_local_stream_msg() */
/*
 * void _p_free_local_stream_msg(local_stream_msg_t *msg)
 *
 * Release the memory of the local stream message 'msg' by passing it
 * to free().  'msg' is a buffer that was allocated with malloc() in
 * _p_enqueue_local_stream_msg() and later retrieved with
 * _p_dequeue_local_stream_msg().
 */
void _p_free_local_stream_msg(msg)
local_stream_msg_t *msg;
{
free(msg);
} /* _p_free_local_stream_msg() */
/*
 * void _p_set_stream_recv_status(cell_t *status_undef, int_t status_code,
 *				  char *where)
 *
 * Define the status variable of a stream receive.
 *
 * status_undef: the status variable; it is run through Dereference()
 *		 before being defined.
 * status_code:	 the integer value the variable is defined to.
 * where:	 text identifying the caller; it is only used in the
 *		 warning message.
 *
 * If _p_define() reports failure, a warning naming 'where' and the
 * current program (plus its module name when compiled with PDB) is
 * written to _p_stdout.
 */
void _p_set_stream_recv_status(status_undef, status_code, where)
cell_t *status_undef;
int_t status_code;
char *where;
{
    cell_t *target, *value;

    Dereference((cell_t *), status_undef, target);
    BuildInt((cell_t *), value, status_code);

    /* Successful definition: nothing to report */
    if (_p_define(target, value, TRUE, FALSE, 0, 0))
	return;

    fprintf(_p_stdout,
	    "(%lu,%lu) Warning: Node %lu: %s: Return status variable for stream receive already defined in %s:%s\n",
	    (unsigned long) _p_my_id, (unsigned long) _p_reduction,
	    (unsigned long) _p_my_id, where,
#ifdef PDB
	    _pdb_get_module_name(_p_current_proc->program),
#else  /* PDB */
	    "",
#endif /* PDB */
	    ProgramName(_p_current_proc->program));
} /* _p_set_stream_recv_status() */
/*
 * void _p_copy_stream_data(cell_t *to_array, u_int_t to_offset,
 *			    cell_t *from_array, u_int_t from_offset,
 *			    u_int_t tag, u_int_t size, int copy_type)
 *
 * Calls _p_size_with_trailer(tag, size) and verifies that 'tag' is one
 * of STRING_TAG, DOUBLE_TAG or INT_TAG; any other tag is reported
 * through _p_fatal_error().
 *
 * NOTE(review): despite its name, this procedure does not copy any
 * data.  'to_array', 'to_offset', 'from_array', 'from_offset' and
 * 'copy_type' are never read, and the computed size is never used.
 * The body looks unfinished -- confirm the intended behavior before
 * relying on it.
 */
void _p_copy_stream_data(to_array, to_offset, from_array, from_offset,
			 tag, size, copy_type)
cell_t *to_array;
u_int_t to_offset;
cell_t *from_array;
u_int_t from_offset;
u_int_t tag;
u_int_t size;
int copy_type;
{
    u_int_t total_size;

    /* Computed as in the original code, but not used afterwards */
    total_size = _p_size_with_trailer(tag,size);

    /* Only the three known data tags are accepted */
    if (tag != STRING_TAG && tag != DOUBLE_TAG && tag != INT_TAG)
    {
	_p_fatal_error("_p_copy_stream_data(): Illegal tag");
    }
} /* _p_copy_stream_data() */
/*
 * void get_num_bytes_and_location(data, tag, offset, size, num_bytes, loc)
 *
 * Translate an element 'offset' and element count 'size' within 'data'
 * into a byte address and a byte count, according to 'tag':
 *
 *	STRING_TAG: elements are single bytes
 *	DOUBLE_TAG: elements are CELLS_PER_DOUBLE * CELL_SIZE bytes
 *	INT_TAG:    elements are CELL_SIZE bytes
 *
 * On return '*loc' is the address of the first byte to copy and
 * '*num_bytes' is the number of bytes to copy.  For any other tag
 * _p_fatal_error() is called and neither output is written.
 */
static void get_num_bytes_and_location(data, tag, offset, size, num_bytes, loc)
char *data;
u_int_t tag;
u_int_t offset;
u_int_t size;
u_int_t *num_bytes;
char **loc;
{
    char *start;
    u_int_t count;

    if (tag == INT_TAG)
    {
	start = data + (offset * CELL_SIZE);
	count = (size * CELL_SIZE);
    }
    else if (tag == DOUBLE_TAG)
    {
	start = data + (offset * CELLS_PER_DOUBLE * CELL_SIZE);
	count = (size * CELLS_PER_DOUBLE * CELL_SIZE);
    }
    else if (tag == STRING_TAG)
    {
	start = data + offset;
	count = size;
    }
    else
    {
	_p_fatal_error("get_num_bytes_and_location(): Illegal tag");
	return;		/* outputs are left untouched */
    }

    /* Store both results once the tag is known to be valid */
    *loc = start;
    *num_bytes = count;
} /* get_num_bytes_and_location() */
#endif /* STREAMS */
/* These are the contents of the former NiCE NeXT User Group NeXTSTEP/OpenStep software archive, currently hosted by Netfuture.ch. */