ftp.nice.ch/Attic/openStep/implementation/gnustep/sources/alpha-snapshots/pthreads.0.9.2.tgz#/pthreads-0.9.2/socklib/read-write.c

This is read-write.c in view mode; [Download] [Up]

#include "pthread_io_delay.h"
#include "pthread_socket.h"

#define SLEEPTIME  (100000)  /* 100 milliseconds */

#define FATAL(n) \
{\
   pthread_lock_global_np(); \
   errno = n; \
   perror("Warning: select() failed in monitor thread" ); \
   pthread_unlock_global_np(); \
}

typedef struct count_sem
{
   pthread_mutex_t lock;
   pthread_cond_t  cond;
   int             count;

}  count_sem_t;

typedef struct bin_sem
{
   pthread_mutex_t lock;
   pthread_cond_t  cond;
   int             flag;

}  bin_sem_t;

typedef struct fd_queue_elem
{ 
   int                    fd;
   int                    count;
   bin_sem_t              sem;
   struct fd_queue_elem   *next;

}  fd_queue_elem_t;

typedef struct fd_queue 
{
   fd_queue_elem_t * head;
   fd_queue_elem_t * tail;

}  *fd_queue_t;


static bin_sem_t       fd_server_sem;
static fd_queue_t      fd_wait_read, fd_wait_write;
static pthread_t       monitor_thread;

static struct timespec sleeptime = {0, SLEEPTIME };

static void 
bin_sem_init( bin_sem_t * s )
{
    pthread_mutex_init( &s->lock, NULL );
    pthread_cond_init( &s->cond, NULL );
    s->flag = FALSE;
}

static fd_queue_elem_t *
fd_enq( fd_queue_t q, int fd )
{
   fd_queue_elem_t *new_elem;

   for( new_elem = q->head; new_elem; new_elem = new_elem->next ) 
   {
       if( new_elem->fd == fd ) 
       {
           new_elem->count++;
           return( new_elem );
       }
   }

   new_elem = (fd_queue_elem_t *) malloc( sizeof(fd_queue_elem_t) );

   bin_sem_init( &new_elem->sem );
   new_elem->fd = fd;
   new_elem->next = q->head;

   if( q->head == NULL ) 
   {
       q->tail = new_elem;
   }
   q->head = new_elem;
   new_elem->count = 1;
   return( new_elem );
}

static void 
fd_deq_elem( fd_queue_t q, fd_queue_elem_t * q_elem )
{
   fd_queue_elem_t *prev = NULL;
   fd_queue_elem_t *next = q->head;

   while( next != NULL && next != q_elem ) 
   {
       prev = next;
       next = next->next;
   }

   if( next == NULL ) 
   {
#ifdef DEBUG
       fprintf( stderr, "fd_deq_elem(elem %x) failed\n", q_elem );
#endif
       return;
   }
   /*
    * other threads still waiting on this element 
    */
   if( --next->count != 0 )
       return;

   if( prev == NULL ) 
   {
       q->head = next->next;
   } 
   else 
   {
       prev->next = next->next;
   }

   if( next->next == NULL ) 
   {
       q->tail = prev;
   }

   /* cannot do free(next) since semaphore still used */
}

/*
 * syscall_select() is a wrapper around the select() system call.  Note
 * that this service returns -1 on error with errno set appropriately.
 * Because errno gets set in this call, we have to surround it with
 * a lock.
 */
extern int
syscall_select( int nd,
                fd_set * in,
                fd_set * out,
                fd_set * ex,
                struct timeval * tv);

/*
 * This function runs in a separate thread and monitors file descriptors.
 */
static void
fd_monitor( void )
{
   int             count, ret;
   struct timeval  timeout = {0, 0};
   struct timespec state_changed_timeout;
   fd_queue_elem_t *q_elem, *q_elem_tmp;
   fd_set          fd_set_read, fd_set_write;

   while( TRUE ) 
   {
       /*
        * Check whether a file descriptor has been queued to either the
        * read or write queues.
        */
       if( fd_wait_read->head || fd_wait_write->head ) 
       {
           int err = 0;

           FD_ZERO( &fd_set_read );
           FD_ZERO( &fd_set_write );

           pthread_mutex_lock( &fd_server_sem.lock );

           for( q_elem = fd_wait_read->head; q_elem; q_elem = q_elem->next ) 
               FD_SET( q_elem->fd, &fd_set_read );

           for( q_elem = fd_wait_write->head; q_elem; q_elem = q_elem->next ) 
               FD_SET( q_elem->fd, &fd_set_write );

           pthread_mutex_unlock( &fd_server_sem.lock );

           /*
            * Check whether any of the filedescriptors in fd_set_read or
            * fd_set_write are ready, timeout immediately.
            */
           count = syscall_select( FD_SETSIZE, 
                                   &fd_set_read, 
                                   &fd_set_write, 
                                   NULL,
                                   &timeout );

           /*
            * If syscall_select() returned an error that is other
            * than signal related, we're probably in big trouble.  Should
            * we continue or not?  During development, we should exit.
            *
            * However....
            *
            * With respect to signals, since we never block in select (i.e.,
            * the timeout is {0,0}, we should never see a return via
            * signal interrupt.
            */
           if( count < 0 )
           {
               (void) pthread_get_errno_np( NULL, &err );
                FATAL(err);
           }

           /*
            * If one or more read file descriptors are ready, dequeue each
            * one by signaling the waiting thread(s).
            */
           pthread_mutex_lock( &fd_server_sem.lock );
           q_elem = fd_wait_read->head;
           while( q_elem && count ) 
           {
               if( FD_ISSET( q_elem->fd, &fd_set_read ) ) 
               {
                   FD_CLR( q_elem->fd, &fd_set_read );

                   q_elem_tmp = q_elem->next;

                   pthread_mutex_lock( &(q_elem->sem.lock) );
                   q_elem->sem.flag = TRUE;
                   fd_deq_elem( fd_wait_read, q_elem );
                   pthread_mutex_unlock( &(q_elem->sem.lock ) );
                   pthread_cond_signal( &(q_elem->sem.cond) );

                   q_elem = q_elem_tmp;
               } 
               else 
               {
                   q_elem = q_elem->next;
               }

               count--;
           }

           /*
            * If one or more write file descriptors are ready, dequeue each
            * one, signaling the waiting thread(s).
            */
           q_elem = fd_wait_write->head;
           while( q_elem && count )
           {
               if( FD_ISSET( q_elem->fd, &fd_set_write ) ) 
               {
                   FD_CLR( q_elem->fd, &fd_set_write );

                   q_elem_tmp = q_elem->next;

                   pthread_mutex_lock( &(q_elem->sem.lock ) );
                   q_elem->sem.flag = TRUE;
                   fd_deq_elem( fd_wait_write, q_elem );
                   pthread_mutex_unlock( &(q_elem->sem.lock) );
                   pthread_cond_signal( &(q_elem->sem.cond) );

                   q_elem = q_elem_tmp;
               } 
               else 
               {
                   q_elem = q_elem->next;
               }

               count--;
           }

           pthread_mutex_unlock( &fd_server_sem.lock );
       }

       pthread_mutex_lock( &fd_server_sem.lock );
       if( fd_wait_read->head || fd_wait_write->head ) 
       {
           /*
            * Calculate the expiration time for the up-and-coming call to
            * pthread_cond_timedwait().
            */
           pthread_get_expiration_np( &sleeptime, &state_changed_timeout );

           ret = 0;
           while( !fd_server_sem.flag && ret != NOTOK ) 
           {
               ret = pthread_cond_timedwait( &fd_server_sem.cond, 
                                             &fd_server_sem.lock, 
                                             &state_changed_timeout );
           }
       } 
       else 
       {
           /*
            * Since fd_read and fd_write are empty, wait, until
            * one of them has changed 
            */
           while( !fd_server_sem.flag ) 
           {
               pthread_cond_wait( &fd_server_sem.cond, &fd_server_sem.lock );
           }
       }

       fd_server_sem.flag = FALSE;
       pthread_mutex_unlock( &fd_server_sem.lock );
   }
}

void 
fd_init( void )
{
   bin_sem_init( &fd_server_sem );
   fd_wait_read = (fd_queue_t) malloc( sizeof(struct fd_queue) );
   fd_wait_read->head = NULL;
   fd_wait_read->tail = NULL;

   fd_wait_write = (fd_queue_t) malloc( sizeof(struct fd_queue) );
   fd_wait_write->head = NULL;
   fd_wait_write->tail = NULL;

   pthread_create( &monitor_thread, NULL, (thread_proc_t) fd_monitor, NULL );
   pthread_yield( NULL );    /* ??? use high prio for server thread */
}

pthread_once_t io_initialized = PTHREAD_ONCE_INIT;

/*
 * Defined in sys_io.S
 */
extern ssize_t
sys_io_read( int fd, void *buf, size_t nbytes );

static int
syscall_read( int fd, void *buf, size_t nbytes )
{
   int ret;

   LOCK_ON

       ret = sys_io_read( fd, buf, nbytes );

   LOCK_OFF
   return( ret );
}

/*--------------------------------------------------------------------------
 * @@          R E A D
 *--------------------------------------------------------------------------
 * The read () function behaves in all respects like the
 * read(2) POSIX.1 function except that only the calling thread is blocked.
 */
ssize_t 
read( int fd, void *buf, size_t nbytes )
{
   int err, st, ret, last;
   fd_queue_elem_t *q_elem;

   st = pthread_once( &io_initialized, fd_init );
   if( st != SUCCESS )
       return( FAILURE );

   if( nonblock_is_set( fd ))
   {
       ret = syscall_read( fd, buf, nbytes );
       RETURN( ret );
   }

   set_fd_nonblock( fd );
   while( (ret = syscall_read( fd, buf, nbytes )) < OK )
   {
       (void) pthread_get_errno_np( NULL, &err );
       if( err != EAGAIN ) 
           break;

       pthread_mutex_lock( &fd_server_sem.lock );
       q_elem = fd_enq( fd_wait_read, fd );
       fd_server_sem.flag = TRUE;
       pthread_mutex_unlock( &fd_server_sem.lock );
       pthread_cond_signal( &fd_server_sem.cond );

       pthread_mutex_lock( &(q_elem->sem.lock) );
       while( !(q_elem->sem.flag) ) 
           pthread_cond_wait( &(q_elem->sem.cond), &(q_elem->sem.lock) );

       q_elem->sem.flag = FALSE;
       last = (q_elem->count == 0);

       pthread_mutex_unlock( &(q_elem->sem.lock) );
       if( last )
       {
           free( q_elem );
       }
   }

   clear_fd_nonblock( fd );
   return( ret );
}


/*
 * Defined in sys_io.S
 */
ssize_t
sys_io_write(int fd, const void *buf, size_t count);

int
syscall_write( int fd, const void *buf, size_t nbytes )
{
   int ret;

   LOCK_ON

       ret = sys_io_write( fd, buf, nbytes );

   LOCK_OFF

   return( ret );
}

/*--------------------------------------------------------------------------
 * @@          W R I T E
 *--------------------------------------------------------------------------
 * The write() function behaves in all respects like the
 * write(2) POSIX.1 function except that only the calling thread is blocked.
 */
ssize_t 
write( int fd, const void *buf, size_t nbytes )
{
   int err, st, ret, last;
   fd_queue_elem_t *q_elem;

   st = pthread_once( &io_initialized, fd_init );
   if( st != SUCCESS )
       return( FAILURE );

   if( nonblock_is_set( fd ))
   {
       ret = syscall_write( fd, buf, nbytes );
       RETURN( ret );
   }

   while( (ret = syscall_write( fd, buf, nbytes )) < OK ) 
   {
       (void) pthread_get_errno_np( NULL, &err );
       if( err != EAGAIN )
           break;

       pthread_mutex_lock( &fd_server_sem.lock );
       q_elem = fd_enq( fd_wait_write, fd );
       fd_server_sem.flag = TRUE;
       pthread_mutex_unlock( &fd_server_sem.lock );
       pthread_cond_signal( &fd_server_sem.cond );

       pthread_mutex_lock( &(q_elem->sem.lock) );
       while( !(q_elem->sem.flag) ) 
           pthread_cond_wait(&(q_elem->sem.cond), &(q_elem->sem.lock) );

       q_elem->sem.flag = FALSE;
       last = (q_elem->count == 0);
       pthread_mutex_unlock( &(q_elem->sem.lock) );
       if( last )
           free( q_elem );    /* q_elem semaphore used up */
   }

   clear_fd_nonblock( fd );
   return( ret );
}

These are the contents of the former NiCE NeXT User Group NeXTSTEP/OpenStep software archive, currently hosted by Netfuture.ch.