Simplify epicsMessageQueueTest using joinable threads

Fixes issues with thread shutdown
This commit is contained in:
Andrew Johnson
2020-07-02 16:17:04 -05:00
parent 1eeac6da2f
commit 5c03f8ba79

View File

@@ -25,12 +25,9 @@
static const char *msg1 = "1234567890This is a very long message.";
static volatile int sendExit = 0;
static volatile int recvExit = 0;
static epicsEventId finished;
static unsigned int mediumStack;
#define SLEEPY_TESTS 500
static int numSent, numReceived;
static epicsEventId complete;
/*
* In Numerical Recipes in C: The Art of Scientific Computing (William H.
@@ -116,7 +113,6 @@ receiver(void *arg)
if (!testOk1(errors == 0))
testDiag("Error count was %d", errors);
testDiag("%s exiting", myName);
epicsEventSignal(finished);
}
extern "C" void
@@ -133,15 +129,18 @@ fastReceiver(void *arg)
}
}
recvExit = 0;
epicsEventSignal(complete);
}
void sleepySender(double delay)
{
epicsThreadOpts opts = {epicsThreadPriorityMedium, epicsThreadStackMedium, 1};
epicsThreadId rxThread;
testDiag("sleepySender: sending every %.3f seconds", delay);
epicsMessageQueue q(4, 20);
epicsThreadCreate("Fast Receiver", epicsThreadPriorityMedium,
mediumStack, fastReceiver, &q);
rxThread = epicsThreadCreateOpt("Fast Receiver", fastReceiver, &q, &opts);
if (!rxThread)
testAbort("Task create failed");
numSent = 0;
for (int i = 0 ; i < SLEEPY_TESTS ; i++) {
@@ -159,7 +158,7 @@ void sleepySender(double delay)
recvExit = 1;
while (q.send((void *)msg1, 4) != 0)
epicsThreadSleep(0.01);
epicsEventMustWait(complete);
epicsThreadMustJoin(rxThread);
}
extern "C" void
@@ -179,11 +178,14 @@ fastSender(void *arg)
}
}
sendExit = 0;
epicsEventSignal(complete);
}
void sleepyReceiver(double delay)
{
epicsThreadOpts opts = {epicsThreadPriorityMedium,
epicsThreadStackMedium, 1};
epicsThreadId txThread;
testDiag("sleepyReceiver: acquiring every %.3f seconds", delay);
epicsMessageQueue q(4, 20);
@@ -192,8 +194,9 @@ void sleepyReceiver(double delay)
q.send((void *)msg1, 4);
}
epicsThreadCreate("Fast Sender", epicsThreadPriorityMedium,
mediumStack, fastSender, &q);
txThread = epicsThreadCreateOpt("Fast Sender", fastSender, &q, &opts);
if (!txThread)
testAbort("Task create failed");
epicsThreadSleep(0.5);
char cbuf[80];
@@ -216,7 +219,7 @@ void sleepyReceiver(double delay)
sendExit = 1;
while (q.receive(cbuf, sizeof cbuf) <= 0)
epicsThreadSleep(0.01);
epicsEventMustWait(complete);
epicsThreadMustJoin(txThread);
}
extern "C" void
@@ -236,130 +239,136 @@ sender(void *arg)
testDiag("%s exiting, sent %d messages", epicsThreadGetNameSelf(), i);
}
#define NUM_SENDERS 4
extern "C" void messageQueueTest(void *parm)
{
epicsThreadId myThreadId = epicsThreadGetIdSelf();
epicsThreadId rxThread;
epicsThreadId senderId[NUM_SENDERS];
epicsThreadOpts opts = {epicsThreadPriorityMedium,
epicsThreadStackMedium, 1};
unsigned int i;
char cbuf[80];
int len;
int pass;
int want;
epicsMessageQueue *q1 = new epicsMessageQueue(4, 20);
epicsMessageQueue q1(4, 20);
testDiag("Simple single-thread tests:");
i = 0;
testOk1(q1->pending() == 0);
while (q1->trySend((void *)msg1, i ) == 0) {
i++;
testOk(q1->pending() == i, "q1->pending() == %d", i);
testOk1(q1.pending() == 0);
for (i = 0; i < 4;) {
int ret = q1.trySend((void *)msg1, i++);
testOk(ret == 0, "trySend succeeded (%d == 0)", ret);
testOk(q1.pending() == i, "loop: q1.pending() == %d", i);
}
testOk1(q1->pending() == 4);
testOk1(q1.pending() == 4);
want = 0;
len = q1->receive(cbuf, sizeof cbuf);
testOk1(q1->pending() == 3);
len = q1.receive(cbuf, sizeof cbuf);
testOk1(q1.pending() == 3);
if (!testOk1((len == want) && (strncmp(msg1, cbuf, len) == 0)))
testDiag("wanted:%d '%.*s' got:%d '%.*s'",
want, want, msg1, len, len, cbuf);
want++;
len = q1->receive(cbuf, sizeof cbuf);
testOk1(q1->pending() == 2);
len = q1.receive(cbuf, sizeof cbuf);
testOk1(q1.pending() == 2);
if (!testOk1((len == want) && (strncmp(msg1, cbuf, len) == 0)))
testDiag("wanted:%d '%.*s' got:%d '%.*s'",
want, want, msg1, len, len, cbuf);
q1->trySend((void *)msg1, i++);
q1.trySend((void *)msg1, i++);
want++;
len = q1->receive(cbuf, sizeof cbuf);
testOk1(q1->pending() == 2);
len = q1.receive(cbuf, sizeof cbuf);
testOk1(q1.pending() == 2);
if (!testOk1((len == want) && (strncmp(msg1, cbuf, len) == 0)))
testDiag("wanted:%d '%.*s' got:%d '%.*s'",
want, want, msg1, len, len, cbuf);
q1->trySend((void *)msg1, i++);
testOk1(q1->pending() == 3);
q1.trySend((void *)msg1, i++);
testOk1(q1.pending() == 3);
i = 3;
while ((len = q1->receive(cbuf, sizeof cbuf, 1.0)) >= 0) {
while ((len = q1.receive(cbuf, sizeof cbuf, 1.0)) >= 0) {
--i;
testOk(q1->pending() == i, "q1->pending() == %d", i);
testOk(q1.pending() == i, "loop: q1.pending() == %d", i);
want++;
if (!testOk1((len == want) & (strncmp(msg1, cbuf, len) == 0)))
testDiag("wanted:%d '%.*s' got:%d '%.*s'",
want, want, msg1, len, len, cbuf);
}
testOk1(q1->pending() == 0);
testOk1(q1.pending() == 0);
testDiag("Test sender timeout:");
i = 0;
testOk1(q1->pending() == 0);
while (q1->send((void *)msg1, i, 1.0 ) == 0) {
testOk1(q1.pending() == 0);
while (q1.send((void *)msg1, i, 1.0 ) == 0) {
i++;
testOk(q1->pending() == i, "q1->pending() == %d", i);
testOk(q1.pending() == i, "loop: q1.pending() == %d", i);
}
testOk1(q1->pending() == 4);
testOk1(q1.pending() == 4);
want = 0;
len = q1->receive(cbuf, sizeof cbuf);
testOk1(q1->pending() == 3);
len = q1.receive(cbuf, sizeof cbuf);
testOk1(q1.pending() == 3);
if (!testOk1((len == want) && (strncmp(msg1, cbuf, len) == 0)))
testDiag("wanted:%d '%.*s' got:%d '%.*s'",
want, want, msg1, len, len, cbuf);
want++;
len = q1->receive(cbuf, sizeof cbuf);
testOk1(q1->pending() == 2);
len = q1.receive(cbuf, sizeof cbuf);
testOk1(q1.pending() == 2);
if (!testOk1((len == want) && (strncmp(msg1, cbuf, len) == 0)))
testDiag("wanted:%d '%.*s' got:%d '%.*s'",
want, want, msg1, len, len, cbuf);
q1->send((void *)msg1, i++, 1.0);
q1.send((void *)msg1, i++, 1.0);
want++;
len = q1->receive(cbuf, sizeof cbuf);
testOk1(q1->pending() == 2);
len = q1.receive(cbuf, sizeof cbuf);
testOk1(q1.pending() == 2);
if (!testOk1((len == want) && (strncmp(msg1, cbuf, len) == 0)))
testDiag("wanted:%d '%.*s' got:%d '%.*s'",
want, want, msg1, len, len, cbuf);
q1->send((void *)msg1, i++, 1.0);
testOk1(q1->pending() == 3);
q1.send((void *)msg1, i++, 1.0);
testOk1(q1.pending() == 3);
i = 3;
while ((len = q1->receive(cbuf, sizeof cbuf, 1.0)) >= 0) {
while ((len = q1.receive(cbuf, sizeof cbuf, 1.0)) >= 0) {
--i;
testOk(q1->pending() == i, "q1->pending() == %d", i);
testOk(q1.pending() == i, "loop: q1.pending() == %d", i);
want++;
if (!testOk1((len == want) && (strncmp(msg1, cbuf, len) == 0)))
testDiag("wanted:%d '%.*s' got:%d '%.*s'",
want, want, msg1, len, len, cbuf);
}
testOk1(q1->pending() == 0);
testOk1(q1.pending() == 0);
testDiag("Test receiver with timeout:");
for (i = 0 ; i < 4 ; i++)
testOk1 (q1->send((void *)msg1, i, 1.0) == 0);
testOk1(q1->pending() == 4);
testOk1 (q1.send((void *)msg1, i, 1.0) == 0);
testOk1(q1.pending() == 4);
for (i = 0 ; i < 4 ; i++)
testOk(q1->receive((void *)cbuf, sizeof cbuf, 1.0) == (int)i,
"q1->receive(...) == %d", i);
testOk1(q1->pending() == 0);
testOk1(q1->receive((void *)cbuf, sizeof cbuf, 1.0) < 0);
testOk1(q1->pending() == 0);
testOk(q1.receive((void *)cbuf, sizeof cbuf, 1.0) == (int)i,
"q1.receive(...) == %d", i);
testOk1(q1.pending() == 0);
testOk1(q1.receive((void *)cbuf, sizeof cbuf, 1.0) < 0);
testOk1(q1.pending() == 0);
testDiag("Single receiver with invalid size, single sender tests:");
epicsThreadCreate("Bad Receiver", epicsThreadPriorityMedium,
mediumStack, badReceiver, q1);
rxThread = epicsThreadCreateOpt("Bad Receiver", badReceiver, &q1, &opts);
if (!rxThread)
testAbort("epicsThreadCreate failed");
epicsThreadSleep(1.0);
testOk(q1->send((void *)msg1, 10) == 0, "Send with waiting receiver");
epicsThreadSleep(2.0);
testOk(q1->send((void *)msg1, 10) == 0, "Send with no receiver");
testOk(q1.send((void *)msg1, 10) == 0, "Send with waiting receiver");
epicsThreadSleep(2.0);
testOk(q1.send((void *)msg1, 10) == 0, "Send with no receiver");
epicsThreadMustJoin(rxThread);
testDiag("6 Single receiver single sender 'Sleepy timeout' tests,");
testDiag(" these should take about %.2f seconds each:",
SLEEPY_TESTS * 0.010);
complete = epicsEventMustCreate(epicsEventEmpty);
sleepySender(0.009);
sleepySender(0.010);
sleepySender(0.011);
@@ -369,11 +378,12 @@ extern "C" void messageQueueTest(void *parm)
testDiag("Single receiver, single sender tests:");
epicsThreadSetPriority(myThreadId, epicsThreadPriorityHigh);
epicsThreadCreate("Receiver one", epicsThreadPriorityMedium,
mediumStack, receiver, q1);
rxThread = epicsThreadCreateOpt("Receiver one", receiver, &q1, &opts);
if (!rxThread)
testAbort("epicsThreadCreate failed");
for (pass = 1 ; pass <= 3 ; pass++) {
for (i = 0 ; i < 10 ; i++) {
if (q1->trySend((void *)msg1, i) < 0)
if (q1.trySend((void *)msg1, i) < 0)
break;
if (pass >= 3)
epicsThreadSleep(0.5);
@@ -402,18 +412,20 @@ extern "C" void messageQueueTest(void *parm)
*/
testDiag("Single receiver, multiple sender tests:");
testDiag("This test lasts 30 seconds...");
testOk(!!epicsThreadCreate("Sender 1", epicsThreadPriorityLow,
mediumStack, sender, q1),
"Created Sender 1");
testOk(!!epicsThreadCreate("Sender 2", epicsThreadPriorityMedium,
mediumStack, sender, q1),
"Created Sender 2");
testOk(!!epicsThreadCreate("Sender 3", epicsThreadPriorityHigh,
mediumStack, sender, q1),
"Created Sender 3");
testOk(!!epicsThreadCreate("Sender 4", epicsThreadPriorityHigh,
mediumStack, sender, q1),
"Created Sender 4");
for (i=0; i<NUM_SENDERS; i++) {
char name[16];
const int pri[NUM_SENDERS] = {
epicsThreadPriorityLow,
epicsThreadPriorityMedium,
epicsThreadPriorityHigh,
epicsThreadPriorityHigh
};
sprintf(name, "Sender %d", i+1);
opts.priority = pri[i];
senderId[i] = epicsThreadCreateOpt(name, sender, &q1, &opts);
if (!senderId[i])
testAbort("epicsThreadCreate failed");
}
for (i = 0; i < 6; i++) {
testDiag("... %2d", 6 - i);
@@ -422,23 +434,30 @@ extern "C" void messageQueueTest(void *parm)
sendExit = 1;
epicsThreadSleep(1.0);
for (i=0; i<NUM_SENDERS; i++) {
epicsThreadMustJoin(senderId[i]);
}
recvExit = 1;
testDiag("Scheduler exiting");
epicsThreadMustJoin(rxThread);
}
MAIN(epicsMessageQueueTest)
{
testPlan(74);
epicsThreadOpts opts = {
epicsThreadPriorityMedium,
epicsThreadStackMedium,
1
};
epicsThreadId testThread;
finished = epicsEventMustCreate(epicsEventEmpty);
mediumStack = epicsThreadGetStackSize(epicsThreadStackMedium);
testPlan(70 + NUM_SENDERS);
epicsThreadCreate("messageQueueTest", epicsThreadPriorityMedium,
mediumStack, messageQueueTest, NULL);
testThread = epicsThreadCreateOpt("messageQueueTest",
messageQueueTest, NULL, &opts);
if (!testThread)
testAbort("epicsThreadCreate failed");
epicsEventMustWait(finished);
testDiag("Main thread signalled");
epicsThreadSleep(1.0);
epicsThreadMustJoin(testThread);
return testDone();
}