diff --git a/src/libCom/osi/os/default/osdMessageQueue.cpp b/src/libCom/osi/os/default/osdMessageQueue.cpp index 789a9b88f..55653b1f8 100644 --- a/src/libCom/osi/os/default/osdMessageQueue.cpp +++ b/src/libCom/osi/os/default/osdMessageQueue.cpp @@ -15,13 +15,6 @@ * 630 252 4793 */ -/* - * Machines with 'traditional' memory access semantics could get by without - * the queueMutex for single producer/consumers, but architectures such as - * symmetric-multiprocessing machines with out-of-order writes require - * mutex locks to ensure correct operation. I decided to always use the - * mutex rather having two versions of this code. - */ #define epicsExportSharedSymbols #include "epicsMessageQueue.h" #include @@ -80,18 +73,24 @@ epicsShareFunc epicsMessageQueueId epicsShareAPI epicsMessageQueueCreate( epicsMessageQueueId pmsg; unsigned int slotBytes, slotLongs; - assert(capacity != 0); - assert(maxMessageSize != 0); pmsg = (epicsMessageQueueId)callocMustSucceed(1, sizeof(*pmsg), "epicsMessageQueueCreate"); pmsg->capacity = capacity; pmsg->maxMessageSize = maxMessageSize; slotLongs = 1 + ((maxMessageSize + sizeof(unsigned long) - 1) / sizeof(unsigned long)); slotBytes = slotLongs * sizeof(unsigned long); - pmsg->buf = (unsigned long *)callocMustSucceed(pmsg->capacity, slotBytes, "epicsMessageQueueCreate"); - pmsg->inPtr = pmsg->outPtr = pmsg->firstMessageSlot = (char *)&pmsg->buf[0]; - pmsg->lastMessageSlot = (char *)&pmsg->buf[(capacity - 1) * slotLongs]; + if (pmsg->capacity == 0) { + pmsg->buf = NULL; + pmsg->inPtr = pmsg->outPtr = pmsg->firstMessageSlot = NULL; + pmsg->lastMessageSlot = NULL; + pmsg->full = true; + } + else { + pmsg->buf = (unsigned long *)callocMustSucceed(pmsg->capacity, slotBytes, "epicsMessageQueueCreate"); + pmsg->inPtr = pmsg->outPtr = pmsg->firstMessageSlot = (char *)&pmsg->buf[0]; + pmsg->lastMessageSlot = (char *)&pmsg->buf[(capacity - 1) * slotLongs]; + pmsg->full = false; + } pmsg->slotSize = slotBytes; - pmsg->full = false; pmsg->mutex = epicsMutexMustCreate(); ellInit(&pmsg->sendQueue); ellInit(&pmsg->receiveQueue); @@ -129,18 +128,20 @@ getEventNode(epicsMessageQueueId pmsg) static int mySend(epicsMessageQueueId pmsg, void *message, unsigned int size, bool wait, bool haveTimeout, double timeout) { - char *myInPtr, *myOutPtr, *nextPtr; + char *myInPtr, *nextPtr; struct threadNode *pthr; if(size > pmsg->maxMessageSize) return -1; - epicsMutexLock(pmsg->mutex); + /* * See if message can be sent */ - if ((pmsg->numberOfSendersWaiting > 0) || (pmsg->full)) { + epicsMutexLock(pmsg->mutex); + if ((pmsg->numberOfSendersWaiting > 0) + || (pmsg->full && (ellFirst(&pmsg->receiveQueue) == NULL))) { /* - * Return error if not waiting + * Return if not allowed to wait */ if (!wait) { epicsMutexUnlock(pmsg->mutex); @@ -165,7 +166,7 @@ mySend(epicsMessageQueueId pmsg, void *message, unsigned int size, bool wait, bo ellDelete(&pmsg->sendQueue, &threadNode.link); pmsg->numberOfSendersWaiting--; ellAdd(&pmsg->eventFreeList, &threadNode.evp->link); - if (pmsg->full) { + if (pmsg->full && (ellFirst(&pmsg->receiveQueue) == NULL)) { epicsMutexUnlock(pmsg->mutex); return -1; } @@ -225,19 +226,20 @@ myReceive(epicsMessageQueueId pmsg, void *message, bool wait, bool haveTimeout, unsigned long l; struct threadNode *pthr; - epicsMutexLock(pmsg->mutex); /* * If there's a message on the queue, copy it */ + epicsMutexLock(pmsg->mutex); myOutPtr = (char *)pmsg->outPtr; - if ((myOutPtr != pmsg->inPtr) || pmsg->full) { + if ((pmsg->capacity != 0) && ((myOutPtr != pmsg->inPtr) || pmsg->full)) { l = *(unsigned long *)myOutPtr; memcpy(message, (unsigned long *)myOutPtr + 1, l); if (myOutPtr == pmsg->lastMessageSlot) pmsg->outPtr = pmsg->firstMessageSlot; else pmsg->outPtr += pmsg->slotSize; - pmsg->full = false; + if (pmsg->capacity) + pmsg->full = false; /* * Wake up the oldest task waiting to send @@ -251,13 +253,21 @@ myReceive(epicsMessageQueueId pmsg, void *message, bool wait, bool haveTimeout, } /* - * Return if not waiting + * Return if not allowed to wait */ if (!wait) { epicsMutexUnlock(pmsg->mutex); return -1; } + /* + * Wake up the oldest task waiting to send + */ + if ((pthr = (struct threadNode *)ellGet(&pmsg->sendQueue)) != NULL) { + pthr->eventSent = true; + epicsEventSignal(pthr->evp->event); + } + /* * Wait for message to arrive */ diff --git a/src/libCom/osi/os/default/osdMessageQueue.h b/src/libCom/osi/os/default/osdMessageQueue.h index e69de29bb..dabf6a71e 100644 --- a/src/libCom/osi/os/default/osdMessageQueue.h +++ b/src/libCom/osi/os/default/osdMessageQueue.h @@ -0,0 +1,3 @@ +/* + * Nothing needed for default implementation + */ diff --git a/src/libCom/test/epicsMessageQueueTest.cpp b/src/libCom/test/epicsMessageQueueTest.cpp index 06aeb01ab..1e69e5a15 100644 --- a/src/libCom/test/epicsMessageQueueTest.cpp +++ b/src/libCom/test/epicsMessageQueueTest.cpp @@ -40,6 +40,17 @@ randBelow(int n) return (int)((double)n*rand()/(RAND_MAX+1.0)); } +static void +receiver0(void *arg) +{ + epicsMessageQueue *q = (epicsMessageQueue *)arg; + char cbuf[80]; + + assert(q->receive(cbuf) == 5); + assert(q->receive(cbuf) == 0); + delete q; +} + static void receiver(void *arg) { @@ -200,17 +211,26 @@ epicsMessageQueueTest() assert (q1->receive((void *)cbuf, 1.0) < 0); assert(q1->pending() == 0); + /* + * Single receiver, single sender, 0-length queue + */ + epicsMessageQueue *q0 = new epicsMessageQueue(0, 20); + epicsThreadCreate("Receiver zero", epicsThreadPriorityMedium, epicsThreadStackMedium, receiver0, q0); + epicsThreadSleep(1.0); + assert(q0->trySend((void *)msg1, 5) == 0); + epicsThreadSleep(1.0); + assert(q0->trySend((void *)msg1, 0) == 0); + epicsThreadSleep(1.0); + /* * Single receiver, single sender tests */ + epicsThreadSetPriority(epicsThreadGetIdSelf(), epicsThreadPriorityHigh); epicsThreadCreate("Receiver one", epicsThreadPriorityMedium, epicsThreadStackMedium, receiver, q1); for (pass = 1 ; pass <= 3 ; pass++) { - epicsThreadSetPriority(epicsThreadGetIdSelf(), pass == 1 ? - epicsThreadPriorityHigh : - epicsThreadPriorityLow); switch (pass) { case 1: printf ("Should send/receive only 4 messages (sender priority > receiver priority).\n"); break; - case 2: printf ("Should send/receive 4 to 10 messages (depends on how host handles thread priorities).\n"); break; + case 2: printf ("Should send/receive 5 to 10 messages (depends on how host handles thread priorities).\n"); break; case 3: printf ("Should send/receive 10 messages (sender pauses after sending).\n"); break; } for (i = 0 ; i < 10 ; i++) { @@ -220,6 +240,7 @@ epicsMessageQueueTest() epicsThreadSleep(0.5); } printf ("Sent %d messages.\n", i); + epicsThreadSetPriority(epicsThreadGetIdSelf(), epicsThreadPriorityLow); epicsThreadSleep(1.0); }