latest version of semBinary
This commit is contained in:
@@ -328,6 +328,7 @@ LOCAL void errlogTask(void)
|
||||
while(TRUE) {
|
||||
char *pmessage;
|
||||
|
||||
semBinaryMustTake(pvtData.errlogTaskWaitForWork);
|
||||
while((pmessage = msgbufGetSend())) {
|
||||
semMutexMustTake(pvtData.listenerLock);
|
||||
if(pvtData.toConsole) printf("%s",pmessage);
|
||||
@@ -339,7 +340,6 @@ LOCAL void errlogTask(void)
|
||||
semMutexGive(pvtData.listenerLock);
|
||||
msgbufFreeSend();
|
||||
}
|
||||
semBinaryMustTake(pvtData.errlogTaskWaitForWork);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
* Create a simple binary semaphore
|
||||
*/
|
||||
semBinaryId
|
||||
semBinaryCreate(int initialState)
|
||||
semBinaryCreate(semInitialState initialState)
|
||||
{
|
||||
rtems_status_code sc;
|
||||
rtems_id sid;
|
||||
@@ -50,7 +50,7 @@ semBinaryCreate(int initialState)
|
||||
return (semBinaryId)sid;
|
||||
}
|
||||
|
||||
semBinaryId semBinaryMustCreate(int initialState)
|
||||
semBinaryId semBinaryMustCreate(semInitialState initialState)
|
||||
{
|
||||
semBinaryId id = semBinaryCreate (initialState);
|
||||
assert (id);
|
||||
|
||||
@@ -27,7 +27,10 @@ static int firstTime = 1;
|
||||
|
||||
epicsShareFunc int epicsShareAPI interruptLock()
|
||||
{
|
||||
if(firstTime) globalLock = semMutexMustCreate();
|
||||
if(firstTime) {
|
||||
globalLock = semMutexMustCreate();
|
||||
firstTime = 0;
|
||||
}
|
||||
semMutexMustTake(globalLock);
|
||||
return(0);
|
||||
}
|
||||
|
||||
@@ -26,6 +26,7 @@ of this distribution.
|
||||
typedef struct binary {
|
||||
pthread_mutex_t mutex;
|
||||
pthread_cond_t cond;
|
||||
int isFull;
|
||||
}binary;
|
||||
|
||||
typedef struct mutex {
|
||||
@@ -64,7 +65,7 @@ static void convertDoubleToWakeTime(double timeout,struct timespec *wakeTime)
|
||||
}
|
||||
}
|
||||
|
||||
semBinaryId semBinaryCreate(int initialState)
|
||||
semBinaryId semBinaryCreate(semInitialState initialState)
|
||||
{
|
||||
binary *pbinary;
|
||||
int status;
|
||||
@@ -74,11 +75,11 @@ semBinaryId semBinaryCreate(int initialState)
|
||||
checkStatusQuit(status,"pthread_mutex_init","semBinaryCreate");
|
||||
status = pthread_cond_init(&pbinary->cond,0);
|
||||
checkStatusQuit(status,"pthread_cond_init","semBinaryCreate");
|
||||
if(initialState==semFull) semBinaryGive((semBinaryId)pbinary);
|
||||
if(initialState==semFull) pbinary->isFull = 1;
|
||||
return((semBinaryId)pbinary);
|
||||
}
|
||||
|
||||
semBinaryId semBinaryMustCreate(int initialState)
|
||||
semBinaryId semBinaryMustCreate(semInitialState initialState)
|
||||
{
|
||||
semBinaryId id = semBinaryCreate (initialState);
|
||||
assert (id);
|
||||
@@ -102,8 +103,15 @@ void semBinaryGive(semBinaryId id)
|
||||
binary *pbinary = (binary *)id;
|
||||
int status;
|
||||
|
||||
status = pthread_cond_signal(&pbinary->cond);
|
||||
checkStatus(status,"pthread_cond_signal");
|
||||
status = pthread_mutex_lock(&pbinary->mutex);
|
||||
checkStatusQuit(status,"pthread_mutex_lock","semBinaryTake");
|
||||
if(!pbinary->isFull) {
|
||||
pbinary->isFull = 1;
|
||||
status = pthread_cond_signal(&pbinary->cond);
|
||||
checkStatus(status,"pthread_cond_signal");
|
||||
}
|
||||
status = pthread_mutex_unlock(&pbinary->mutex);
|
||||
checkStatusQuit(status,"pthread_mutex_unlock","semBinaryTake");
|
||||
}
|
||||
|
||||
semTakeStatus semBinaryTake(semBinaryId id)
|
||||
@@ -113,8 +121,11 @@ semTakeStatus semBinaryTake(semBinaryId id)
|
||||
|
||||
status = pthread_mutex_lock(&pbinary->mutex);
|
||||
checkStatusQuit(status,"pthread_mutex_lock","semBinaryTake");
|
||||
status = pthread_cond_wait(&pbinary->cond,&pbinary->mutex);
|
||||
checkStatusQuit(status,"pthread_cond_wait","semBinaryTake");
|
||||
if(!pbinary->isFull) {
|
||||
status = pthread_cond_wait(&pbinary->cond,&pbinary->mutex);
|
||||
checkStatusQuit(status,"pthread_cond_wait","semBinaryTake");
|
||||
}
|
||||
pbinary->isFull = 0;
|
||||
status = pthread_mutex_unlock(&pbinary->mutex);
|
||||
checkStatusQuit(status,"pthread_mutex_unlock","semBinaryTake");
|
||||
return(semTakeOK);
|
||||
@@ -124,12 +135,17 @@ semTakeStatus semBinaryTakeTimeout(semBinaryId id, double timeout)
|
||||
{
|
||||
binary *pbinary = (binary *)id;
|
||||
struct timespec wakeTime;
|
||||
int status,unlockStatus;
|
||||
int status = 0;
|
||||
int unlockStatus;
|
||||
|
||||
convertDoubleToWakeTime(timeout,&wakeTime);
|
||||
status = pthread_mutex_lock(&pbinary->mutex);
|
||||
checkStatusQuit(status,"pthread_mutex_lock","semBinaryTakeTimeout");
|
||||
status = pthread_cond_timedwait(&pbinary->cond,&pbinary->mutex,&wakeTime);
|
||||
if(!pbinary->isFull) {
|
||||
convertDoubleToWakeTime(timeout,&wakeTime);
|
||||
status = pthread_cond_timedwait(
|
||||
&pbinary->cond,&pbinary->mutex,&wakeTime);
|
||||
}
|
||||
if(status==0) pbinary->isFull = 0;
|
||||
unlockStatus = pthread_mutex_unlock(&pbinary->mutex);
|
||||
checkStatusQuit(unlockStatus,"pthread_mutex_unlock","semBinaryTakeTimeout");
|
||||
if(status==0) return(semTakeOK);
|
||||
|
||||
@@ -16,12 +16,12 @@ of this distribution.
|
||||
|
||||
#include "osiSem.h"
|
||||
|
||||
semBinaryId semBinaryCreate(int initialState)
|
||||
semBinaryId semBinaryCreate(semInitialState initialState)
|
||||
{
|
||||
return((semBinaryId)semBCreate(SEM_Q_FIFO,(semEmpty ? SEM_EMPTY : SEM_FULL)));
|
||||
}
|
||||
|
||||
semBinaryId semBinaryMustCreate(int initialState)
|
||||
semBinaryId semBinaryMustCreate(semInitialState initialState)
|
||||
{
|
||||
semBinaryId id = semBinaryCreate (initialState);
|
||||
assert (id);
|
||||
|
||||
@@ -13,9 +13,10 @@ typedef void *semBinaryId;
|
||||
typedef enum {semTakeOK,semTakeTimeout,semTakeError} semTakeStatus;
|
||||
typedef enum {semEmpty,semFull} semInitialState;
|
||||
|
||||
epicsShareFunc semBinaryId epicsShareAPI semBinaryCreate(int initialState);
|
||||
epicsShareFunc semBinaryId epicsShareAPI semBinaryCreate(
|
||||
semInitialState initialState);
|
||||
epicsShareFunc semBinaryId epicsShareAPI semBinaryMustCreate (
|
||||
int initialState);
|
||||
semInitialState initialState);
|
||||
epicsShareFunc void epicsShareAPI semBinaryDestroy(semBinaryId id);
|
||||
epicsShareFunc void epicsShareAPI semBinaryGive(semBinaryId id);
|
||||
epicsShareFunc semTakeStatus epicsShareAPI semBinaryTake(semBinaryId id);
|
||||
|
||||
@@ -18,49 +18,102 @@ of this distribution.
|
||||
|
||||
#include "osiThread.h"
|
||||
#include "osiSem.h"
|
||||
#include "osiRing.h"
|
||||
#include "errlog.h"
|
||||
|
||||
|
||||
typedef struct info {
|
||||
int threadnum;
|
||||
semBinaryId binary;
|
||||
semMutexId lockRing;
|
||||
int quit;
|
||||
ringId ring;
|
||||
}info;
|
||||
|
||||
static void binaryThread(void *arg)
|
||||
|
||||
static void consumer(void *arg)
|
||||
{
|
||||
info *pinfo = (info *)arg;
|
||||
time_t tp;
|
||||
printf("binaryThread %d starting time %ld\n",pinfo->threadnum,time(&tp));
|
||||
threadSleep(1.0);
|
||||
threadId idSelf = threadGetIdSelf();
|
||||
|
||||
printf("consumer %p starting time %ld\n",idSelf,time(&tp));
|
||||
while(1) {
|
||||
semTakeStatus status;
|
||||
if(pinfo->quit) {
|
||||
printf("binaryThread %d returning time %ld\n",
|
||||
pinfo->threadnum,time(&tp));
|
||||
semBinaryGive(pinfo->binary);
|
||||
printf("consumer %p returning time %ld\n",
|
||||
idSelf,time(&tp));
|
||||
return;
|
||||
}
|
||||
status = semBinaryTake(pinfo->binary);
|
||||
if(status!=semTakeOK) {
|
||||
printf("task %d semBinaryTake returned %d time %ld\n",
|
||||
pinfo->threadnum,(int)status,time(&tp));
|
||||
printf("task %p semBinaryTake returned %d time %ld\n",
|
||||
idSelf,(int)status,time(&tp));
|
||||
}
|
||||
printf("binaryThread %d semBinaryTake time %ld\n",
|
||||
pinfo->threadnum,time(&tp));
|
||||
semBinaryGive(pinfo->binary);
|
||||
threadSleep(1.0);
|
||||
while(ringUsedBytes(pinfo->ring)>=2*sizeof(threadId)) {
|
||||
threadId message[2];
|
||||
int nget,i;
|
||||
|
||||
for(i=0; i<2; i++) {
|
||||
nget = ringGet(pinfo->ring,(void *)&message[i],sizeof(threadId));
|
||||
if(nget!=sizeof(threadId))
|
||||
printf("consumer error nget %d\n",nget);
|
||||
}
|
||||
if(message[0]!=message[1]) {
|
||||
printf("consumer error message %p %p\n",message[0],message[1]);
|
||||
} else {
|
||||
printf("consumer message from %p\n",message[0]);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void producer(void *arg)
|
||||
{
|
||||
info *pinfo = (info *)arg;
|
||||
time_t tp;
|
||||
threadId idSelf = threadGetIdSelf();
|
||||
int ntimes=0;
|
||||
|
||||
printf("producer %p starting time %ld\n",idSelf,time(&tp));
|
||||
while(1) {
|
||||
semTakeStatus status;
|
||||
|
||||
++ntimes;
|
||||
if(pinfo->quit) {
|
||||
printf("producer %p returning time %ld\n",
|
||||
idSelf,time(&tp));
|
||||
return;
|
||||
}
|
||||
status = semMutexTake(pinfo->lockRing);
|
||||
if(status!=semTakeOK) {
|
||||
printf("producer %p semMutexTake returned %d time %ld\n",
|
||||
idSelf,(int)status,time(&tp));
|
||||
}
|
||||
if(ringFreeBytes(pinfo->ring)>=2*sizeof(int)) {
|
||||
int nput,i;
|
||||
|
||||
for(i=0; i<2; i++) {
|
||||
nput = ringPut(pinfo->ring,(void *)&idSelf,sizeof(threadId));
|
||||
if(nput!=sizeof(threadId))
|
||||
printf("producer %p error nput %d\n",idSelf,nput);
|
||||
if(i==0 && (ntimes%4==0)) threadSleep(.1);
|
||||
}
|
||||
printf("producer %p sending\n",idSelf);
|
||||
} else {
|
||||
printf("producer %p ring buffer is full\n",idSelf);
|
||||
}
|
||||
semMutexGive(pinfo->lockRing);
|
||||
threadSleep(1.0);
|
||||
semBinaryGive(pinfo->binary);
|
||||
}
|
||||
}
|
||||
|
||||
void semBinaryTest(int nthreads,int verbose)
|
||||
{
|
||||
unsigned int stackSize;
|
||||
threadId *id;
|
||||
int i;
|
||||
char **name;
|
||||
void **arg;
|
||||
info **pinfo;
|
||||
int i;
|
||||
info *pinfo;
|
||||
semBinaryId binary;
|
||||
int status;
|
||||
time_t tp;
|
||||
@@ -89,31 +142,27 @@ void semBinaryTest(int nthreads,int verbose)
|
||||
errVerbose = errVerboseSave;
|
||||
return;
|
||||
}
|
||||
pinfo = calloc(1,sizeof(info));
|
||||
pinfo->binary = binary;
|
||||
pinfo->lockRing = semMutexMustCreate();
|
||||
pinfo->ring = ringCreate(1024*2*sizeof(int));
|
||||
stackSize = threadGetStackSize(threadStackSmall);
|
||||
threadCreate("consumer",50,stackSize,consumer,pinfo);
|
||||
id = calloc(nthreads,sizeof(threadId));
|
||||
name = calloc(nthreads,sizeof(char *));
|
||||
arg = calloc(nthreads,sizeof(void *));
|
||||
pinfo = calloc(nthreads,sizeof(info *));
|
||||
stackSize = threadGetStackSize(threadStackSmall);
|
||||
for(i=0; i<nthreads; i++) {
|
||||
name[i] = calloc(10,sizeof(char));
|
||||
sprintf(name[i],"task%d",i);
|
||||
pinfo[i] = calloc(1,sizeof(info));
|
||||
pinfo[i]->threadnum = i;
|
||||
pinfo[i]->binary = binary;
|
||||
arg[i] = pinfo[i];
|
||||
id[i] = threadCreate(name[i],40,stackSize,binaryThread,arg[i]);
|
||||
printf("semTest created binaryThread %d id %p time %ld\n",
|
||||
sprintf(name[i],"producer%d",i);
|
||||
id[i] = threadCreate(name[i],40,stackSize,producer,pinfo);
|
||||
printf("created producer %d id %p time %ld\n",
|
||||
i, id[i],time(&tp));
|
||||
}
|
||||
threadSleep(2.0);
|
||||
printf("semTest calling semBinaryGive(binary) time %ld\n",time(&tp));
|
||||
semBinaryGive(binary);
|
||||
threadSleep(5.0);
|
||||
printf("semTest setting quit time %ld\n",time(&tp));
|
||||
for(i=0; i<nthreads; i++) {
|
||||
pinfo[i]->quit = 1;
|
||||
}
|
||||
semBinaryGive(binary);
|
||||
pinfo->quit = 1;
|
||||
threadSleep(2.0);
|
||||
semBinaryGive(pinfo->binary);
|
||||
threadSleep(1.0);
|
||||
printf("semTest returning time %ld\n",time(&tp));
|
||||
errVerbose = errVerboseSave;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user