This is read-write.c in view mode; [Download] [Up]
#include "pthread_io_delay.h" #include "pthread_socket.h" #define SLEEPTIME (100000) /* 100 milliseconds */ #define FATAL(n) \ {\ pthread_lock_global_np(); \ errno = n; \ perror("Warning: select() failed in monitor thread" ); \ pthread_unlock_global_np(); \ } typedef struct count_sem { pthread_mutex_t lock; pthread_cond_t cond; int count; } count_sem_t; typedef struct bin_sem { pthread_mutex_t lock; pthread_cond_t cond; int flag; } bin_sem_t; typedef struct fd_queue_elem { int fd; int count; bin_sem_t sem; struct fd_queue_elem *next; } fd_queue_elem_t; typedef struct fd_queue { fd_queue_elem_t * head; fd_queue_elem_t * tail; } *fd_queue_t; static bin_sem_t fd_server_sem; static fd_queue_t fd_wait_read, fd_wait_write; static pthread_t monitor_thread; static struct timespec sleeptime = {0, SLEEPTIME }; static void bin_sem_init( bin_sem_t * s ) { pthread_mutex_init( &s->lock, NULL ); pthread_cond_init( &s->cond, NULL ); s->flag = FALSE; } static fd_queue_elem_t * fd_enq( fd_queue_t q, int fd ) { fd_queue_elem_t *new_elem; for( new_elem = q->head; new_elem; new_elem = new_elem->next ) { if( new_elem->fd == fd ) { new_elem->count++; return( new_elem ); } } new_elem = (fd_queue_elem_t *) malloc( sizeof(fd_queue_elem_t) ); bin_sem_init( &new_elem->sem ); new_elem->fd = fd; new_elem->next = q->head; if( q->head == NULL ) { q->tail = new_elem; } q->head = new_elem; new_elem->count = 1; return( new_elem ); } static void fd_deq_elem( fd_queue_t q, fd_queue_elem_t * q_elem ) { fd_queue_elem_t *prev = NULL; fd_queue_elem_t *next = q->head; while( next != NULL && next != q_elem ) { prev = next; next = next->next; } if( next == NULL ) { #ifdef DEBUG fprintf( stderr, "fd_deq_elem(elem %x) failed\n", q_elem ); #endif return; } /* * other threads still waiting on this element */ if( --next->count != 0 ) return; if( prev == NULL ) { q->head = next->next; } else { prev->next = next->next; } if( next->next == NULL ) { q->tail = prev; } /* cannot do free(next) since semaphore still used */ } /* * syscall_select() is a wrapper around the select() system call. Note * that this service returns -1 on error with errno set appropriately. * Because errno gets set in this call, we have to surround it with * a lock. */ extern int syscall_select( int nd, fd_set * in, fd_set * out, fd_set * ex, struct timeval * tv); /* * This function runs in a separate thread and monitors file descriptors. */ static void fd_monitor( void ) { int count, ret; struct timeval timeout = {0, 0}; struct timespec state_changed_timeout; fd_queue_elem_t *q_elem, *q_elem_tmp; fd_set fd_set_read, fd_set_write; while( TRUE ) { /* * Check whether a file descriptor has been queued to either the * read or write queues. */ if( fd_wait_read->head || fd_wait_write->head ) { int err = 0; FD_ZERO( &fd_set_read ); FD_ZERO( &fd_set_write ); pthread_mutex_lock( &fd_server_sem.lock ); for( q_elem = fd_wait_read->head; q_elem; q_elem = q_elem->next ) FD_SET( q_elem->fd, &fd_set_read ); for( q_elem = fd_wait_write->head; q_elem; q_elem = q_elem->next ) FD_SET( q_elem->fd, &fd_set_write ); pthread_mutex_unlock( &fd_server_sem.lock ); /* * Check whether any of the filedescriptors in fd_set_read or * fd_set_write are ready, timeout immediately. */ count = syscall_select( FD_SETSIZE, &fd_set_read, &fd_set_write, NULL, &timeout ); /* * If syscall_select() returned an error that is other * than signal related, we're probably in big trouble. Should * we continue or not? During development, we should exit. * * However.... * * With respect to signals, since we never block in select (i.e., * the timeout is {0,0}, we should never see a return via * signal interrupt. */ if( count < 0 ) { (void) pthread_get_errno_np( NULL, &err ); FATAL(err); } /* * If one or more read file descriptors are ready, dequeue each * one by signaling the waiting thread(s). */ pthread_mutex_lock( &fd_server_sem.lock ); q_elem = fd_wait_read->head; while( q_elem && count ) { if( FD_ISSET( q_elem->fd, &fd_set_read ) ) { FD_CLR( q_elem->fd, &fd_set_read ); q_elem_tmp = q_elem->next; pthread_mutex_lock( &(q_elem->sem.lock) ); q_elem->sem.flag = TRUE; fd_deq_elem( fd_wait_read, q_elem ); pthread_mutex_unlock( &(q_elem->sem.lock ) ); pthread_cond_signal( &(q_elem->sem.cond) ); q_elem = q_elem_tmp; } else { q_elem = q_elem->next; } count--; } /* * If one or more write file descriptors are ready, dequeue each * one, signaling the waiting thread(s). */ q_elem = fd_wait_write->head; while( q_elem && count ) { if( FD_ISSET( q_elem->fd, &fd_set_write ) ) { FD_CLR( q_elem->fd, &fd_set_write ); q_elem_tmp = q_elem->next; pthread_mutex_lock( &(q_elem->sem.lock ) ); q_elem->sem.flag = TRUE; fd_deq_elem( fd_wait_write, q_elem ); pthread_mutex_unlock( &(q_elem->sem.lock) ); pthread_cond_signal( &(q_elem->sem.cond) ); q_elem = q_elem_tmp; } else { q_elem = q_elem->next; } count--; } pthread_mutex_unlock( &fd_server_sem.lock ); } pthread_mutex_lock( &fd_server_sem.lock ); if( fd_wait_read->head || fd_wait_write->head ) { /* * Calculate the expiration time for the up-and-coming call to * pthread_cond_timedwait(). */ pthread_get_expiration_np( &sleeptime, &state_changed_timeout ); ret = 0; while( !fd_server_sem.flag && ret != NOTOK ) { ret = pthread_cond_timedwait( &fd_server_sem.cond, &fd_server_sem.lock, &state_changed_timeout ); } } else { /* * Since fd_read and fd_write are empty, wait, until * one of them has changed */ while( !fd_server_sem.flag ) { pthread_cond_wait( &fd_server_sem.cond, &fd_server_sem.lock ); } } fd_server_sem.flag = FALSE; pthread_mutex_unlock( &fd_server_sem.lock ); } } void fd_init( void ) { bin_sem_init( &fd_server_sem ); fd_wait_read = (fd_queue_t) malloc( sizeof(struct fd_queue) ); fd_wait_read->head = NULL; fd_wait_read->tail = NULL; fd_wait_write = (fd_queue_t) malloc( sizeof(struct fd_queue) ); fd_wait_write->head = NULL; fd_wait_write->tail = NULL; pthread_create( &monitor_thread, NULL, (thread_proc_t) fd_monitor, NULL ); pthread_yield( NULL ); /* ??? use high prio for server thread */ } pthread_once_t io_initialized = PTHREAD_ONCE_INIT; /* * Defined in sys_io.S */ extern ssize_t sys_io_read( int fd, void *buf, size_t nbytes ); static int syscall_read( int fd, void *buf, size_t nbytes ) { int ret; LOCK_ON ret = sys_io_read( fd, buf, nbytes ); LOCK_OFF return( ret ); } /*-------------------------------------------------------------------------- * @@ R E A D *-------------------------------------------------------------------------- * The read () function behaves in all respects like the * read(2) POSIX.1 function except that only the calling thread is blocked. */ ssize_t read( int fd, void *buf, size_t nbytes ) { int err, st, ret, last; fd_queue_elem_t *q_elem; st = pthread_once( &io_initialized, fd_init ); if( st != SUCCESS ) return( FAILURE ); if( nonblock_is_set( fd )) { ret = syscall_read( fd, buf, nbytes ); RETURN( ret ); } set_fd_nonblock( fd ); while( (ret = syscall_read( fd, buf, nbytes )) < OK ) { (void) pthread_get_errno_np( NULL, &err ); if( err != EAGAIN ) break; pthread_mutex_lock( &fd_server_sem.lock ); q_elem = fd_enq( fd_wait_read, fd ); fd_server_sem.flag = TRUE; pthread_mutex_unlock( &fd_server_sem.lock ); pthread_cond_signal( &fd_server_sem.cond ); pthread_mutex_lock( &(q_elem->sem.lock) ); while( !(q_elem->sem.flag) ) pthread_cond_wait( &(q_elem->sem.cond), &(q_elem->sem.lock) ); q_elem->sem.flag = FALSE; last = (q_elem->count == 0); pthread_mutex_unlock( &(q_elem->sem.lock) ); if( last ) { free( q_elem ); } } clear_fd_nonblock( fd ); return( ret ); } /* * Defined in sys_io.S */ ssize_t sys_io_write(int fd, const void *buf, size_t count); int syscall_write( int fd, const void *buf, size_t nbytes ) { int ret; LOCK_ON ret = sys_io_write( fd, buf, nbytes ); LOCK_OFF return( ret ); } /*-------------------------------------------------------------------------- * @@ W R I T E *-------------------------------------------------------------------------- * The write() function behaves in all respects like the * write(2) POSIX.1 function except that only the calling thread is blocked. */ ssize_t write( int fd, const void *buf, size_t nbytes ) { int err, st, ret, last; fd_queue_elem_t *q_elem; st = pthread_once( &io_initialized, fd_init ); if( st != SUCCESS ) return( FAILURE ); if( nonblock_is_set( fd )) { ret = syscall_write( fd, buf, nbytes ); RETURN( ret ); } while( (ret = syscall_write( fd, buf, nbytes )) < OK ) { (void) pthread_get_errno_np( NULL, &err ); if( err != EAGAIN ) break; pthread_mutex_lock( &fd_server_sem.lock ); q_elem = fd_enq( fd_wait_write, fd ); fd_server_sem.flag = TRUE; pthread_mutex_unlock( &fd_server_sem.lock ); pthread_cond_signal( &fd_server_sem.cond ); pthread_mutex_lock( &(q_elem->sem.lock) ); while( !(q_elem->sem.flag) ) pthread_cond_wait(&(q_elem->sem.cond), &(q_elem->sem.lock) ); q_elem->sem.flag = FALSE; last = (q_elem->count == 0); pthread_mutex_unlock( &(q_elem->sem.lock) ); if( last ) free( q_elem ); /* q_elem semaphore used up */ } clear_fd_nonblock( fd ); return( ret ); }
These are the contents of the former NiCE NeXT User Group NeXTSTEP/OpenStep software archive, currently hosted by Netfuture.ch.