diff --git a/src/libCom/osi/os/default/osdMessageQueue.cpp b/src/libCom/osi/os/default/osdMessageQueue.cpp index fc50a36b7..d99e23b70 100644 --- a/src/libCom/osi/os/default/osdMessageQueue.cpp +++ b/src/libCom/osi/os/default/osdMessageQueue.cpp @@ -106,7 +106,7 @@ epicsShareFunc epicsMessageQueueId epicsShareAPI epicsMessageQueueCreate( } static void -freeEventNode(struct eventNode *enode) +destroyEventNode(struct eventNode *enode) { epicsEventDestroy(enode->event); free(enode); @@ -119,7 +119,7 @@ epicsMessageQueueDestroy(epicsMessageQueueId pmsg) while ((evp = reinterpret_cast < struct eventNode * > ( ellGet(&pmsg->eventFreeList) ) ) != NULL) { - freeEventNode(evp); + destroyEventNode(evp); } epicsMutexDestroy(pmsg->mutex); free(pmsg->buf); @@ -145,6 +145,16 @@ getEventNode(epicsMessageQueueId pmsg) return evp; } +static void +freeEventNode(epicsMessageQueueId pmsg, eventNode *evp, epicsEventStatus status) +{ + if (status == epicsEventWaitTimeout) { + epicsEventSignal(evp->event); + epicsEventWait(evp->event); + } + ellAdd(&pmsg->eventFreeList, &evp->link); +} + static int mySend(epicsMessageQueueId pmsg, void *message, unsigned int size, double timeout) @@ -163,7 +173,7 @@ mySend(epicsMessageQueueId pmsg, void *message, unsigned int size, if ((pmsg->numberOfSendersWaiting > 0) || (pmsg->full && (ellFirst(&pmsg->receiveQueue) == NULL))) { /* - * Return if not allowed to wait + * Return if not allowed to wait. NB -1 means wait forever. */ if (timeout == 0) { epicsMutexUnlock(pmsg->mutex); @@ -171,7 +181,7 @@ mySend(epicsMessageQueueId pmsg, void *message, unsigned int size, } /* - * Wait + * Indicate that we're waiting */ struct threadNode threadNode; threadNode.evp = getEventNode(pmsg); @@ -186,22 +196,25 @@ mySend(epicsMessageQueueId pmsg, void *message, unsigned int size, epicsMutexUnlock(pmsg->mutex); - epicsEventStatus status; - if (timeout > 0) - status = epicsEventWaitWithTimeout(threadNode.evp->event, timeout); - else - status = epicsEventWait(threadNode.evp->event); + /* + * Wait for receiver to wake us + */ + epicsEventStatus status = timeout < 0 ? + epicsEventWait(threadNode.evp->event) : + epicsEventWaitWithTimeout(threadNode.evp->event, timeout); epicsMutexMustLock(pmsg->mutex); - if(!threadNode.eventSent) + if (!threadNode.eventSent) { + /* Receiver didn't take us off the sendQueue, do it ourselves */ ellDelete(&pmsg->sendQueue, &threadNode.link); - pmsg->numberOfSendersWaiting--; + pmsg->numberOfSendersWaiting--; + } - ellAdd(&pmsg->eventFreeList, &threadNode.evp->link); + freeEventNode(pmsg, threadNode.evp, status); - if ((pmsg->full && (ellFirst(&pmsg->receiveQueue) == NULL)) || - status != epicsEventOK) { + if (pmsg->full && (ellFirst(&pmsg->receiveQueue) == NULL)) { + /* State of the queue didn't change, exit */ epicsMutexUnlock(pmsg->mutex); return -1; } @@ -294,6 +307,7 @@ myReceive(epicsMessageQueueId pmsg, void *message, unsigned int size, */ if ((pthr = reinterpret_cast < struct threadNode * > ( ellGet(&pmsg->sendQueue) ) ) != NULL) { + pmsg->numberOfSendersWaiting--; pthr->eventSent = true; epicsEventSignal(pthr->evp->event); } @@ -302,7 +316,7 @@ myReceive(epicsMessageQueueId pmsg, void *message, unsigned int size, } /* - * Return if not allowed to wait + * Return if not allowed to wait. NB -1 means wait forever. */ if (timeout == 0) { epicsMutexUnlock(pmsg->mutex); @@ -310,16 +324,7 @@ myReceive(epicsMessageQueueId pmsg, void *message, unsigned int size, } /* - * Wake up the oldest task waiting to send - */ - if ((pthr = reinterpret_cast < struct threadNode * > - ( ellGet(&pmsg->sendQueue) ) ) != NULL) { - pthr->eventSent = true; - epicsEventSignal(pthr->evp->event); - } - - /* - * Wait for message to arrive + * Indicate that we're waiting */ struct threadNode threadNode; threadNode.evp = getEventNode(pmsg); @@ -333,18 +338,32 @@ myReceive(epicsMessageQueueId pmsg, void *message, unsigned int size, } ellAdd(&pmsg->receiveQueue, &threadNode.link); + + /* + * Wake up the oldest task waiting to send + */ + if ((pthr = reinterpret_cast < struct threadNode * > + ( ellGet(&pmsg->sendQueue) ) ) != NULL) { + pmsg->numberOfSendersWaiting--; + pthr->eventSent = true; + epicsEventSignal(pthr->evp->event); + } + epicsMutexUnlock(pmsg->mutex); - if (timeout > 0) + /* + * Wait for a message to arrive + */ + epicsEventStatus status = timeout < 0 ? + epicsEventWait(threadNode.evp->event) : epicsEventWaitWithTimeout(threadNode.evp->event, timeout); - else - epicsEventWait(threadNode.evp->event); epicsMutexMustLock(pmsg->mutex); if (!threadNode.eventSent) ellDelete(&pmsg->receiveQueue, &threadNode.link); - ellAdd(&pmsg->eventFreeList, &threadNode.evp->link); + + freeEventNode(pmsg, threadNode.evp, status); epicsMutexUnlock(pmsg->mutex); @@ -396,7 +415,8 @@ epicsMessageQueuePending(epicsMessageQueueId pmsg) epicsShareFunc void epicsShareAPI epicsMessageQueueShow(epicsMessageQueueId pmsg, int level) { - printf("Message Queue Used:%d Slots:%lu", epicsMessageQueuePending(pmsg), pmsg->capacity); + printf("Message Queue Used:%d Slots:%lu", + epicsMessageQueuePending(pmsg), pmsg->capacity); if (level >= 1) printf(" Maximum size:%lu", pmsg->maxMessageSize); printf("\n"); diff --git a/src/libCom/test/epicsMessageQueueTest.cpp b/src/libCom/test/epicsMessageQueueTest.cpp index 1bee13c36..f23683c84 100644 --- a/src/libCom/test/epicsMessageQueueTest.cpp +++ b/src/libCom/test/epicsMessageQueueTest.cpp @@ -27,7 +27,10 @@ static volatile int sendExit = 0; static volatile int recvExit = 0; static epicsEventId finished; static unsigned int mediumStack; -static int numReceived; + +#define SLEEPY_TESTS 500 +static int numSent, numReceived; +static epicsEventId complete; /* * In Numerical Recipes in C: The Art of Scientific Computing (William H. @@ -124,11 +127,96 @@ fastReceiver(void *arg) int len; numReceived = 0; while (!recvExit) { - len = q->receive(cbuf, sizeof cbuf, 0.01); + len = q->receive(cbuf, sizeof cbuf, 0.010); if (len > 0) { numReceived++; } } + recvExit = 0; + epicsEventSignal(complete); +} + +void sleepySender(double delay) +{ + testDiag("sleepySender: sending every %.3f seconds", delay); + epicsMessageQueue q(4, 20); + epicsThreadCreate("Fast Receiver", epicsThreadPriorityMedium, + mediumStack, fastReceiver, &q); + + numSent = 0; + for (int i = 0 ; i < SLEEPY_TESTS ; i++) { + if (q.send((void *)msg1, 4) == 0) { + numSent++; + } + epicsThreadSleep(delay); + } + epicsThreadSleep(1.0); + testOk(numSent == SLEEPY_TESTS, "Sent %d (should be %d)", + numSent, SLEEPY_TESTS); + testOk(numReceived == SLEEPY_TESTS, "Received %d (should be %d)", + numReceived, SLEEPY_TESTS); + + recvExit = 1; + while (q.send((void *)msg1, 4) != 0) + epicsThreadSleep(0.01); + epicsEventMustWait(complete); +} + +extern "C" void +fastSender(void *arg) +{ + epicsMessageQueue *q = (epicsMessageQueue *)arg; + numSent = 0; + + // Send first withough timeout + q->send((void *)msg1, 4); + numSent++; + + // The rest have a timeout + while (!sendExit) { + if (q->send((void *)msg1, 4, 0.010) == 0) { + numSent++; + } + } + sendExit = 0; + epicsEventSignal(complete); +} + +void sleepyReceiver(double delay) +{ + testDiag("sleepyReceiver: acquiring every %.3f seconds", delay); + epicsMessageQueue q(4, 20); + + // Fill the queue + for (int i = q.pending(); i < 4 ;i++) { + q.send((void *)msg1, 4); + } + + epicsThreadCreate("Fast Sender", epicsThreadPriorityMedium, + mediumStack, fastSender, &q); + epicsThreadSleep(0.5); + + char cbuf[80]; + int len; + numReceived = 0; + + for (int i = 0 ; i < SLEEPY_TESTS ; i++) { + len = q.receive(cbuf, sizeof cbuf); + if (len > 0) { + numReceived++; + } + epicsThreadSleep(delay); + } + + testOk(numSent == SLEEPY_TESTS, "Sent %d (should be %d)", + numSent, SLEEPY_TESTS); + testOk(numReceived == SLEEPY_TESTS, "Received %d (should be %d)", + numReceived, SLEEPY_TESTS); + + sendExit = 1; + while (q.receive(cbuf, sizeof cbuf) <= 0) + epicsThreadSleep(0.01); + epicsEventMustWait(complete); } extern "C" void @@ -156,10 +244,8 @@ extern "C" void messageQueueTest(void *parm) int len; int pass; int want; - int numSent = 0; epicsMessageQueue *q1 = new epicsMessageQueue(4, 20); - epicsMessageQueue *q2 = new epicsMessageQueue(4, 20); testDiag("Simple single-thread tests:"); i = 0; @@ -269,34 +355,17 @@ extern "C" void messageQueueTest(void *parm) testOk(q1->send((void *)msg1, 10) == 0, "Send with no receiver"); epicsThreadSleep(2.0); - testDiag("Single receiver with timeout, single sender with sleep tests:"); - testDiag("These tests last 20 seconds ..."); - epicsThreadCreate("Fast Receiver", epicsThreadPriorityMedium, - mediumStack, fastReceiver, q2); - numSent = 0; - numReceived = 0; - for (i = 0 ; i < 1000 ; i++) { - if (q2->send((void *)msg1, 4) == 0) { - numSent++; - } - epicsThreadSleep(0.011); - } - epicsThreadSleep(1.0); - if (!testOk(numSent == 1000 && numReceived == 1000, "sleep=0.011")) { - testDiag("numSent should be 1000, actual=%d, numReceived should be 1000, actual=%d", numSent, numReceived); - } - numSent = 0; - numReceived = 0; - for (i = 0 ; i < 1000 ; i++) { - if (q2->send((void *)msg1, 4) == 0) { - numSent++; - } - epicsThreadSleep(0.010); - } - epicsThreadSleep(1.0); - if (!testOk(numSent == 1000 && numReceived == 1000, "sleep=0.010")) { - testDiag("numSent should be 1000, actual=%d, numReceived should be 1000, actual=%d", numSent, numReceived); - } + testDiag("6 Single receiver single sender 'Sleepy timeout' tests,"); + testDiag(" these should take about %.2f seconds each:", + SLEEPY_TESTS * 0.010); + + complete = epicsEventMustCreate(epicsEventEmpty); + sleepySender(0.009); + sleepySender(0.010); + sleepySender(0.011); + sleepyReceiver(0.009); + sleepyReceiver(0.010); + sleepyReceiver(0.011); testDiag("Single receiver, single sender tests:"); epicsThreadSetPriority(myThreadId, epicsThreadPriorityHigh); @@ -359,7 +428,7 @@ extern "C" void messageQueueTest(void *parm) MAIN(epicsMessageQueueTest) { - testPlan(64); + testPlan(74); finished = epicsEventMustCreate(epicsEventEmpty); mediumStack = epicsThreadGetStackSize(epicsThreadStackMedium); diff --git a/src/libCom/test/epicsStackTraceTest.c b/src/libCom/test/epicsStackTraceTest.c index 69cb499f2..246abe4d8 100644 --- a/src/libCom/test/epicsStackTraceTest.c +++ b/src/libCom/test/epicsStackTraceTest.c @@ -1,11 +1,11 @@ -/* +/* * Copyright: Stanford University / SLAC National Laboratory. * * EPICS BASE is distributed subject to a Software License Agreement found - * in file LICENSE that is included with this distribution. + * in file LICENSE that is included with this distribution. * * Author: Till Straumann , 2014 - */ + */ /* * Check stack trace functionality @@ -135,14 +135,14 @@ findNumOcc(const char *buf) } /* We should find an address close to epicsStackTraceRecurseGbl twice */ for (i=0; i= (char*)epicsStackTraceRecurseGbl && (char*)ptrs[i] < (char*)epicsStackTraceRecurseGbl + WINDOW_SZ ) { - rval ++; + rval ++; if ( test_debug ) testDiag("found address %p again\n", ptrs[i]); } @@ -167,7 +167,7 @@ MAIN(epicsStackTraceTest) testPlan(5); - features = epicsStackTraceGetFeatures(); + features = epicsStackTraceGetFeatures(); all_features = EPICS_STACKTRACE_LCL_SYMBOLS | EPICS_STACKTRACE_GBL_SYMBOLS @@ -217,7 +217,13 @@ MAIN(epicsStackTraceTest) } if ( (features & EPICS_STACKTRACE_ADDRESSES) ) { +#ifdef _MINGW + testTodoBegin("MinGW, might fail"); +#endif testOk( numFound > 0, "dumping addresses" ); +#ifdef _MINGW + testTodoEnd(); +#endif } else { testSkip(1 , "no support for dumping addresses on this platform"); }