This is read-write.c in view mode; [Download] [Up]
#include "pthread_io_delay.h"
#include "pthread_socket.h"
/* Standard headers follow the project headers so that any feature-test
 * macros the project headers define are seen first. */
#include <stdlib.h>
/*
 * Polling interval of the monitor thread.  The value is used as the second
 * member of the `sleeptime' timespec defined further down in this file.
 * NOTE(review): if that member is tv_nsec, 100000 means 100 microseconds,
 * not the 100 milliseconds the original comment claims -- confirm which
 * unit was intended before relying on either.
 */
#define SLEEPTIME (100000) /* 100 milliseconds (per the original author) */
/*
 * FATAL(n) - report a failed select() in the monitor thread.
 *
 * Stores n in errno and prints the matching message with perror(), all
 * between pthread_lock_global_np() and pthread_unlock_global_np().
 * Despite the name the macro only prints a warning; it does not terminate
 * anything.
 *
 * The body is wrapped in do { ... } while( 0 ) so that `FATAL(x);' is a
 * single statement and stays correct inside an unbraced if/else; the
 * argument is parenthesized so that any expression may be passed.
 */
#define FATAL(n) \
    do { \
        pthread_lock_global_np(); \
        errno = (n); \
        perror("Warning: select() failed in monitor thread" ); \
        pthread_unlock_global_np(); \
    } while( 0 )
/*
 * count_sem_t - a mutex, a condition variable and an integer counter.
 * NOTE(review): the name suggests a counting semaphore, but nothing in this
 * part of the file uses the type -- confirm before relying on that reading.
 */
struct count_sem
{
    pthread_mutex_t lock;   /* mutex member */
    pthread_cond_t  cond;   /* condition variable member */
    int             count;  /* integer counter */
};
typedef struct count_sem count_sem_t;
/*
 * bin_sem_t - binary semaphore built from a mutex, a condition variable
 * and a boolean flag.  Throughout this file a poster sets `flag' to TRUE
 * and signals `cond'; a waiter sleeps on `cond' until `flag' is TRUE and
 * then resets it to FALSE.
 */
struct bin_sem
{
    pthread_mutex_t lock;   /* held by waiters around their flag test */
    pthread_cond_t  cond;   /* signalled when flag becomes TRUE */
    int             flag;   /* TRUE while a post is pending */
};
typedef struct bin_sem bin_sem_t;
/*
 * fd_queue_elem_t - one node of a descriptor wait queue.  A node stands
 * for a single file descriptor and is shared by all threads waiting on it.
 */
struct fd_queue_elem
{
    int                   fd;     /* descriptor being waited on */
    int                   count;  /* number of waiters sharing this node */
    bin_sem_t             sem;    /* posted by the monitor when fd is ready */
    struct fd_queue_elem *next;   /* following node, NULL at the end */
};
typedef struct fd_queue_elem fd_queue_elem_t;
/*
 * fd_queue_t - pointer to a singly linked list of fd_queue_elem_t nodes.
 * Note that the typedef names the POINTER type, not the structure itself.
 */
struct fd_queue
{
    fd_queue_elem_t *head;  /* first node, NULL when the queue is empty */
    fd_queue_elem_t *tail;  /* last node, NULL when the queue is empty */
};
typedef struct fd_queue *fd_queue_t;
/*
 * Semaphore of the monitor thread.  read() and write() set its flag and
 * signal its condition after queueing a descriptor; its lock is also held
 * while the two wait queues are modified.
 */
static bin_sem_t fd_server_sem;
/* Queues of descriptors that threads are waiting to read from / write to. */
static fd_queue_t fd_wait_read, fd_wait_write;
/* Handle of the thread that fd_init() creates to run fd_monitor(). */
static pthread_t monitor_thread;
/* Interval passed to pthread_get_expiration_np() by fd_monitor(). */
static struct timespec sleeptime = {0, SLEEPTIME };
/*
 * bin_sem_init() - put a binary semaphore into its initial state.
 *
 *   s - semaphore to set up
 *
 * The flag starts out FALSE (nothing posted); the mutex and the condition
 * variable are created with default attributes.
 */
static void
bin_sem_init( bin_sem_t * s )
{
    s->flag = FALSE;
    pthread_cond_init( &s->cond, NULL );
    pthread_mutex_init( &s->lock, NULL );
}
/*
 * fd_enq() - register one more waiter for descriptor `fd' on queue `q'.
 *
 *   q  - wait queue to update
 *   fd - descriptor the calling thread wants to wait on
 *
 * If the queue already holds a node for `fd', that node's waiter count is
 * incremented and the node is returned, so all waiters on one descriptor
 * share a single node.  Otherwise a new node with a count of 1 is
 * allocated and linked in at the head of the queue.
 *
 * Returns the node, or NULL when no memory is available for a new node
 * (the queue is left untouched in that case).  The function takes no lock
 * itself; the callers in this file hold fd_server_sem.lock around it.
 */
static fd_queue_elem_t *
fd_enq( fd_queue_t q, int fd )
{
    fd_queue_elem_t *new_elem;

    /* Reuse the node of a descriptor that is already being waited on. */
    for( new_elem = q->head; new_elem != NULL; new_elem = new_elem->next )
    {
        if( new_elem->fd == fd )
        {
            new_elem->count++;
            return( new_elem );
        }
    }

    new_elem = malloc( sizeof *new_elem );
    if( new_elem == NULL )
    {
        return( NULL );
    }

    bin_sem_init( &new_elem->sem );
    new_elem->fd = fd;
    new_elem->count = 1;

    /* Push at the head; the first node ever queued is also the tail. */
    new_elem->next = q->head;
    if( q->head == NULL )
    {
        q->tail = new_elem;
    }
    q->head = new_elem;

    return( new_elem );
}
/*
 * fd_deq_elem() - drop one waiter from node `q_elem' of queue `q' and
 * unlink the node once its last waiter is gone.
 *
 *   q      - queue that is expected to contain the node
 *   q_elem - node to release
 *
 * When the node is not on the queue nothing is changed (a diagnostic is
 * printed in DEBUG builds).  Otherwise the waiter count is decremented;
 * the node is unlinked only when the count reaches zero.  The node is
 * never freed here because a waiter still uses its semaphore -- in this
 * file the waiter that observes a count of zero frees it.
 */
static void
fd_deq_elem( fd_queue_t q, fd_queue_elem_t * q_elem )
{
    fd_queue_elem_t *prev = NULL;
    fd_queue_elem_t *cur = q->head;

    /* Locate the node and remember its predecessor. */
    while( cur != NULL && cur != q_elem )
    {
        prev = cur;
        cur = cur->next;
    }
    if( cur == NULL )
    {
#ifdef DEBUG
        /* %p with a void * argument: %x is not valid for pointers. */
        fprintf( stderr, "fd_deq_elem(elem %p) failed\n", (void *) q_elem );
#endif
        return;
    }

    /*
     * other threads still waiting on this element
     */
    if( --cur->count != 0 )
    {
        return;
    }

    if( prev == NULL )
    {
        q->head = cur->next;
    }
    else
    {
        prev->next = cur->next;
    }
    if( cur->next == NULL )
    {
        q->tail = prev;
    }
    /* cannot do free(cur) since semaphore still used */
}
/*
 * syscall_select() - wrapper around the select() system call.  Only the
 * declaration appears here; the definition is not in this part of the file.
 *
 * Per the original author's note: the service returns -1 on error with
 * errno set appropriately, and because errno gets set in the call it has
 * to be surrounded with a lock.
 * NOTE(review): no locking is visible at this declaration -- presumably it
 * happens inside the definition; confirm.
 */
extern int
syscall_select( int nd,
fd_set * in,
fd_set * out,
fd_set * ex,
struct timeval * tv);
/*
* This function runs in a separate thread and monitors file descriptors.
*/
static void
fd_monitor( void )
{
int count, ret;
struct timeval timeout = {0, 0};
struct timespec state_changed_timeout;
fd_queue_elem_t *q_elem, *q_elem_tmp;
fd_set fd_set_read, fd_set_write;
while( TRUE )
{
/*
* Check whether a file descriptor has been queued to either the
* read or write queues.
*/
if( fd_wait_read->head || fd_wait_write->head )
{
int err = 0;
FD_ZERO( &fd_set_read );
FD_ZERO( &fd_set_write );
pthread_mutex_lock( &fd_server_sem.lock );
for( q_elem = fd_wait_read->head; q_elem; q_elem = q_elem->next )
FD_SET( q_elem->fd, &fd_set_read );
for( q_elem = fd_wait_write->head; q_elem; q_elem = q_elem->next )
FD_SET( q_elem->fd, &fd_set_write );
pthread_mutex_unlock( &fd_server_sem.lock );
/*
* Check whether any of the filedescriptors in fd_set_read or
* fd_set_write are ready, timeout immediately.
*/
count = syscall_select( FD_SETSIZE,
&fd_set_read,
&fd_set_write,
NULL,
&timeout );
/*
* If syscall_select() returned an error that is other
* than signal related, we're probably in big trouble. Should
* we continue or not? During development, we should exit.
*
* However....
*
* With respect to signals, since we never block in select (i.e.,
* the timeout is {0,0}, we should never see a return via
* signal interrupt.
*/
if( count < 0 )
{
(void) pthread_get_errno_np( NULL, &err );
FATAL(err);
}
/*
* If one or more read file descriptors are ready, dequeue each
* one by signaling the waiting thread(s).
*/
pthread_mutex_lock( &fd_server_sem.lock );
q_elem = fd_wait_read->head;
while( q_elem && count )
{
if( FD_ISSET( q_elem->fd, &fd_set_read ) )
{
FD_CLR( q_elem->fd, &fd_set_read );
q_elem_tmp = q_elem->next;
pthread_mutex_lock( &(q_elem->sem.lock) );
q_elem->sem.flag = TRUE;
fd_deq_elem( fd_wait_read, q_elem );
pthread_mutex_unlock( &(q_elem->sem.lock ) );
pthread_cond_signal( &(q_elem->sem.cond) );
q_elem = q_elem_tmp;
}
else
{
q_elem = q_elem->next;
}
count--;
}
/*
* If one or more write file descriptors are ready, dequeue each
* one, signaling the waiting thread(s).
*/
q_elem = fd_wait_write->head;
while( q_elem && count )
{
if( FD_ISSET( q_elem->fd, &fd_set_write ) )
{
FD_CLR( q_elem->fd, &fd_set_write );
q_elem_tmp = q_elem->next;
pthread_mutex_lock( &(q_elem->sem.lock ) );
q_elem->sem.flag = TRUE;
fd_deq_elem( fd_wait_write, q_elem );
pthread_mutex_unlock( &(q_elem->sem.lock) );
pthread_cond_signal( &(q_elem->sem.cond) );
q_elem = q_elem_tmp;
}
else
{
q_elem = q_elem->next;
}
count--;
}
pthread_mutex_unlock( &fd_server_sem.lock );
}
pthread_mutex_lock( &fd_server_sem.lock );
if( fd_wait_read->head || fd_wait_write->head )
{
/*
* Calculate the expiration time for the up-and-coming call to
* pthread_cond_timedwait().
*/
pthread_get_expiration_np( &sleeptime, &state_changed_timeout );
ret = 0;
while( !fd_server_sem.flag && ret != NOTOK )
{
ret = pthread_cond_timedwait( &fd_server_sem.cond,
&fd_server_sem.lock,
&state_changed_timeout );
}
}
else
{
/*
* Since fd_read and fd_write are empty, wait, until
* one of them has changed
*/
while( !fd_server_sem.flag )
{
pthread_cond_wait( &fd_server_sem.cond, &fd_server_sem.lock );
}
}
fd_server_sem.flag = FALSE;
pthread_mutex_unlock( &fd_server_sem.lock );
}
}
void
fd_init( void )
{
bin_sem_init( &fd_server_sem );
fd_wait_read = (fd_queue_t) malloc( sizeof(struct fd_queue) );
fd_wait_read->head = NULL;
fd_wait_read->tail = NULL;
fd_wait_write = (fd_queue_t) malloc( sizeof(struct fd_queue) );
fd_wait_write->head = NULL;
fd_wait_write->tail = NULL;
pthread_create( &monitor_thread, NULL, (thread_proc_t) fd_monitor, NULL );
pthread_yield( NULL ); /* ??? use high prio for server thread */
}
/*
 * One-time initialization control: read() and write() pass it to
 * pthread_once() together with fd_init().
 */
pthread_once_t io_initialized = PTHREAD_ONCE_INIT;
/*
* Defined in sys_io.S
*/
extern ssize_t
sys_io_read( int fd, void *buf, size_t nbytes );
static int
syscall_read( int fd, void *buf, size_t nbytes )
{
int ret;
LOCK_ON
ret = sys_io_read( fd, buf, nbytes );
LOCK_OFF
return( ret );
}
/*--------------------------------------------------------------------------
 * @@ R E A D
 *--------------------------------------------------------------------------
 * The read() function is meant to behave like the read(2) POSIX.1
 * function except that only the calling thread is blocked.
 *
 *   fd     - descriptor to read from
 *   buf    - destination buffer
 *   nbytes - maximum number of bytes to read
 *
 * Returns the result of the underlying read, or FAILURE when the one-time
 * initialization fails.
 *
 * A descriptor that is already non-blocking gets exactly one attempt.
 * Otherwise the descriptor is switched to non-blocking mode and the read
 * is retried each time the monitor thread reports the descriptor ready;
 * blocking mode is restored before returning.  If no queue node can be
 * allocated for the wait, the loop ends and the negative result of the
 * last attempt is returned.
 */
ssize_t
read( int fd, void *buf, size_t nbytes )
{
    int err, st, ret, last;
    fd_queue_elem_t *q_elem;

    st = pthread_once( &io_initialized, fd_init );
    if( st != SUCCESS )
        return( FAILURE );

    if( nonblock_is_set( fd ))
    {
        ret = syscall_read( fd, buf, nbytes );
        RETURN( ret );
    }

    set_fd_nonblock( fd );
    while( (ret = syscall_read( fd, buf, nbytes )) < OK )
    {
        (void) pthread_get_errno_np( NULL, &err );
        if( err != EAGAIN )
            break;

        /* Queue the descriptor and wake the monitor thread. */
        pthread_mutex_lock( &fd_server_sem.lock );
        q_elem = fd_enq( fd_wait_read, fd );
        if( q_elem == NULL )
        {
            /* Nothing to wait on: give up with the failed result. */
            pthread_mutex_unlock( &fd_server_sem.lock );
            break;
        }
        fd_server_sem.flag = TRUE;
        pthread_mutex_unlock( &fd_server_sem.lock );
        pthread_cond_signal( &fd_server_sem.cond );

        /* Sleep until the monitor posts this descriptor's semaphore. */
        pthread_mutex_lock( &(q_elem->sem.lock) );
        while( !(q_elem->sem.flag) )
            pthread_cond_wait( &(q_elem->sem.cond), &(q_elem->sem.lock) );
        q_elem->sem.flag = FALSE;
        /* A count of zero means the monitor has unlinked the node. */
        last = (q_elem->count == 0);
        pthread_mutex_unlock( &(q_elem->sem.lock) );

        if( last )
        {
            /* Last user: release the semaphore's resources, then the node. */
            pthread_cond_destroy( &(q_elem->sem.cond) );
            pthread_mutex_destroy( &(q_elem->sem.lock) );
            free( q_elem );
        }
    }
    clear_fd_nonblock( fd );
    return( ret );
}
/*
* Defined in sys_io.S
*/
ssize_t
sys_io_write(int fd, const void *buf, size_t count);
int
syscall_write( int fd, const void *buf, size_t nbytes )
{
int ret;
LOCK_ON
ret = sys_io_write( fd, buf, nbytes );
LOCK_OFF
return( ret );
}
/*--------------------------------------------------------------------------
 * @@ W R I T E
 *--------------------------------------------------------------------------
 * The write() function is meant to behave like the write(2) POSIX.1
 * function except that only the calling thread is blocked.
 *
 *   fd     - descriptor to write to
 *   buf    - data to write
 *   nbytes - number of bytes to write
 *
 * Returns the result of the underlying write, or FAILURE when the
 * one-time initialization fails.
 *
 * A descriptor that is already non-blocking gets exactly one attempt.
 * Otherwise the descriptor is switched to non-blocking mode -- as read()
 * does -- so that a write which cannot proceed yields EAGAIN instead of
 * stalling inside the system call; the write is then retried each time
 * the monitor thread reports the descriptor ready.  Blocking mode is
 * restored before returning.  If no queue node can be allocated for the
 * wait, the loop ends and the negative result of the last attempt is
 * returned.
 */
ssize_t
write( int fd, const void *buf, size_t nbytes )
{
    int err, st, ret, last;
    fd_queue_elem_t *q_elem;

    st = pthread_once( &io_initialized, fd_init );
    if( st != SUCCESS )
        return( FAILURE );

    if( nonblock_is_set( fd ))
    {
        ret = syscall_write( fd, buf, nbytes );
        RETURN( ret );
    }

    /* Matches the clear_fd_nonblock() call at the end of the function. */
    set_fd_nonblock( fd );
    while( (ret = syscall_write( fd, buf, nbytes )) < OK )
    {
        (void) pthread_get_errno_np( NULL, &err );
        if( err != EAGAIN )
            break;

        /* Queue the descriptor and wake the monitor thread. */
        pthread_mutex_lock( &fd_server_sem.lock );
        q_elem = fd_enq( fd_wait_write, fd );
        if( q_elem == NULL )
        {
            /* Nothing to wait on: give up with the failed result. */
            pthread_mutex_unlock( &fd_server_sem.lock );
            break;
        }
        fd_server_sem.flag = TRUE;
        pthread_mutex_unlock( &fd_server_sem.lock );
        pthread_cond_signal( &fd_server_sem.cond );

        /* Sleep until the monitor posts this descriptor's semaphore. */
        pthread_mutex_lock( &(q_elem->sem.lock) );
        while( !(q_elem->sem.flag) )
            pthread_cond_wait( &(q_elem->sem.cond), &(q_elem->sem.lock) );
        q_elem->sem.flag = FALSE;
        /* A count of zero means the monitor has unlinked the node. */
        last = (q_elem->count == 0);
        pthread_mutex_unlock( &(q_elem->sem.lock) );

        if( last )
        {
            /* Last user: release the semaphore's resources, then the node. */
            pthread_cond_destroy( &(q_elem->sem.cond) );
            pthread_mutex_destroy( &(q_elem->sem.lock) );
            free( q_elem ); /* q_elem semaphore used up */
        }
    }
    clear_fd_nonblock( fd );
    return( ret );
}
These are the contents of the former NiCE NeXT User Group NeXTSTEP/OpenStep software archive, currently hosted by Netfuture.ch.