Merge branch 'defaultMessageQueue' into 3.15

This commit is contained in:
Andrew Johnson
2020-05-14 10:41:52 -05:00
3 changed files with 164 additions and 69 deletions

View File

@@ -106,7 +106,7 @@ epicsShareFunc epicsMessageQueueId epicsShareAPI epicsMessageQueueCreate(
}
static void
freeEventNode(struct eventNode *enode)
destroyEventNode(struct eventNode *enode)
{
epicsEventDestroy(enode->event);
free(enode);
@@ -119,7 +119,7 @@ epicsMessageQueueDestroy(epicsMessageQueueId pmsg)
while ((evp = reinterpret_cast < struct eventNode * >
( ellGet(&pmsg->eventFreeList) ) ) != NULL) {
freeEventNode(evp);
destroyEventNode(evp);
}
epicsMutexDestroy(pmsg->mutex);
free(pmsg->buf);
@@ -145,6 +145,16 @@ getEventNode(epicsMessageQueueId pmsg)
return evp;
}
static void
freeEventNode(epicsMessageQueueId pmsg, eventNode *evp, epicsEventStatus status)
{
if (status == epicsEventWaitTimeout) {
epicsEventSignal(evp->event);
epicsEventWait(evp->event);
}
ellAdd(&pmsg->eventFreeList, &evp->link);
}
static int
mySend(epicsMessageQueueId pmsg, void *message, unsigned int size,
double timeout)
@@ -163,7 +173,7 @@ mySend(epicsMessageQueueId pmsg, void *message, unsigned int size,
if ((pmsg->numberOfSendersWaiting > 0)
|| (pmsg->full && (ellFirst(&pmsg->receiveQueue) == NULL))) {
/*
* Return if not allowed to wait
* Return if not allowed to wait. NB -1 means wait forever.
*/
if (timeout == 0) {
epicsMutexUnlock(pmsg->mutex);
@@ -171,7 +181,7 @@ mySend(epicsMessageQueueId pmsg, void *message, unsigned int size,
}
/*
* Wait
* Indicate that we're waiting
*/
struct threadNode threadNode;
threadNode.evp = getEventNode(pmsg);
@@ -186,22 +196,25 @@ mySend(epicsMessageQueueId pmsg, void *message, unsigned int size,
epicsMutexUnlock(pmsg->mutex);
epicsEventStatus status;
if (timeout > 0)
status = epicsEventWaitWithTimeout(threadNode.evp->event, timeout);
else
status = epicsEventWait(threadNode.evp->event);
/*
* Wait for receiver to wake us
*/
epicsEventStatus status = timeout < 0 ?
epicsEventWait(threadNode.evp->event) :
epicsEventWaitWithTimeout(threadNode.evp->event, timeout);
epicsMutexMustLock(pmsg->mutex);
if(!threadNode.eventSent)
if (!threadNode.eventSent) {
/* Receiver didn't take us off the sendQueue, do it ourselves */
ellDelete(&pmsg->sendQueue, &threadNode.link);
pmsg->numberOfSendersWaiting--;
pmsg->numberOfSendersWaiting--;
}
ellAdd(&pmsg->eventFreeList, &threadNode.evp->link);
freeEventNode(pmsg, threadNode.evp, status);
if ((pmsg->full && (ellFirst(&pmsg->receiveQueue) == NULL)) ||
status != epicsEventOK) {
if (pmsg->full && (ellFirst(&pmsg->receiveQueue) == NULL)) {
/* State of the queue didn't change, exit */
epicsMutexUnlock(pmsg->mutex);
return -1;
}
@@ -294,6 +307,7 @@ myReceive(epicsMessageQueueId pmsg, void *message, unsigned int size,
*/
if ((pthr = reinterpret_cast < struct threadNode * >
( ellGet(&pmsg->sendQueue) ) ) != NULL) {
pmsg->numberOfSendersWaiting--;
pthr->eventSent = true;
epicsEventSignal(pthr->evp->event);
}
@@ -302,7 +316,7 @@ myReceive(epicsMessageQueueId pmsg, void *message, unsigned int size,
}
/*
* Return if not allowed to wait
* Return if not allowed to wait. NB -1 means wait forever.
*/
if (timeout == 0) {
epicsMutexUnlock(pmsg->mutex);
@@ -310,16 +324,7 @@ myReceive(epicsMessageQueueId pmsg, void *message, unsigned int size,
}
/*
* Wake up the oldest task waiting to send
*/
if ((pthr = reinterpret_cast < struct threadNode * >
( ellGet(&pmsg->sendQueue) ) ) != NULL) {
pthr->eventSent = true;
epicsEventSignal(pthr->evp->event);
}
/*
* Wait for message to arrive
* Indicate that we're waiting
*/
struct threadNode threadNode;
threadNode.evp = getEventNode(pmsg);
@@ -333,18 +338,32 @@ myReceive(epicsMessageQueueId pmsg, void *message, unsigned int size,
}
ellAdd(&pmsg->receiveQueue, &threadNode.link);
/*
* Wake up the oldest task waiting to send
*/
if ((pthr = reinterpret_cast < struct threadNode * >
( ellGet(&pmsg->sendQueue) ) ) != NULL) {
pmsg->numberOfSendersWaiting--;
pthr->eventSent = true;
epicsEventSignal(pthr->evp->event);
}
epicsMutexUnlock(pmsg->mutex);
if (timeout > 0)
/*
* Wait for a message to arrive
*/
epicsEventStatus status = timeout < 0 ?
epicsEventWait(threadNode.evp->event) :
epicsEventWaitWithTimeout(threadNode.evp->event, timeout);
else
epicsEventWait(threadNode.evp->event);
epicsMutexMustLock(pmsg->mutex);
if (!threadNode.eventSent)
ellDelete(&pmsg->receiveQueue, &threadNode.link);
ellAdd(&pmsg->eventFreeList, &threadNode.evp->link);
freeEventNode(pmsg, threadNode.evp, status);
epicsMutexUnlock(pmsg->mutex);
@@ -396,7 +415,8 @@ epicsMessageQueuePending(epicsMessageQueueId pmsg)
epicsShareFunc void epicsShareAPI
epicsMessageQueueShow(epicsMessageQueueId pmsg, int level)
{
printf("Message Queue Used:%d Slots:%lu", epicsMessageQueuePending(pmsg), pmsg->capacity);
printf("Message Queue Used:%d Slots:%lu",
epicsMessageQueuePending(pmsg), pmsg->capacity);
if (level >= 1)
printf(" Maximum size:%lu", pmsg->maxMessageSize);
printf("\n");

View File

@@ -27,7 +27,10 @@ static volatile int sendExit = 0;
static volatile int recvExit = 0;
static epicsEventId finished;
static unsigned int mediumStack;
static int numReceived;
#define SLEEPY_TESTS 500
static int numSent, numReceived;
static epicsEventId complete;
/*
* In Numerical Recipes in C: The Art of Scientific Computing (William H.
@@ -124,11 +127,96 @@ fastReceiver(void *arg)
int len;
numReceived = 0;
while (!recvExit) {
len = q->receive(cbuf, sizeof cbuf, 0.01);
len = q->receive(cbuf, sizeof cbuf, 0.010);
if (len > 0) {
numReceived++;
}
}
recvExit = 0;
epicsEventSignal(complete);
}
void sleepySender(double delay)
{
testDiag("sleepySender: sending every %.3f seconds", delay);
epicsMessageQueue q(4, 20);
epicsThreadCreate("Fast Receiver", epicsThreadPriorityMedium,
mediumStack, fastReceiver, &q);
numSent = 0;
for (int i = 0 ; i < SLEEPY_TESTS ; i++) {
if (q.send((void *)msg1, 4) == 0) {
numSent++;
}
epicsThreadSleep(delay);
}
epicsThreadSleep(1.0);
testOk(numSent == SLEEPY_TESTS, "Sent %d (should be %d)",
numSent, SLEEPY_TESTS);
testOk(numReceived == SLEEPY_TESTS, "Received %d (should be %d)",
numReceived, SLEEPY_TESTS);
recvExit = 1;
while (q.send((void *)msg1, 4) != 0)
epicsThreadSleep(0.01);
epicsEventMustWait(complete);
}
extern "C" void
fastSender(void *arg)
{
epicsMessageQueue *q = (epicsMessageQueue *)arg;
numSent = 0;
// Send first withough timeout
q->send((void *)msg1, 4);
numSent++;
// The rest have a timeout
while (!sendExit) {
if (q->send((void *)msg1, 4, 0.010) == 0) {
numSent++;
}
}
sendExit = 0;
epicsEventSignal(complete);
}
void sleepyReceiver(double delay)
{
testDiag("sleepyReceiver: acquiring every %.3f seconds", delay);
epicsMessageQueue q(4, 20);
// Fill the queue
for (int i = q.pending(); i < 4 ;i++) {
q.send((void *)msg1, 4);
}
epicsThreadCreate("Fast Sender", epicsThreadPriorityMedium,
mediumStack, fastSender, &q);
epicsThreadSleep(0.5);
char cbuf[80];
int len;
numReceived = 0;
for (int i = 0 ; i < SLEEPY_TESTS ; i++) {
len = q.receive(cbuf, sizeof cbuf);
if (len > 0) {
numReceived++;
}
epicsThreadSleep(delay);
}
testOk(numSent == SLEEPY_TESTS, "Sent %d (should be %d)",
numSent, SLEEPY_TESTS);
testOk(numReceived == SLEEPY_TESTS, "Received %d (should be %d)",
numReceived, SLEEPY_TESTS);
sendExit = 1;
while (q.receive(cbuf, sizeof cbuf) <= 0)
epicsThreadSleep(0.01);
epicsEventMustWait(complete);
}
extern "C" void
@@ -156,10 +244,8 @@ extern "C" void messageQueueTest(void *parm)
int len;
int pass;
int want;
int numSent = 0;
epicsMessageQueue *q1 = new epicsMessageQueue(4, 20);
epicsMessageQueue *q2 = new epicsMessageQueue(4, 20);
testDiag("Simple single-thread tests:");
i = 0;
@@ -269,34 +355,17 @@ extern "C" void messageQueueTest(void *parm)
testOk(q1->send((void *)msg1, 10) == 0, "Send with no receiver");
epicsThreadSleep(2.0);
testDiag("Single receiver with timeout, single sender with sleep tests:");
testDiag("These tests last 20 seconds ...");
epicsThreadCreate("Fast Receiver", epicsThreadPriorityMedium,
mediumStack, fastReceiver, q2);
numSent = 0;
numReceived = 0;
for (i = 0 ; i < 1000 ; i++) {
if (q2->send((void *)msg1, 4) == 0) {
numSent++;
}
epicsThreadSleep(0.011);
}
epicsThreadSleep(1.0);
if (!testOk(numSent == 1000 && numReceived == 1000, "sleep=0.011")) {
testDiag("numSent should be 1000, actual=%d, numReceived should be 1000, actual=%d", numSent, numReceived);
}
numSent = 0;
numReceived = 0;
for (i = 0 ; i < 1000 ; i++) {
if (q2->send((void *)msg1, 4) == 0) {
numSent++;
}
epicsThreadSleep(0.010);
}
epicsThreadSleep(1.0);
if (!testOk(numSent == 1000 && numReceived == 1000, "sleep=0.010")) {
testDiag("numSent should be 1000, actual=%d, numReceived should be 1000, actual=%d", numSent, numReceived);
}
testDiag("6 Single receiver single sender 'Sleepy timeout' tests,");
testDiag(" these should take about %.2f seconds each:",
SLEEPY_TESTS * 0.010);
complete = epicsEventMustCreate(epicsEventEmpty);
sleepySender(0.009);
sleepySender(0.010);
sleepySender(0.011);
sleepyReceiver(0.009);
sleepyReceiver(0.010);
sleepyReceiver(0.011);
testDiag("Single receiver, single sender tests:");
epicsThreadSetPriority(myThreadId, epicsThreadPriorityHigh);
@@ -359,7 +428,7 @@ extern "C" void messageQueueTest(void *parm)
MAIN(epicsMessageQueueTest)
{
testPlan(64);
testPlan(74);
finished = epicsEventMustCreate(epicsEventEmpty);
mediumStack = epicsThreadGetStackSize(epicsThreadStackMedium);

View File

@@ -1,11 +1,11 @@
/*
/*
* Copyright: Stanford University / SLAC National Laboratory.
*
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
* in file LICENSE that is included with this distribution.
*
* Author: Till Straumann <strauman@slac.stanford.edu>, 2014
*/
*/
/*
* Check stack trace functionality
@@ -135,14 +135,14 @@ findNumOcc(const char *buf)
}
/* We should find an address close to epicsStackTraceRecurseGbl twice */
for (i=0; i<n_ptrs-1; i++) {
/* I got a (unjustified) index-out-of-bound warning
/* I got a (unjustified) index-out-of-bound warning
* when setting j=i+1 here. Thus the weird j!= i check...
*/
j = i;
while ( j < n_ptrs ) {
if ( j != i && ptrs[j] == ptrs[i] ) {
if ( (char*)ptrs[i] >= (char*)epicsStackTraceRecurseGbl && (char*)ptrs[i] < (char*)epicsStackTraceRecurseGbl + WINDOW_SZ ) {
rval ++;
rval ++;
if ( test_debug )
testDiag("found address %p again\n", ptrs[i]);
}
@@ -167,7 +167,7 @@ MAIN(epicsStackTraceTest)
testPlan(5);
features = epicsStackTraceGetFeatures();
features = epicsStackTraceGetFeatures();
all_features = EPICS_STACKTRACE_LCL_SYMBOLS
| EPICS_STACKTRACE_GBL_SYMBOLS
@@ -217,7 +217,13 @@ MAIN(epicsStackTraceTest)
}
if ( (features & EPICS_STACKTRACE_ADDRESSES) ) {
#ifdef _MINGW
testTodoBegin("MinGW, might fail");
#endif
testOk( numFound > 0, "dumping addresses" );
#ifdef _MINGW
testTodoEnd();
#endif
} else {
testSkip(1 , "no support for dumping addresses on this platform");
}