Add support for 0-length message queues.

This commit is contained in:
W. Eric Norum
2003-02-28 16:14:10 +00:00
parent 17624a28c3
commit a1a6e8861e
3 changed files with 60 additions and 26 deletions
+32 -22
View File
@@ -15,13 +15,6 @@
* 630 252 4793
*/
/*
* Machines with 'traditional' memory access semantics could get by without
* the queueMutex for single producer/consumers, but architectures such as
* symmetric-multiprocessing machines with out-of-order writes require
* mutex locks to ensure correct operation. I decided to always use the
* mutex rather having two versions of this code.
*/
#define epicsExportSharedSymbols
#include "epicsMessageQueue.h"
#include <ellLib.h>
@@ -80,18 +73,24 @@ epicsShareFunc epicsMessageQueueId epicsShareAPI epicsMessageQueueCreate(
epicsMessageQueueId pmsg;
unsigned int slotBytes, slotLongs;
assert(capacity != 0);
assert(maxMessageSize != 0);
pmsg = (epicsMessageQueueId)callocMustSucceed(1, sizeof(*pmsg), "epicsMessageQueueCreate");
pmsg->capacity = capacity;
pmsg->maxMessageSize = maxMessageSize;
slotLongs = 1 + ((maxMessageSize + sizeof(unsigned long) - 1) / sizeof(unsigned long));
slotBytes = slotLongs * sizeof(unsigned long);
pmsg->buf = (unsigned long *)callocMustSucceed(pmsg->capacity, slotBytes, "epicsMessageQueueCreate");
pmsg->inPtr = pmsg->outPtr = pmsg->firstMessageSlot = (char *)&pmsg->buf[0];
pmsg->lastMessageSlot = (char *)&pmsg->buf[(capacity - 1) * slotLongs];
if (pmsg->capacity == 0) {
pmsg->buf = NULL;
pmsg->inPtr = pmsg->outPtr = pmsg->firstMessageSlot = NULL;
pmsg->lastMessageSlot = NULL;
pmsg->full = true;
}
else {
pmsg->buf = (unsigned long *)callocMustSucceed(pmsg->capacity, slotBytes, "epicsMessageQueueCreate");
pmsg->inPtr = pmsg->outPtr = pmsg->firstMessageSlot = (char *)&pmsg->buf[0];
pmsg->lastMessageSlot = (char *)&pmsg->buf[(capacity - 1) * slotLongs];
pmsg->full = false;
}
pmsg->slotSize = slotBytes;
pmsg->full = false;
pmsg->mutex = epicsMutexMustCreate();
ellInit(&pmsg->sendQueue);
ellInit(&pmsg->receiveQueue);
@@ -129,18 +128,20 @@ getEventNode(epicsMessageQueueId pmsg)
static int
mySend(epicsMessageQueueId pmsg, void *message, unsigned int size, bool wait, bool haveTimeout, double timeout)
{
char *myInPtr, *myOutPtr, *nextPtr;
char *myInPtr, *nextPtr;
struct threadNode *pthr;
if(size > pmsg->maxMessageSize)
return -1;
epicsMutexLock(pmsg->mutex);
/*
* See if message can be sent
*/
if ((pmsg->numberOfSendersWaiting > 0) || (pmsg->full)) {
epicsMutexLock(pmsg->mutex);
if ((pmsg->numberOfSendersWaiting > 0)
|| (pmsg->full && (ellFirst(&pmsg->receiveQueue) == NULL))) {
/*
* Return error if not waiting
* Return if not allowed to wait
*/
if (!wait) {
epicsMutexUnlock(pmsg->mutex);
@@ -165,7 +166,7 @@ mySend(epicsMessageQueueId pmsg, void *message, unsigned int size, bool wait, bo
ellDelete(&pmsg->sendQueue, &threadNode.link);
pmsg->numberOfSendersWaiting--;
ellAdd(&pmsg->eventFreeList, &threadNode.evp->link);
if (pmsg->full) {
if (pmsg->full && (ellFirst(&pmsg->receiveQueue) == NULL)) {
epicsMutexUnlock(pmsg->mutex);
return -1;
}
@@ -225,19 +226,20 @@ myReceive(epicsMessageQueueId pmsg, void *message, bool wait, bool haveTimeout,
unsigned long l;
struct threadNode *pthr;
epicsMutexLock(pmsg->mutex);
/*
* If there's a message on the queue, copy it
*/
epicsMutexLock(pmsg->mutex);
myOutPtr = (char *)pmsg->outPtr;
if ((myOutPtr != pmsg->inPtr) || pmsg->full) {
if ((pmsg->capacity != 0) && ((myOutPtr != pmsg->inPtr) || pmsg->full)) {
l = *(unsigned long *)myOutPtr;
memcpy(message, (unsigned long *)myOutPtr + 1, l);
if (myOutPtr == pmsg->lastMessageSlot)
pmsg->outPtr = pmsg->firstMessageSlot;
else
pmsg->outPtr += pmsg->slotSize;
pmsg->full = false;
if (pmsg->capacity)
pmsg->full = false;
/*
* Wake up the oldest task waiting to send
@@ -251,13 +253,21 @@ myReceive(epicsMessageQueueId pmsg, void *message, bool wait, bool haveTimeout,
}
/*
* Return if not waiting
* Return if not allowed to wait
*/
if (!wait) {
epicsMutexUnlock(pmsg->mutex);
return -1;
}
/*
* Wake up the oldest task waiting to send
*/
if ((pthr = (struct threadNode *)ellGet(&pmsg->sendQueue)) != NULL) {
pthr->eventSent = true;
epicsEventSignal(pthr->evp->event);
}
/*
* Wait for message to arrive
*/
@@ -0,0 +1,3 @@
/*
* Nothing needed for default implementation
*/
+25 -4
View File
@@ -40,6 +40,17 @@ randBelow(int n)
return (int)((double)n*rand()/(RAND_MAX+1.0));
}
static void
receiver0(void *arg)
{
epicsMessageQueue *q = (epicsMessageQueue *)arg;
char cbuf[80];
assert(q->receive(cbuf) == 5);
assert(q->receive(cbuf) == 0);
delete q;
}
static void
receiver(void *arg)
{
@@ -200,17 +211,26 @@ epicsMessageQueueTest()
assert (q1->receive((void *)cbuf, 1.0) < 0);
assert(q1->pending() == 0);
/*
* Single receiver, single sender, 0-length queue
*/
epicsMessageQueue *q0 = new epicsMessageQueue(0, 20);
epicsThreadCreate("Receiver zero", epicsThreadPriorityMedium, epicsThreadStackMedium, receiver0, q0);
epicsThreadSleep(1.0);
assert(q0->trySend((void *)msg1, 5) == 0);
epicsThreadSleep(1.0);
assert(q0->trySend((void *)msg1, 0) == 0);
epicsThreadSleep(1.0);
/*
* Single receiver, single sender tests
*/
epicsThreadSetPriority(epicsThreadGetIdSelf(), epicsThreadPriorityHigh);
epicsThreadCreate("Receiver one", epicsThreadPriorityMedium, epicsThreadStackMedium, receiver, q1);
for (pass = 1 ; pass <= 3 ; pass++) {
epicsThreadSetPriority(epicsThreadGetIdSelf(), pass == 1 ?
epicsThreadPriorityHigh :
epicsThreadPriorityLow);
switch (pass) {
case 1: printf ("Should send/receive only 4 messages (sender priority > receiver priority).\n"); break;
case 2: printf ("Should send/receive 4 to 10 messages (depends on how host handles thread priorities).\n"); break;
case 2: printf ("Should send/receive 5 to 10 messages (depends on how host handles thread priorities).\n"); break;
case 3: printf ("Should send/receive 10 messages (sender pauses after sending).\n"); break;
}
for (i = 0 ; i < 10 ; i++) {
@@ -220,6 +240,7 @@ epicsMessageQueueTest()
epicsThreadSleep(0.5);
}
printf ("Sent %d messages.\n", i);
epicsThreadSetPriority(epicsThreadGetIdSelf(), epicsThreadPriorityLow);
epicsThreadSleep(1.0);
}