diff --git a/src/libCom/Makefile b/src/libCom/Makefile index a4c7ecd86..e16723601 100644 --- a/src/libCom/Makefile +++ b/src/libCom/Makefile @@ -28,10 +28,8 @@ SRC_DIRS += $(LIBCOM)/ring #following needed for locating epicsRingPointer.h and epicsRingBytes.h INC += epicsRingPointer.h INC += epicsRingBytes.h -INC += epicsMessageQueue.h SRCS += epicsRingPointer.cpp SRCS += epicsRingBytes.c -SRCS += epicsMessageQueue.cpp SRC_DIRS += $(LIBCOM)/calc #following needed for locating postfixPvt.h and sCalcPostfixPvt.h @@ -151,6 +149,7 @@ INC += osdMutex.h INC += epicsEvent.h INC += osdEvent.h INC += epicsMath.h +INC += osdMessageQueue.h INC += epicsAssert.h INC += epicsFindSymbol.h @@ -166,11 +165,13 @@ INC += osiProcess.h INC += osiUnistd.h INC += osiWireFormat.h INC += epicsReadline.h +INC += epicsMessageQueue.h SRCS += epicsThread.cpp SRCS += epicsMutex.cpp SRCS += epicsEvent.cpp SRCS += epicsTime.cpp +SRCS += epicsMessageQueue.cpp SRCS += osdSock.c SRCS += osiSock.c @@ -190,6 +191,7 @@ SRCS += osdEvent.c SRCS += osdTime.cpp SRCS += osdProcess.c SRCS += osdNetIntf.c +SRCS += osdMessageQueue.c SRC_DIRS += $(LIBCOM)/taskwd INC += taskwd.h diff --git a/src/libCom/osi/epicsMessageQueue.cpp b/src/libCom/osi/epicsMessageQueue.cpp new file mode 100644 index 000000000..6b0c9e34a --- /dev/null +++ b/src/libCom/osi/epicsMessageQueue.cpp @@ -0,0 +1,82 @@ +/*************************************************************************\ +* Copyright (c) 2002 The University of Chicago, as Operator of Argonne +* National Laboratory. +* Copyright (c) 2002 The Regents of the University of California, as +* Operator of Los Alamos National Laboratory. +* EPICS BASE Versions 3.13.7 +* and higher are distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. +\*************************************************************************/ +/* + * $Id$ + * + * Author W. Eric Norum + * norume@aps.anl.gov + * 630 252 4793 + */ + +#include + +#define epicsExportSharedSymbols +#include "epicsMessageQueue.h" + +epicsMessageQueue::epicsMessageQueue(unsigned int aCapacity, + unsigned int aMaxMessageSize) + : id ( epicsMessageQueueCreate(aCapacity, aMaxMessageSize) ) +{ + if (id == NULL) + throw std::bad_alloc (); +} + +epicsMessageQueue::~epicsMessageQueue() +{ + epicsMessageQueueDestroy(id); +} + +int +epicsMessageQueue::trySend(void *message, unsigned int size) +{ + return epicsMessageQueueTrySend(id, message, size); +} + +int +epicsMessageQueue::send(void *message, unsigned int size) +{ + return epicsMessageQueueSend(id, message, size); +} + +int +epicsMessageQueue::send(void *message, unsigned int size, double timeout) +{ + return epicsMessageQueueSendWithTimeout(id, message, size, timeout); +} + +int +epicsMessageQueue::tryReceive(void *message ) +{ + return epicsMessageQueueTryReceive(id, message); +} + +int +epicsMessageQueue::receive(void *message ) +{ + return epicsMessageQueueReceive(id, message); +} + +int +epicsMessageQueue::receive(void *message, double timeout) +{ + return epicsMessageQueueReceiveWithTimeout(id, message, timeout); +} + +unsigned int +epicsMessageQueue::pending() +{ + return epicsMessageQueuePending(id); +} + +void +epicsMessageQueue::show(unsigned int level) +{ + epicsMessageQueueShow(id, level); +} diff --git a/src/libCom/ring/epicsMessageQueue.h b/src/libCom/osi/epicsMessageQueue.h similarity index 74% rename from src/libCom/ring/epicsMessageQueue.h rename to src/libCom/osi/epicsMessageQueue.h index cddeb5742..0c766a7b7 100644 --- a/src/libCom/ring/epicsMessageQueue.h +++ b/src/libCom/osi/epicsMessageQueue.h @@ -22,23 +22,27 @@ #define epicsMessageQueueh #include "epicsAssert.h" -#include "epicsEvent.h" -#include "epicsMutex.h" #include "shareLib.h" +typedef struct epicsMessageQueueOSD *epicsMessageQueueId; + #ifdef __cplusplus +#include "locationException.h" + class epicsShareClass epicsMessageQueue { public: epicsMessageQueue ( unsigned int capacity, unsigned int maximumMessageSize ); ~epicsMessageQueue (); - bool send ( void *message, unsigned int messageSize ); - bool send ( void *message, unsigned int messageSize, double timeout ); + int trySend ( void *message, unsigned int messageSize ); + int send ( void *message, unsigned int messageSize); + int send ( void *message, unsigned int messageSize, double timeout ); + int tryReceive ( void *message ); int receive ( void *message ); int receive ( void *message, double timeout ); - void show ( unsigned int level = 0 ) const; - unsigned int pending () const; + void show ( unsigned int level = 0 ); + unsigned int pending (); private: // Prevent compiler-generated member functions // default constructor, copy constructor, assignment operator @@ -46,32 +50,21 @@ private: // Prevent compiler-generated member functions epicsMessageQueue(const epicsMessageQueue &); epicsMessageQueue& operator=(const epicsMessageQueue &); -private: - int receive ( void *message, bool withTimeout, double timeout ); - - volatile char *inPtr; - volatile char *outPtr; - volatile bool full; - unsigned int capacity; - unsigned int maxMessageSize; - unsigned int slotSize; - unsigned long *buf; - char *firstMessageSlot; - char *lastMessageSlot; - epicsEvent queueEvent; - epicsMutex queueMutex; + epicsMessageQueueId id; }; extern "C" { #endif /*__cplusplus */ -typedef void *epicsMessageQueueId; - epicsShareFunc epicsMessageQueueId epicsShareAPI epicsMessageQueueCreate( unsigned int capacity, unsigned int maximumMessageSize); epicsShareFunc void epicsShareAPI epicsMessageQueueDestroy( epicsMessageQueueId id); +epicsShareFunc int epicsShareAPI epicsMessageQueueTrySend( + epicsMessageQueueId id, + void *message, + unsigned int messageSize); epicsShareFunc int epicsShareAPI epicsMessageQueueSend( epicsMessageQueueId id, void *message, @@ -81,23 +74,26 @@ epicsShareFunc int epicsShareAPI epicsMessageQueueSendWithTimeout( void *message, unsigned int messageSize, double timeout); +epicsShareFunc int epicsShareAPI epicsMessageQueueTryReceive( + epicsMessageQueueId id, + void *message); epicsShareFunc int epicsShareAPI epicsMessageQueueReceive( epicsMessageQueueId id, - void *message, - unsigned int *messageSize); + void *message); epicsShareFunc int epicsShareAPI epicsMessageQueueReceiveWithTimeout( epicsMessageQueueId id, void *message, - unsigned int *messageSize, double timeout); epicsShareFunc int epicsShareAPI epicsMessageQueuePending( epicsMessageQueueId id); epicsShareFunc void epicsShareAPI epicsMessageQueueShow( epicsMessageQueueId id, - unsigned int level); + int level); #ifdef __cplusplus } #endif /*__cplusplus */ +#include "osdMessageQueue.h" + #endif /* epicsMessageQueueh */ diff --git a/src/libCom/osi/os/default/osdMessageQueue.cpp b/src/libCom/osi/os/default/osdMessageQueue.cpp new file mode 100644 index 000000000..789a9b88f --- /dev/null +++ b/src/libCom/osi/os/default/osdMessageQueue.cpp @@ -0,0 +1,328 @@ +/*************************************************************************\ +* Copyright (c) 2002 The University of Chicago, as Operator of Argonne +* National Laboratory. +* Copyright (c) 2002 The Regents of the University of California, as +* Operator of Los Alamos National Laboratory. +* EPICS BASE Versions 3.13.7 +* and higher are distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. +\*************************************************************************/ +/* + * $Id$ + * + * Author W. Eric Norum + * norume@aps.anl.gov + * 630 252 4793 + */ + +/* + * Machines with 'traditional' memory access semantics could get by without + * the queueMutex for single producer/consumers, but architectures such as + * symmetric-multiprocessing machines with out-of-order writes require + * mutex locks to ensure correct operation. I decided to always use the + * mutex rather having two versions of this code. + */ +#define epicsExportSharedSymbols +#include "epicsMessageQueue.h" +#include +#include +#include +#include +#include +# include + +/* + * Event cache + */ +struct eventNode { + ELLNODE link; + epicsEventId event; +}; + +/* + * List of threads waiting to send or receive a message + */ +struct threadNode { + ELLNODE link; + struct eventNode *evp; + void *buf; + unsigned int size; + volatile bool eventSent; +}; + +/* + * Message info + */ +struct epicsMessageQueueOSD { + ELLLIST sendQueue; + ELLLIST receiveQueue; + ELLLIST eventFreeList; + int numberOfSendersWaiting; + + epicsMutexId mutex; + unsigned long capacity; + unsigned long maxMessageSize; + + unsigned long *buf; + char *firstMessageSlot; + char *lastMessageSlot; + volatile char *inPtr; + volatile char *outPtr; + unsigned long slotSize; + + bool full; +}; + +epicsShareFunc epicsMessageQueueId epicsShareAPI epicsMessageQueueCreate( + unsigned int capacity, + unsigned int maxMessageSize) +{ + epicsMessageQueueId pmsg; + unsigned int slotBytes, slotLongs; + + assert(capacity != 0); + assert(maxMessageSize != 0); + pmsg = (epicsMessageQueueId)callocMustSucceed(1, sizeof(*pmsg), "epicsMessageQueueCreate"); + pmsg->capacity = capacity; + pmsg->maxMessageSize = maxMessageSize; + slotLongs = 1 + ((maxMessageSize + sizeof(unsigned long) - 1) / sizeof(unsigned long)); + slotBytes = slotLongs * sizeof(unsigned long); + pmsg->buf = (unsigned long *)callocMustSucceed(pmsg->capacity, slotBytes, "epicsMessageQueueCreate"); + pmsg->inPtr = pmsg->outPtr = pmsg->firstMessageSlot = (char *)&pmsg->buf[0]; + pmsg->lastMessageSlot = (char *)&pmsg->buf[(capacity - 1) * slotLongs]; + pmsg->slotSize = slotBytes; + pmsg->full = false; + pmsg->mutex = epicsMutexMustCreate(); + ellInit(&pmsg->sendQueue); + ellInit(&pmsg->receiveQueue); + ellInit(&pmsg->eventFreeList); + return pmsg; +} + +epicsShareFunc void epicsShareAPI +epicsMessageQueueDestroy(epicsMessageQueueId pmsg) +{ + struct eventNode *evp; + + while ((evp = (struct eventNode *)ellGet(&pmsg->eventFreeList)) != NULL) { + epicsEventDestroy(evp->event); + free(evp); + } + epicsMutexDestroy(pmsg->mutex); + free(pmsg->buf); + free(pmsg); +} + +static struct eventNode * +getEventNode(epicsMessageQueueId pmsg) +{ + struct eventNode *evp; + + evp = (struct eventNode *)ellGet(&pmsg->eventFreeList); + if (evp == NULL) { + evp = (struct eventNode *)callocMustSucceed(1, sizeof(*evp), "epicsMessageQueueGetEventNode"); + evp->event = epicsEventMustCreate(epicsEventEmpty); + } + return evp; +} + +static int +mySend(epicsMessageQueueId pmsg, void *message, unsigned int size, bool wait, bool haveTimeout, double timeout) +{ + char *myInPtr, *myOutPtr, *nextPtr; + struct threadNode *pthr; + + if(size > pmsg->maxMessageSize) + return -1; + epicsMutexLock(pmsg->mutex); + /* + * See if message can be sent + */ + if ((pmsg->numberOfSendersWaiting > 0) || (pmsg->full)) { + /* + * Return error if not waiting + */ + if (!wait) { + epicsMutexUnlock(pmsg->mutex); + return -1; + } + + /* + * Wait + */ + struct threadNode threadNode; + threadNode.evp = getEventNode(pmsg); + threadNode.eventSent = false; + ellAdd(&pmsg->sendQueue, &threadNode.link); + pmsg->numberOfSendersWaiting++; + epicsMutexUnlock(pmsg->mutex); + if(haveTimeout) + epicsEventWaitWithTimeout(threadNode.evp->event, timeout); + else + epicsEventWait(threadNode.evp->event); + epicsMutexLock(pmsg->mutex); + if(!threadNode.eventSent) + ellDelete(&pmsg->sendQueue, &threadNode.link); + pmsg->numberOfSendersWaiting--; + ellAdd(&pmsg->eventFreeList, &threadNode.evp->link); + if (pmsg->full) { + epicsMutexUnlock(pmsg->mutex); + return -1; + } + } + + /* + * Copy message to waiting receiver + */ + if ((pthr = (struct threadNode *)ellGet(&pmsg->receiveQueue)) != NULL) { + memcpy(pthr->buf, message, size); + pthr->size = size; + pthr->eventSent = true; + epicsEventSignal(pthr->evp->event); + epicsMutexUnlock(pmsg->mutex); + return 0; + } + + /* + * Copy to queue + */ + myInPtr = (char *)pmsg->inPtr; + if (myInPtr == pmsg->lastMessageSlot) + nextPtr = pmsg->firstMessageSlot; + else + nextPtr = myInPtr + pmsg->slotSize; + if (nextPtr == (char *)pmsg->outPtr) + pmsg->full = true; + *(volatile unsigned long *)myInPtr = size; + memcpy((unsigned long *)myInPtr + 1, message, size); + pmsg->inPtr = nextPtr; + epicsMutexUnlock(pmsg->mutex); + return 0; +} + +epicsShareFunc int epicsShareAPI +epicsMessageQueueTrySend(epicsMessageQueueId pmsg, void *message, unsigned int size) +{ + return mySend(pmsg, message, size, false, false, 0.0); +} + +epicsShareFunc int epicsShareAPI +epicsMessageQueueSend(epicsMessageQueueId pmsg, void *message, unsigned int size) +{ + return mySend(pmsg, message, size, true, false, 0.0); +} + +epicsShareFunc int epicsShareAPI +epicsMessageQueueSendWithTimeout(epicsMessageQueueId pmsg, void *message, unsigned int size, double timeout) +{ + return mySend(pmsg, message, size, true, true, timeout); +} + +static int +myReceive(epicsMessageQueueId pmsg, void *message, bool wait, bool haveTimeout, double timeout) +{ + char *myOutPtr; + unsigned long l; + struct threadNode *pthr; + + epicsMutexLock(pmsg->mutex); + /* + * If there's a message on the queue, copy it + */ + myOutPtr = (char *)pmsg->outPtr; + if ((myOutPtr != pmsg->inPtr) || pmsg->full) { + l = *(unsigned long *)myOutPtr; + memcpy(message, (unsigned long *)myOutPtr + 1, l); + if (myOutPtr == pmsg->lastMessageSlot) + pmsg->outPtr = pmsg->firstMessageSlot; + else + pmsg->outPtr += pmsg->slotSize; + pmsg->full = false; + + /* + * Wake up the oldest task waiting to send + */ + if ((pthr = (struct threadNode *)ellGet(&pmsg->sendQueue)) != NULL) { + pthr->eventSent = true; + epicsEventSignal(pthr->evp->event); + } + epicsMutexUnlock(pmsg->mutex); + return l; + } + + /* + * Return if not waiting + */ + if (!wait) { + epicsMutexUnlock(pmsg->mutex); + return -1; + } + + /* + * Wait for message to arrive + */ + struct threadNode threadNode; + threadNode.evp = getEventNode(pmsg); + threadNode.buf = message; + threadNode.eventSent = false; + ellAdd(&pmsg->receiveQueue, &threadNode.link); + epicsMutexUnlock(pmsg->mutex); + if(haveTimeout) + epicsEventWaitWithTimeout(threadNode.evp->event, timeout); + else + epicsEventWait(threadNode.evp->event); + epicsMutexLock(pmsg->mutex); + if(!threadNode.eventSent) + ellDelete(&pmsg->receiveQueue, &threadNode.link); + ellAdd(&pmsg->eventFreeList, &threadNode.evp->link); + epicsMutexUnlock(pmsg->mutex); + if(threadNode.eventSent) + return threadNode.size; + return -1; +} + +epicsShareFunc int epicsShareAPI +epicsMessageQueueTryReceive(epicsMessageQueueId pmsg, void *message) +{ + return myReceive(pmsg, message, false, false, 0.0); +} + +epicsShareFunc int epicsShareAPI +epicsMessageQueueReceive(epicsMessageQueueId pmsg, void *message) +{ + return myReceive(pmsg, message, true, false, 0.0); +} + +epicsShareFunc int epicsShareAPI +epicsMessageQueueReceiveWithTimeout(epicsMessageQueueId pmsg, void *message, double timeout) +{ + return myReceive(pmsg, message, true, true, timeout); +} + +epicsShareFunc int epicsShareAPI +epicsMessageQueuePending(epicsMessageQueueId pmsg) +{ + char *myInPtr, *myOutPtr; + int nmsg; + + epicsMutexLock(pmsg->mutex); + myInPtr = (char *)pmsg->inPtr; + myOutPtr = (char *)pmsg->outPtr; + if (pmsg->full) + nmsg = pmsg->capacity; + else if (myInPtr >= myOutPtr) + nmsg = (myInPtr - myOutPtr) / pmsg->slotSize; + else + nmsg = pmsg->capacity - (myOutPtr - myInPtr) / pmsg->slotSize; + epicsMutexUnlock(pmsg->mutex); + return nmsg; +} + +epicsShareFunc void epicsShareAPI +epicsMessageQueueShow(epicsMessageQueueId pmsg, int level) +{ + printf("Message Queue Used:%d Slots:%lu", epicsMessageQueuePending(pmsg), pmsg->capacity); + if (level >= 1) + printf(" Maximum size:%lu", pmsg->maxMessageSize); + printf("\n"); +} diff --git a/src/libCom/osi/os/default/osdMessageQueue.h b/src/libCom/osi/os/default/osdMessageQueue.h new file mode 100644 index 000000000..e69de29bb diff --git a/src/libCom/ring/epicsMessageQueue.cpp b/src/libCom/ring/epicsMessageQueue.cpp deleted file mode 100644 index 8b9af1e68..000000000 --- a/src/libCom/ring/epicsMessageQueue.cpp +++ /dev/null @@ -1,218 +0,0 @@ -/*************************************************************************\ -* Copyright (c) 2002 The University of Chicago, as Operator of Argonne -* National Laboratory. -* Copyright (c) 2002 The Regents of the University of California, as -* Operator of Los Alamos National Laboratory. -* EPICS BASE Versions 3.13.7 -* and higher are distributed subject to a Software License Agreement found -* in file LICENSE that is included with this distribution. -\*************************************************************************/ -/* - * $Id$ - * - * Author W. Eric Norum - * norume@aps.anl.gov - * 630 252 4793 - */ - -/* - * Interthread message passing - * - * Machines with 'traditional' memory access semantics could get by without - * the queueMutex for single producer/consumers, but architectures, such as - * symmetric-multiprocessing machines with out-of-order writes, require - * mutex locks to ensure correct operation. I decided to always use the - * mutex rather having two versions of this code. - */ -#include -#include -#define epicsExportSharedSymbols -#include -#include "epicsMessageQueue.h" - -epicsMessageQueue::epicsMessageQueue(unsigned int aCapacity, - unsigned int aMaxMessageSize) -{ - assert(aCapacity != 0); - assert(aMaxMessageSize != 0); - capacity = aCapacity; - maxMessageSize = aMaxMessageSize; - slotSize = (sizeof(unsigned long) - + maxMessageSize + sizeof(unsigned long) - 1) - / sizeof(unsigned long); - buf = new unsigned long[capacity * slotSize]; - inPtr = outPtr = firstMessageSlot = (char *)&buf[0]; - lastMessageSlot = (char *)&buf[(capacity - 1) * slotSize]; - slotSize *= sizeof(unsigned long); - full = false; -} - -epicsMessageQueue::~epicsMessageQueue() -{ - delete buf; -} - -bool -epicsMessageQueue::send(void *message, unsigned int size) -{ - char *myInPtr, *myOutPtr, *nextPtr; - - assert(size <= maxMessageSize); - queueMutex.lock(); - if (full) { - queueMutex.unlock(); - return false; - } - myInPtr = (char *)inPtr; - myOutPtr = (char *)outPtr; - if (myInPtr == lastMessageSlot) - nextPtr = firstMessageSlot; - else - nextPtr = myInPtr + slotSize; - if (nextPtr == myOutPtr) - full = true; - *(volatile unsigned long *)myInPtr = size; - memcpy((unsigned long *)myInPtr + 1, message, size); - this->inPtr = nextPtr; - queueMutex.unlock(); - queueEvent.signal(); - return true; -} - -int -epicsMessageQueue::receive(void *message, bool withTimeout, double timeout) -{ - char *myOutPtr; - unsigned long l; - - queueMutex.lock(); - myOutPtr = (char *)outPtr; - while ((myOutPtr == inPtr) && !full) { - queueMutex.unlock(); - if (withTimeout) { - if (queueEvent.wait(timeout) == false) - return -1; - } - else { - queueEvent.wait(); - } - queueMutex.lock(); - myOutPtr = (char *)outPtr; - } - l = *(unsigned long *)myOutPtr; - memcpy(message, (unsigned long *)myOutPtr + 1, l); - if (myOutPtr == lastMessageSlot) - myOutPtr = firstMessageSlot; - else - myOutPtr = myOutPtr + slotSize; - this->outPtr = myOutPtr; - full = false; - queueMutex.unlock(); - return l; -} - -int -epicsMessageQueue::receive(void *message ) -{ - return this->receive(message, false, 0.0); -} - -int -epicsMessageQueue::receive(void *message, double timeout) -{ - return this->receive(message, true, timeout); -} - -unsigned int -epicsMessageQueue::pending() const -{ - char *myInPtr, *myOutPtr; - int nmsg; - - myInPtr = (char *)inPtr; - myOutPtr = (char *)outPtr; - if (myInPtr >= myOutPtr) - nmsg = (myInPtr - myOutPtr) / slotSize; - else - nmsg = capacity - (myOutPtr - myInPtr) / slotSize; - if (full) - nmsg = capacity; - return nmsg; -} - -void -epicsMessageQueue::show(unsigned int level) const -{ - char *myInPtr, *myOutPtr; - int nmsg; - - myInPtr = (char *)inPtr; - myOutPtr = (char *)outPtr; - if (myInPtr >= myOutPtr) - nmsg = (myInPtr - myOutPtr) / slotSize; - else - nmsg = capacity - (myOutPtr - myInPtr) / slotSize; - if (full) - nmsg = capacity; - printf("Message Queue Used:%d Slots:%d", nmsg, capacity); - if (level >= 1) - printf(" Maximum size:%u", maxMessageSize); - printf("\n"); -} - -epicsShareFunc epicsMessageQueueId epicsShareAPI epicsMessageQueueCreate( - unsigned int capacity, - unsigned int maxMessageSize) -{ - epicsMessageQueue *qid; - - try { - qid = new epicsMessageQueue(capacity, maxMessageSize); - } - catch (...) { - return NULL; - } - return qid; -} - -epicsShareFunc void epicsShareAPI epicsMessageQueueDestroy( - epicsMessageQueueId id) -{ - delete (epicsMessageQueue *)id; -} - -epicsShareFunc int epicsShareAPI epicsMessageQueueSend( - epicsMessageQueueId id, - void *message, - unsigned int messageSize) -{ - return ((epicsMessageQueue *)id)->send(message, messageSize); -} - -epicsShareFunc int epicsShareAPI epicsMessageQueueReceive( - epicsMessageQueueId id, - void *message) -{ - return ((epicsMessageQueue *)id)->receive(message); -} - -epicsShareFunc int epicsShareAPI epicsMessageQueueReceiveWithTimeout( - epicsMessageQueueId id, - void *message, - double timeout) -{ - return ((epicsMessageQueue *)id)->receive(message, timeout); -} - -epicsShareFunc int epicsShareAPI epicsMessageQueuePending( - epicsMessageQueueId id) -{ - return ((epicsMessageQueue *)id)->pending(); -} - -epicsShareFunc void epicsShareAPI epicsMessageQueueShow( - epicsMessageQueueId id, - unsigned int level) -{ - ((epicsMessageQueue *)id)->show(level); -} diff --git a/src/libCom/test/epicsMessageQueueTest.cpp b/src/libCom/test/epicsMessageQueueTest.cpp index af900a4ca..06aeb01ab 100644 --- a/src/libCom/test/epicsMessageQueueTest.cpp +++ b/src/libCom/test/epicsMessageQueueTest.cpp @@ -23,6 +23,23 @@ const char *msg1 = "1234567890This is a very long message."; +/* + * In Numerical Recipes in C: The Art of Scientific Computing (William H. + * Press, Brian P. Flannery, Saul A. Teukolsky, William T. Vetterling; New + * York: Cambridge University Press, 1992 (2nd ed., p. 277)), the follow- + * ing comments are made: + * "If you want to generate a random integer between 1 and 10, you + * should always do it by using high-order bits, as in + * j=1+(int) (10.0*rand()/(RAND_MAX+1.0)); + * and never by anything resembling + * j=1+(rand() % 10); + */ +static int +randBelow(int n) +{ + return (int)((double)n*rand()/(RAND_MAX+1.0)); +} + static void receiver(void *arg) { @@ -43,7 +60,7 @@ receiver(void *arg) if (expectmsg[sender-1] != msgNum) printf("%s received %d '%.*s' -- expected %d\n", epicsThreadGetNameSelf(), len, len, cbuf, expectmsg[sender-1]); expectmsg[sender-1] = msgNum + 1; - epicsThreadSleep(0.001 * (rand() % 20)); + epicsThreadSleep(0.001 * (randBelow(20))); } else { printf("%s received %d '%.*s'\n", epicsThreadGetNameSelf(), len, len, cbuf); @@ -65,9 +82,9 @@ sender(void *arg) for (;;) { len = sprintf(cbuf, "%s -- %d.", epicsThreadGetNameSelf(), ++i); - while (q->send((void *)cbuf, len) == false) - epicsThreadSleep(0.005 * (rand() % 5)); - epicsThreadSleep(0.005 * (rand() % 20)); + while (q->trySend((void *)cbuf, len) < 0) + epicsThreadSleep(0.005 * (randBelow(5))); + epicsThreadSleep(0.005 * (randBelow(20))); } } @@ -89,7 +106,7 @@ epicsMessageQueueTest() i = 0; used = 0; assert(q1->pending() == 0); - while (q1->send((void *)msg1, i ) == true) { + while (q1->trySend((void *)msg1, i ) == 0) { i++; assert(q1->pending() == i); printf("Should have %d used -- ", ++used); @@ -108,14 +125,14 @@ epicsMessageQueueTest() assert(q1->pending() == 2); if ((len != want) || (strncmp(msg1, cbuf, len) != 0)) printf("wanted:%d '%.*s' got:%d '%.*s'\n", want, want, msg1, len, len, cbuf); - q1->send((void *)msg1, i++); + q1->trySend((void *)msg1, i++); want++; len = q1->receive(cbuf); assert(q1->pending() == 2); if ((len != want) || (strncmp(msg1, cbuf, len) != 0)) printf("wanted:%d '%.*s' got:%d '%.*s'\n", want, want, msg1, len, len, cbuf); - q1->send((void *)msg1, i++); + q1->trySend((void *)msg1, i++); assert(q1->pending() == 3); i = 3; @@ -125,7 +142,62 @@ epicsMessageQueueTest() if ((len != want) || (strncmp(msg1, cbuf, len) != 0)) printf("wanted:%d '%.*s' got:%d '%.*s'\n", want, want, msg1, len, len, cbuf); } - printf("len:%d i:%d pending:%d\n", len, i, q1->pending()); + assert(q1->pending() == 0); + + /* + * Sender timeout + */ + i = 0; + used = 0; + assert(q1->pending() == 0); + while (q1->send((void *)msg1, i, 1.0 ) == 0) { + i++; + assert(q1->pending() == i); + printf("Should have %d used -- ", ++used); + q1->show(); + } + assert(q1->pending() == 4); + + want = 0; + len = q1->receive(cbuf); + assert(q1->pending() == 3); + if ((len != want) || (strncmp(msg1, cbuf, len) != 0)) + printf("wanted:%d '%.*s' got:%d '%.*s'\n", want, want, msg1, len, len, cbuf); + + want++; + len = q1->receive(cbuf); + assert(q1->pending() == 2); + if ((len != want) || (strncmp(msg1, cbuf, len) != 0)) + printf("wanted:%d '%.*s' got:%d '%.*s'\n", want, want, msg1, len, len, cbuf); + q1->send((void *)msg1, i++, 1.0); + + want++; + len = q1->receive(cbuf); + assert(q1->pending() == 2); + if ((len != want) || (strncmp(msg1, cbuf, len) != 0)) + printf("wanted:%d '%.*s' got:%d '%.*s'\n", want, want, msg1, len, len, cbuf); + q1->send((void *)msg1, i++, 1.0); + assert(q1->pending() == 3); + + i = 3; + while ((len = q1->receive(cbuf, 1.0)) >= 0) { + assert(q1->pending() == --i); + want++; + if ((len != want) || (strncmp(msg1, cbuf, len) != 0)) + printf("wanted:%d '%.*s' got:%d '%.*s'\n", want, want, msg1, len, len, cbuf); + } + assert(q1->pending() == 0); + + /* + * Receiver with timeout + */ + for (i = 0 ; i < 4 ; i++) + assert (q1->send((void *)msg1, i, 1.0) == 0); + assert(q1->pending() == 4); + for (i = 0 ; i < 4 ; i++) + assert (q1->receive((void *)cbuf, 1.0) == (int)i); + assert(q1->pending() == 0); + assert (q1->receive((void *)cbuf, 1.0) < 0); assert(q1->pending() == 0); /* @@ -142,7 +214,7 @@ epicsMessageQueueTest() case 3: printf ("Should send/receive 10 messages (sender pauses after sending).\n"); break; } for (i = 0 ; i < 10 ; i++) { - if (q1->send((void *)msg1, i) == false) + if (q1->trySend((void *)msg1, i) < 0) break; if (pass >= 3) epicsThreadSleep(0.5); @@ -154,7 +226,7 @@ epicsMessageQueueTest() /* * Single receiver, multiple sender tests */ - printf("This test takes another 5 minutes to finish.\n"); + printf("This test takes 5 minutes to run.\n"); printf("Test has succeeded if nothing appears between here....\n"); epicsThreadCreate("Sender 1", epicsThreadPriorityLow, epicsThreadStackMedium, sender, q1); epicsThreadCreate("Sender 2", epicsThreadPriorityMedium, epicsThreadStackMedium, sender, q1); @@ -167,6 +239,6 @@ epicsMessageQueueTest() * Force out summaries */ printf("......and here.\n"); - q1->send((void *)msg1, 0); + q1->trySend((void *)msg1, 0); epicsThreadSleep(1.0); }