From 5c03f8ba799c0cccc5067e875bec76f31fa1339b Mon Sep 17 00:00:00 2001 From: Andrew Johnson Date: Thu, 2 Jul 2020 16:17:04 -0500 Subject: [PATCH] Simplify epicsMessageQueueTest using joinable threads Fixes issues with thread shutdown --- modules/libcom/test/epicsMessageQueueTest.cpp | 187 ++++++++++-------- 1 file changed, 103 insertions(+), 84 deletions(-) diff --git a/modules/libcom/test/epicsMessageQueueTest.cpp b/modules/libcom/test/epicsMessageQueueTest.cpp index f23683c84..340ae6b58 100644 --- a/modules/libcom/test/epicsMessageQueueTest.cpp +++ b/modules/libcom/test/epicsMessageQueueTest.cpp @@ -25,12 +25,9 @@ static const char *msg1 = "1234567890This is a very long message."; static volatile int sendExit = 0; static volatile int recvExit = 0; -static epicsEventId finished; -static unsigned int mediumStack; #define SLEEPY_TESTS 500 static int numSent, numReceived; -static epicsEventId complete; /* * In Numerical Recipes in C: The Art of Scientific Computing (William H. @@ -116,7 +113,6 @@ receiver(void *arg) if (!testOk1(errors == 0)) testDiag("Error count was %d", errors); testDiag("%s exiting", myName); - epicsEventSignal(finished); } extern "C" void @@ -133,15 +129,18 @@ fastReceiver(void *arg) } } recvExit = 0; - epicsEventSignal(complete); } void sleepySender(double delay) { + epicsThreadOpts opts = {epicsThreadPriorityMedium, epicsThreadStackMedium, 1}; + epicsThreadId rxThread; + testDiag("sleepySender: sending every %.3f seconds", delay); epicsMessageQueue q(4, 20); - epicsThreadCreate("Fast Receiver", epicsThreadPriorityMedium, - mediumStack, fastReceiver, &q); + rxThread = epicsThreadCreateOpt("Fast Receiver", fastReceiver, &q, &opts); + if (!rxThread) + testAbort("Task create failed"); numSent = 0; for (int i = 0 ; i < SLEEPY_TESTS ; i++) { @@ -159,7 +158,7 @@ void sleepySender(double delay) recvExit = 1; while (q.send((void *)msg1, 4) != 0) epicsThreadSleep(0.01); - epicsEventMustWait(complete); + epicsThreadMustJoin(rxThread); } extern "C" void @@ -179,11 +178,14 @@ fastSender(void *arg) } } sendExit = 0; - epicsEventSignal(complete); } void sleepyReceiver(double delay) { + epicsThreadOpts opts = {epicsThreadPriorityMedium, + epicsThreadStackMedium, 1}; + epicsThreadId txThread; + testDiag("sleepyReceiver: acquiring every %.3f seconds", delay); epicsMessageQueue q(4, 20); @@ -192,8 +194,9 @@ void sleepyReceiver(double delay) q.send((void *)msg1, 4); } - epicsThreadCreate("Fast Sender", epicsThreadPriorityMedium, - mediumStack, fastSender, &q); + txThread = epicsThreadCreateOpt("Fast Sender", fastSender, &q, &opts); + if (!txThread) + testAbort("Task create failed"); epicsThreadSleep(0.5); char cbuf[80]; @@ -216,7 +219,7 @@ void sleepyReceiver(double delay) sendExit = 1; while (q.receive(cbuf, sizeof cbuf) <= 0) epicsThreadSleep(0.01); - epicsEventMustWait(complete); + epicsThreadMustJoin(txThread); } extern "C" void @@ -236,130 +239,136 @@ sender(void *arg) testDiag("%s exiting, sent %d messages", epicsThreadGetNameSelf(), i); } +#define NUM_SENDERS 4 extern "C" void messageQueueTest(void *parm) { epicsThreadId myThreadId = epicsThreadGetIdSelf(); + epicsThreadId rxThread; + epicsThreadId senderId[NUM_SENDERS]; + epicsThreadOpts opts = {epicsThreadPriorityMedium, + epicsThreadStackMedium, 1}; + unsigned int i; char cbuf[80]; int len; int pass; int want; - epicsMessageQueue *q1 = new epicsMessageQueue(4, 20); + epicsMessageQueue q1(4, 20); testDiag("Simple single-thread tests:"); - i = 0; - testOk1(q1->pending() == 0); - while (q1->trySend((void *)msg1, i ) == 0) { - i++; - testOk(q1->pending() == i, "q1->pending() == %d", i); + testOk1(q1.pending() == 0); + for (i = 0; i < 4;) { + int ret = q1.trySend((void *)msg1, i++); + testOk(ret == 0, "trySend succeeded (%d == 0)", ret); + testOk(q1.pending() == i, "loop: q1.pending() == %d", i); } - testOk1(q1->pending() == 4); + testOk1(q1.pending() == 4); want = 0; - len = q1->receive(cbuf, sizeof cbuf); - testOk1(q1->pending() == 3); + len = q1.receive(cbuf, sizeof cbuf); + testOk1(q1.pending() == 3); if (!testOk1((len == want) && (strncmp(msg1, cbuf, len) == 0))) testDiag("wanted:%d '%.*s' got:%d '%.*s'", want, want, msg1, len, len, cbuf); want++; - len = q1->receive(cbuf, sizeof cbuf); - testOk1(q1->pending() == 2); + len = q1.receive(cbuf, sizeof cbuf); + testOk1(q1.pending() == 2); if (!testOk1((len == want) && (strncmp(msg1, cbuf, len) == 0))) testDiag("wanted:%d '%.*s' got:%d '%.*s'", want, want, msg1, len, len, cbuf); - q1->trySend((void *)msg1, i++); + q1.trySend((void *)msg1, i++); want++; - len = q1->receive(cbuf, sizeof cbuf); - testOk1(q1->pending() == 2); + len = q1.receive(cbuf, sizeof cbuf); + testOk1(q1.pending() == 2); if (!testOk1((len == want) && (strncmp(msg1, cbuf, len) == 0))) testDiag("wanted:%d '%.*s' got:%d '%.*s'", want, want, msg1, len, len, cbuf); - q1->trySend((void *)msg1, i++); - testOk1(q1->pending() == 3); + q1.trySend((void *)msg1, i++); + testOk1(q1.pending() == 3); i = 3; - while ((len = q1->receive(cbuf, sizeof cbuf, 1.0)) >= 0) { + while ((len = q1.receive(cbuf, sizeof cbuf, 1.0)) >= 0) { --i; - testOk(q1->pending() == i, "q1->pending() == %d", i); + testOk(q1.pending() == i, "loop: q1.pending() == %d", i); want++; if (!testOk1((len == want) & (strncmp(msg1, cbuf, len) == 0))) testDiag("wanted:%d '%.*s' got:%d '%.*s'", want, want, msg1, len, len, cbuf); } - testOk1(q1->pending() == 0); + testOk1(q1.pending() == 0); testDiag("Test sender timeout:"); i = 0; - testOk1(q1->pending() == 0); - while (q1->send((void *)msg1, i, 1.0 ) == 0) { + testOk1(q1.pending() == 0); + while (q1.send((void *)msg1, i, 1.0 ) == 0) { i++; - testOk(q1->pending() == i, "q1->pending() == %d", i); + testOk(q1.pending() == i, "loop: q1.pending() == %d", i); } - testOk1(q1->pending() == 4); + testOk1(q1.pending() == 4); want = 0; - len = q1->receive(cbuf, sizeof cbuf); - testOk1(q1->pending() == 3); + len = q1.receive(cbuf, sizeof cbuf); + testOk1(q1.pending() == 3); if (!testOk1((len == want) && (strncmp(msg1, cbuf, len) == 0))) testDiag("wanted:%d '%.*s' got:%d '%.*s'", want, want, msg1, len, len, cbuf); want++; - len = q1->receive(cbuf, sizeof cbuf); - testOk1(q1->pending() == 2); + len = q1.receive(cbuf, sizeof cbuf); + testOk1(q1.pending() == 2); if (!testOk1((len == want) && (strncmp(msg1, cbuf, len) == 0))) testDiag("wanted:%d '%.*s' got:%d '%.*s'", want, want, msg1, len, len, cbuf); - q1->send((void *)msg1, i++, 1.0); + q1.send((void *)msg1, i++, 1.0); want++; - len = q1->receive(cbuf, sizeof cbuf); - testOk1(q1->pending() == 2); + len = q1.receive(cbuf, sizeof cbuf); + testOk1(q1.pending() == 2); if (!testOk1((len == want) && (strncmp(msg1, cbuf, len) == 0))) testDiag("wanted:%d '%.*s' got:%d '%.*s'", want, want, msg1, len, len, cbuf); - q1->send((void *)msg1, i++, 1.0); - testOk1(q1->pending() == 3); + q1.send((void *)msg1, i++, 1.0); + testOk1(q1.pending() == 3); i = 3; - while ((len = q1->receive(cbuf, sizeof cbuf, 1.0)) >= 0) { + while ((len = q1.receive(cbuf, sizeof cbuf, 1.0)) >= 0) { --i; - testOk(q1->pending() == i, "q1->pending() == %d", i); + testOk(q1.pending() == i, "loop: q1.pending() == %d", i); want++; if (!testOk1((len == want) && (strncmp(msg1, cbuf, len) == 0))) testDiag("wanted:%d '%.*s' got:%d '%.*s'", want, want, msg1, len, len, cbuf); } - testOk1(q1->pending() == 0); + testOk1(q1.pending() == 0); testDiag("Test receiver with timeout:"); for (i = 0 ; i < 4 ; i++) - testOk1 (q1->send((void *)msg1, i, 1.0) == 0); - testOk1(q1->pending() == 4); + testOk1 (q1.send((void *)msg1, i, 1.0) == 0); + testOk1(q1.pending() == 4); for (i = 0 ; i < 4 ; i++) - testOk(q1->receive((void *)cbuf, sizeof cbuf, 1.0) == (int)i, - "q1->receive(...) == %d", i); - testOk1(q1->pending() == 0); - testOk1(q1->receive((void *)cbuf, sizeof cbuf, 1.0) < 0); - testOk1(q1->pending() == 0); + testOk(q1.receive((void *)cbuf, sizeof cbuf, 1.0) == (int)i, + "q1.receive(...) == %d", i); + testOk1(q1.pending() == 0); + testOk1(q1.receive((void *)cbuf, sizeof cbuf, 1.0) < 0); + testOk1(q1.pending() == 0); testDiag("Single receiver with invalid size, single sender tests:"); - epicsThreadCreate("Bad Receiver", epicsThreadPriorityMedium, - mediumStack, badReceiver, q1); + rxThread = epicsThreadCreateOpt("Bad Receiver", badReceiver, &q1, &opts); + if (!rxThread) + testAbort("epicsThreadCreate failed"); epicsThreadSleep(1.0); - testOk(q1->send((void *)msg1, 10) == 0, "Send with waiting receiver"); - epicsThreadSleep(2.0); - testOk(q1->send((void *)msg1, 10) == 0, "Send with no receiver"); + testOk(q1.send((void *)msg1, 10) == 0, "Send with waiting receiver"); epicsThreadSleep(2.0); + testOk(q1.send((void *)msg1, 10) == 0, "Send with no receiver"); + epicsThreadMustJoin(rxThread); testDiag("6 Single receiver single sender 'Sleepy timeout' tests,"); testDiag(" these should take about %.2f seconds each:", SLEEPY_TESTS * 0.010); - complete = epicsEventMustCreate(epicsEventEmpty); sleepySender(0.009); sleepySender(0.010); sleepySender(0.011); @@ -369,11 +378,12 @@ extern "C" void messageQueueTest(void *parm) testDiag("Single receiver, single sender tests:"); epicsThreadSetPriority(myThreadId, epicsThreadPriorityHigh); - epicsThreadCreate("Receiver one", epicsThreadPriorityMedium, - mediumStack, receiver, q1); + rxThread = epicsThreadCreateOpt("Receiver one", receiver, &q1, &opts); + if (!rxThread) + testAbort("epicsThreadCreate failed"); for (pass = 1 ; pass <= 3 ; pass++) { for (i = 0 ; i < 10 ; i++) { - if (q1->trySend((void *)msg1, i) < 0) + if (q1.trySend((void *)msg1, i) < 0) break; if (pass >= 3) epicsThreadSleep(0.5); @@ -402,18 +412,20 @@ extern "C" void messageQueueTest(void *parm) */ testDiag("Single receiver, multiple sender tests:"); testDiag("This test lasts 30 seconds..."); - testOk(!!epicsThreadCreate("Sender 1", epicsThreadPriorityLow, - mediumStack, sender, q1), - "Created Sender 1"); - testOk(!!epicsThreadCreate("Sender 2", epicsThreadPriorityMedium, - mediumStack, sender, q1), - "Created Sender 2"); - testOk(!!epicsThreadCreate("Sender 3", epicsThreadPriorityHigh, - mediumStack, sender, q1), - "Created Sender 3"); - testOk(!!epicsThreadCreate("Sender 4", epicsThreadPriorityHigh, - mediumStack, sender, q1), - "Created Sender 4"); + for (i=0; i