From 8d6c6f7e1d607c5d566d10516d9b396d54b51c52 Mon Sep 17 00:00:00 2001 From: Marty Kraimer Date: Fri, 4 Feb 2000 14:43:58 +0000 Subject: [PATCH] latest version of semBinary --- src/libCom/error/errlog.c | 2 +- src/libCom/osi/os/RTEMS/osdSem.c | 4 +- src/libCom/osi/os/default/osdInterrupt.c | 5 +- src/libCom/osi/os/posix/osdSem.c | 36 +++++-- src/libCom/osi/os/vxWorks/osdSem.c | 4 +- src/libCom/osi/osiSem.h | 5 +- src/libCom/test/semBinaryTest.c | 117 ++++++++++++++++------- 7 files changed, 121 insertions(+), 52 deletions(-) diff --git a/src/libCom/error/errlog.c b/src/libCom/error/errlog.c index 33d595fd9..5a083f358 100644 --- a/src/libCom/error/errlog.c +++ b/src/libCom/error/errlog.c @@ -328,6 +328,7 @@ LOCAL void errlogTask(void) while(TRUE) { char *pmessage; + semBinaryMustTake(pvtData.errlogTaskWaitForWork); while((pmessage = msgbufGetSend())) { semMutexMustTake(pvtData.listenerLock); if(pvtData.toConsole) printf("%s",pmessage); @@ -339,7 +340,6 @@ LOCAL void errlogTask(void) semMutexGive(pvtData.listenerLock); msgbufFreeSend(); } - semBinaryMustTake(pvtData.errlogTaskWaitForWork); } } diff --git a/src/libCom/osi/os/RTEMS/osdSem.c b/src/libCom/osi/os/RTEMS/osdSem.c index 394060d5c..1bf82fa73 100644 --- a/src/libCom/osi/os/RTEMS/osdSem.c +++ b/src/libCom/osi/os/RTEMS/osdSem.c @@ -17,7 +17,7 @@ * Create a simple binary semaphore */ semBinaryId -semBinaryCreate(int initialState) +semBinaryCreate(semInitialState initialState) { rtems_status_code sc; rtems_id sid; @@ -50,7 +50,7 @@ semBinaryCreate(int initialState) return (semBinaryId)sid; } -semBinaryId semBinaryMustCreate(int initialState) +semBinaryId semBinaryMustCreate(semInitialState initialState) { semBinaryId id = semBinaryCreate (initialState); assert (id); diff --git a/src/libCom/osi/os/default/osdInterrupt.c b/src/libCom/osi/os/default/osdInterrupt.c index f5d93520a..e86871f96 100644 --- a/src/libCom/osi/os/default/osdInterrupt.c +++ b/src/libCom/osi/os/default/osdInterrupt.c @@ -27,7 +27,10 @@ static int firstTime = 1; epicsShareFunc int epicsShareAPI interruptLock() { - if(firstTime) globalLock = semMutexMustCreate(); + if(firstTime) { + globalLock = semMutexMustCreate(); + firstTime = 0; + } semMutexMustTake(globalLock); return(0); } diff --git a/src/libCom/osi/os/posix/osdSem.c b/src/libCom/osi/os/posix/osdSem.c index 6df5c253c..a55c16a1c 100644 --- a/src/libCom/osi/os/posix/osdSem.c +++ b/src/libCom/osi/os/posix/osdSem.c @@ -26,6 +26,7 @@ of this distribution. typedef struct binary { pthread_mutex_t mutex; pthread_cond_t cond; + int isFull; }binary; typedef struct mutex { @@ -64,7 +65,7 @@ static void convertDoubleToWakeTime(double timeout,struct timespec *wakeTime) } } -semBinaryId semBinaryCreate(int initialState) +semBinaryId semBinaryCreate(semInitialState initialState) { binary *pbinary; int status; @@ -74,11 +75,11 @@ semBinaryId semBinaryCreate(int initialState) checkStatusQuit(status,"pthread_mutex_init","semBinaryCreate"); status = pthread_cond_init(&pbinary->cond,0); checkStatusQuit(status,"pthread_cond_init","semBinaryCreate"); - if(initialState==semFull) semBinaryGive((semBinaryId)pbinary); + if(initialState==semFull) pbinary->isFull = 1; return((semBinaryId)pbinary); } -semBinaryId semBinaryMustCreate(int initialState) +semBinaryId semBinaryMustCreate(semInitialState initialState) { semBinaryId id = semBinaryCreate (initialState); assert (id); @@ -102,8 +103,15 @@ void semBinaryGive(semBinaryId id) binary *pbinary = (binary *)id; int status; - status = pthread_cond_signal(&pbinary->cond); - checkStatus(status,"pthread_cond_signal"); + status = pthread_mutex_lock(&pbinary->mutex); + checkStatusQuit(status,"pthread_mutex_lock","semBinaryTake"); + if(!pbinary->isFull) { + pbinary->isFull = 1; + status = pthread_cond_signal(&pbinary->cond); + checkStatus(status,"pthread_cond_signal"); + } + status = pthread_mutex_unlock(&pbinary->mutex); + checkStatusQuit(status,"pthread_mutex_unlock","semBinaryTake"); } semTakeStatus semBinaryTake(semBinaryId id) @@ -113,8 +121,11 @@ semTakeStatus semBinaryTake(semBinaryId id) status = pthread_mutex_lock(&pbinary->mutex); checkStatusQuit(status,"pthread_mutex_lock","semBinaryTake"); - status = pthread_cond_wait(&pbinary->cond,&pbinary->mutex); - checkStatusQuit(status,"pthread_cond_wait","semBinaryTake"); + if(!pbinary->isFull) { + status = pthread_cond_wait(&pbinary->cond,&pbinary->mutex); + checkStatusQuit(status,"pthread_cond_wait","semBinaryTake"); + } + pbinary->isFull = 0; status = pthread_mutex_unlock(&pbinary->mutex); checkStatusQuit(status,"pthread_mutex_unlock","semBinaryTake"); return(semTakeOK); @@ -124,12 +135,17 @@ semTakeStatus semBinaryTakeTimeout(semBinaryId id, double timeout) { binary *pbinary = (binary *)id; struct timespec wakeTime; - int status,unlockStatus; + int status = 0; + int unlockStatus; - convertDoubleToWakeTime(timeout,&wakeTime); status = pthread_mutex_lock(&pbinary->mutex); checkStatusQuit(status,"pthread_mutex_lock","semBinaryTakeTimeout"); - status = pthread_cond_timedwait(&pbinary->cond,&pbinary->mutex,&wakeTime); + if(!pbinary->isFull) { + convertDoubleToWakeTime(timeout,&wakeTime); + status = pthread_cond_timedwait( + &pbinary->cond,&pbinary->mutex,&wakeTime); + } + if(status==0) pbinary->isFull = 0; unlockStatus = pthread_mutex_unlock(&pbinary->mutex); checkStatusQuit(unlockStatus,"pthread_mutex_unlock","semBinaryTakeTimeout"); if(status==0) return(semTakeOK); diff --git a/src/libCom/osi/os/vxWorks/osdSem.c b/src/libCom/osi/os/vxWorks/osdSem.c index 4b9aba979..1e2bdc852 100644 --- a/src/libCom/osi/os/vxWorks/osdSem.c +++ b/src/libCom/osi/os/vxWorks/osdSem.c @@ -16,12 +16,12 @@ of this distribution. #include "osiSem.h" -semBinaryId semBinaryCreate(int initialState) +semBinaryId semBinaryCreate(semInitialState initialState) { return((semBinaryId)semBCreate(SEM_Q_FIFO,(semEmpty ? SEM_EMPTY : SEM_FULL))); } -semBinaryId semBinaryMustCreate(int initialState) +semBinaryId semBinaryMustCreate(semInitialState initialState) { semBinaryId id = semBinaryCreate (initialState); assert (id); diff --git a/src/libCom/osi/osiSem.h b/src/libCom/osi/osiSem.h index 2639f25e2..d91e375a5 100644 --- a/src/libCom/osi/osiSem.h +++ b/src/libCom/osi/osiSem.h @@ -13,9 +13,10 @@ typedef void *semBinaryId; typedef enum {semTakeOK,semTakeTimeout,semTakeError} semTakeStatus; typedef enum {semEmpty,semFull} semInitialState; -epicsShareFunc semBinaryId epicsShareAPI semBinaryCreate(int initialState); +epicsShareFunc semBinaryId epicsShareAPI semBinaryCreate( + semInitialState initialState); epicsShareFunc semBinaryId epicsShareAPI semBinaryMustCreate ( - int initialState); + semInitialState initialState); epicsShareFunc void epicsShareAPI semBinaryDestroy(semBinaryId id); epicsShareFunc void epicsShareAPI semBinaryGive(semBinaryId id); epicsShareFunc semTakeStatus epicsShareAPI semBinaryTake(semBinaryId id); diff --git a/src/libCom/test/semBinaryTest.c b/src/libCom/test/semBinaryTest.c index 8ce751d34..aa1106554 100644 --- a/src/libCom/test/semBinaryTest.c +++ b/src/libCom/test/semBinaryTest.c @@ -18,49 +18,102 @@ of this distribution. #include "osiThread.h" #include "osiSem.h" +#include "osiRing.h" #include "errlog.h" typedef struct info { - int threadnum; semBinaryId binary; + semMutexId lockRing; int quit; + ringId ring; }info; - -static void binaryThread(void *arg) + +static void consumer(void *arg) { info *pinfo = (info *)arg; time_t tp; - printf("binaryThread %d starting time %ld\n",pinfo->threadnum,time(&tp)); - threadSleep(1.0); + threadId idSelf = threadGetIdSelf(); + + printf("consumer %p starting time %ld\n",idSelf,time(&tp)); while(1) { semTakeStatus status; if(pinfo->quit) { - printf("binaryThread %d returning time %ld\n", - pinfo->threadnum,time(&tp)); - semBinaryGive(pinfo->binary); + printf("consumer %p returning time %ld\n", + idSelf,time(&tp)); return; } status = semBinaryTake(pinfo->binary); if(status!=semTakeOK) { - printf("task %d semBinaryTake returned %d time %ld\n", - pinfo->threadnum,(int)status,time(&tp)); + printf("task %p semBinaryTake returned %d time %ld\n", + idSelf,(int)status,time(&tp)); } - printf("binaryThread %d semBinaryTake time %ld\n", - pinfo->threadnum,time(&tp)); - semBinaryGive(pinfo->binary); - threadSleep(1.0); + while(ringUsedBytes(pinfo->ring)>=2*sizeof(threadId)) { + threadId message[2]; + int nget,i; + + for(i=0; i<2; i++) { + nget = ringGet(pinfo->ring,(void *)&message[i],sizeof(threadId)); + if(nget!=sizeof(threadId)) + printf("consumer error nget %d\n",nget); + } + if(message[0]!=message[1]) { + printf("consumer error message %p %p\n",message[0],message[1]); + } else { + printf("consumer message from %p\n",message[0]); + } + } } } + +static void producer(void *arg) +{ + info *pinfo = (info *)arg; + time_t tp; + threadId idSelf = threadGetIdSelf(); + int ntimes=0; + printf("producer %p starting time %ld\n",idSelf,time(&tp)); + while(1) { + semTakeStatus status; + + ++ntimes; + if(pinfo->quit) { + printf("producer %p returning time %ld\n", + idSelf,time(&tp)); + return; + } + status = semMutexTake(pinfo->lockRing); + if(status!=semTakeOK) { + printf("producer %p semMutexTake returned %d time %ld\n", + idSelf,(int)status,time(&tp)); + } + if(ringFreeBytes(pinfo->ring)>=2*sizeof(int)) { + int nput,i; + + for(i=0; i<2; i++) { + nput = ringPut(pinfo->ring,(void *)&idSelf,sizeof(threadId)); + if(nput!=sizeof(threadId)) + printf("producer %p error nput %d\n",idSelf,nput); + if(i==0 && (ntimes%4==0)) threadSleep(.1); + } + printf("producer %p sending\n",idSelf); + } else { + printf("producer %p ring buffer is full\n",idSelf); + } + semMutexGive(pinfo->lockRing); + threadSleep(1.0); + semBinaryGive(pinfo->binary); + } +} + void semBinaryTest(int nthreads,int verbose) { unsigned int stackSize; threadId *id; - int i; char **name; - void **arg; - info **pinfo; + int i; + info *pinfo; semBinaryId binary; int status; time_t tp; @@ -89,31 +142,27 @@ void semBinaryTest(int nthreads,int verbose) errVerbose = errVerboseSave; return; } + pinfo = calloc(1,sizeof(info)); + pinfo->binary = binary; + pinfo->lockRing = semMutexMustCreate(); + pinfo->ring = ringCreate(1024*2*sizeof(int)); + stackSize = threadGetStackSize(threadStackSmall); + threadCreate("consumer",50,stackSize,consumer,pinfo); id = calloc(nthreads,sizeof(threadId)); name = calloc(nthreads,sizeof(char *)); - arg = calloc(nthreads,sizeof(void *)); - pinfo = calloc(nthreads,sizeof(info *)); - stackSize = threadGetStackSize(threadStackSmall); for(i=0; ithreadnum = i; - pinfo[i]->binary = binary; - arg[i] = pinfo[i]; - id[i] = threadCreate(name[i],40,stackSize,binaryThread,arg[i]); - printf("semTest created binaryThread %d id %p time %ld\n", + sprintf(name[i],"producer%d",i); + id[i] = threadCreate(name[i],40,stackSize,producer,pinfo); + printf("created producer %d id %p time %ld\n", i, id[i],time(&tp)); } - threadSleep(2.0); - printf("semTest calling semBinaryGive(binary) time %ld\n",time(&tp)); - semBinaryGive(binary); threadSleep(5.0); printf("semTest setting quit time %ld\n",time(&tp)); - for(i=0; iquit = 1; - } - semBinaryGive(binary); + pinfo->quit = 1; threadSleep(2.0); + semBinaryGive(pinfo->binary); + threadSleep(1.0); + printf("semTest returning time %ld\n",time(&tp)); errVerbose = errVerboseSave; }