Create os-dependent message queue routines.

This commit is contained in:
W. Eric Norum
2003-02-27 22:59:07 +00:00
parent 22b33711e6
commit 17624a28c3
7 changed files with 519 additions and 257 deletions
+4 -2
View File
@@ -28,10 +28,8 @@ SRC_DIRS += $(LIBCOM)/ring
#following needed for locating epicsRingPointer.h and epicsRingBytes.h
INC += epicsRingPointer.h
INC += epicsRingBytes.h
INC += epicsMessageQueue.h
SRCS += epicsRingPointer.cpp
SRCS += epicsRingBytes.c
SRCS += epicsMessageQueue.cpp
SRC_DIRS += $(LIBCOM)/calc
#following needed for locating postfixPvt.h and sCalcPostfixPvt.h
@@ -151,6 +149,7 @@ INC += osdMutex.h
INC += epicsEvent.h
INC += osdEvent.h
INC += epicsMath.h
INC += osdMessageQueue.h
INC += epicsAssert.h
INC += epicsFindSymbol.h
@@ -166,11 +165,13 @@ INC += osiProcess.h
INC += osiUnistd.h
INC += osiWireFormat.h
INC += epicsReadline.h
INC += epicsMessageQueue.h
SRCS += epicsThread.cpp
SRCS += epicsMutex.cpp
SRCS += epicsEvent.cpp
SRCS += epicsTime.cpp
SRCS += epicsMessageQueue.cpp
SRCS += osdSock.c
SRCS += osiSock.c
@@ -190,6 +191,7 @@ SRCS += osdEvent.c
SRCS += osdTime.cpp
SRCS += osdProcess.c
SRCS += osdNetIntf.c
SRCS += osdMessageQueue.c
SRC_DIRS += $(LIBCOM)/taskwd
INC += taskwd.h
+82
View File
@@ -0,0 +1,82 @@
/*************************************************************************\
* Copyright (c) 2002 The University of Chicago, as Operator of Argonne
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*
* $Id$
*
* Author W. Eric Norum
* norume@aps.anl.gov
* 630 252 4793
*/
#include <new>
#define epicsExportSharedSymbols
#include "epicsMessageQueue.h"
epicsMessageQueue::epicsMessageQueue(unsigned int aCapacity,
unsigned int aMaxMessageSize)
: id ( epicsMessageQueueCreate(aCapacity, aMaxMessageSize) )
{
if (id == NULL)
throw std::bad_alloc ();
}
epicsMessageQueue::~epicsMessageQueue()
{
epicsMessageQueueDestroy(id);
}
int
epicsMessageQueue::trySend(void *message, unsigned int size)
{
return epicsMessageQueueTrySend(id, message, size);
}
int
epicsMessageQueue::send(void *message, unsigned int size)
{
return epicsMessageQueueSend(id, message, size);
}
int
epicsMessageQueue::send(void *message, unsigned int size, double timeout)
{
return epicsMessageQueueSendWithTimeout(id, message, size, timeout);
}
int
epicsMessageQueue::tryReceive(void *message )
{
return epicsMessageQueueTryReceive(id, message);
}
int
epicsMessageQueue::receive(void *message )
{
return epicsMessageQueueReceive(id, message);
}
int
epicsMessageQueue::receive(void *message, double timeout)
{
return epicsMessageQueueReceiveWithTimeout(id, message, timeout);
}
unsigned int
epicsMessageQueue::pending()
{
return epicsMessageQueuePending(id);
}
void
epicsMessageQueue::show(unsigned int level)
{
epicsMessageQueueShow(id, level);
}
@@ -22,23 +22,27 @@
#define epicsMessageQueueh
#include "epicsAssert.h"
#include "epicsEvent.h"
#include "epicsMutex.h"
#include "shareLib.h"
typedef struct epicsMessageQueueOSD *epicsMessageQueueId;
#ifdef __cplusplus
#include "locationException.h"
class epicsShareClass epicsMessageQueue {
public:
epicsMessageQueue ( unsigned int capacity,
unsigned int maximumMessageSize );
~epicsMessageQueue ();
bool send ( void *message, unsigned int messageSize );
bool send ( void *message, unsigned int messageSize, double timeout );
int trySend ( void *message, unsigned int messageSize );
int send ( void *message, unsigned int messageSize);
int send ( void *message, unsigned int messageSize, double timeout );
int tryReceive ( void *message );
int receive ( void *message );
int receive ( void *message, double timeout );
void show ( unsigned int level = 0 ) const;
unsigned int pending () const;
void show ( unsigned int level = 0 );
unsigned int pending ();
private: // Prevent compiler-generated member functions
// default constructor, copy constructor, assignment operator
@@ -46,32 +50,21 @@ private: // Prevent compiler-generated member functions
epicsMessageQueue(const epicsMessageQueue &);
epicsMessageQueue& operator=(const epicsMessageQueue &);
private:
int receive ( void *message, bool withTimeout, double timeout );
volatile char *inPtr;
volatile char *outPtr;
volatile bool full;
unsigned int capacity;
unsigned int maxMessageSize;
unsigned int slotSize;
unsigned long *buf;
char *firstMessageSlot;
char *lastMessageSlot;
epicsEvent queueEvent;
epicsMutex queueMutex;
epicsMessageQueueId id;
};
extern "C" {
#endif /*__cplusplus */
typedef void *epicsMessageQueueId;
epicsShareFunc epicsMessageQueueId epicsShareAPI epicsMessageQueueCreate(
unsigned int capacity,
unsigned int maximumMessageSize);
epicsShareFunc void epicsShareAPI epicsMessageQueueDestroy(
epicsMessageQueueId id);
epicsShareFunc int epicsShareAPI epicsMessageQueueTrySend(
epicsMessageQueueId id,
void *message,
unsigned int messageSize);
epicsShareFunc int epicsShareAPI epicsMessageQueueSend(
epicsMessageQueueId id,
void *message,
@@ -81,23 +74,26 @@ epicsShareFunc int epicsShareAPI epicsMessageQueueSendWithTimeout(
void *message,
unsigned int messageSize,
double timeout);
epicsShareFunc int epicsShareAPI epicsMessageQueueTryReceive(
epicsMessageQueueId id,
void *message);
epicsShareFunc int epicsShareAPI epicsMessageQueueReceive(
epicsMessageQueueId id,
void *message,
unsigned int *messageSize);
void *message);
epicsShareFunc int epicsShareAPI epicsMessageQueueReceiveWithTimeout(
epicsMessageQueueId id,
void *message,
unsigned int *messageSize,
double timeout);
epicsShareFunc int epicsShareAPI epicsMessageQueuePending(
epicsMessageQueueId id);
epicsShareFunc void epicsShareAPI epicsMessageQueueShow(
epicsMessageQueueId id,
unsigned int level);
int level);
#ifdef __cplusplus
}
#endif /*__cplusplus */
#include "osdMessageQueue.h"
#endif /* epicsMessageQueueh */
@@ -0,0 +1,328 @@
/*************************************************************************\
* Copyright (c) 2002 The University of Chicago, as Operator of Argonne
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*
* $Id$
*
* Author W. Eric Norum
* norume@aps.anl.gov
* 630 252 4793
*/
/*
* Machines with 'traditional' memory access semantics could get by without
* the queueMutex for single producer/consumers, but architectures such as
* symmetric-multiprocessing machines with out-of-order writes require
* mutex locks to ensure correct operation. I decided to always use the
* mutex rather having two versions of this code.
*/
#define epicsExportSharedSymbols
#include "epicsMessageQueue.h"
#include <ellLib.h>
#include <epicsAssert.h>
#include <epicsEvent.h>
#include <epicsMutex.h>
#include <stdexcept>
# include <string.h>
/*
* Event cache
*/
struct eventNode {
ELLNODE link;
epicsEventId event;
};
/*
* List of threads waiting to send or receive a message
*/
struct threadNode {
ELLNODE link;
struct eventNode *evp;
void *buf;
unsigned int size;
volatile bool eventSent;
};
/*
* Message info
*/
struct epicsMessageQueueOSD {
ELLLIST sendQueue;
ELLLIST receiveQueue;
ELLLIST eventFreeList;
int numberOfSendersWaiting;
epicsMutexId mutex;
unsigned long capacity;
unsigned long maxMessageSize;
unsigned long *buf;
char *firstMessageSlot;
char *lastMessageSlot;
volatile char *inPtr;
volatile char *outPtr;
unsigned long slotSize;
bool full;
};
epicsShareFunc epicsMessageQueueId epicsShareAPI epicsMessageQueueCreate(
unsigned int capacity,
unsigned int maxMessageSize)
{
epicsMessageQueueId pmsg;
unsigned int slotBytes, slotLongs;
assert(capacity != 0);
assert(maxMessageSize != 0);
pmsg = (epicsMessageQueueId)callocMustSucceed(1, sizeof(*pmsg), "epicsMessageQueueCreate");
pmsg->capacity = capacity;
pmsg->maxMessageSize = maxMessageSize;
slotLongs = 1 + ((maxMessageSize + sizeof(unsigned long) - 1) / sizeof(unsigned long));
slotBytes = slotLongs * sizeof(unsigned long);
pmsg->buf = (unsigned long *)callocMustSucceed(pmsg->capacity, slotBytes, "epicsMessageQueueCreate");
pmsg->inPtr = pmsg->outPtr = pmsg->firstMessageSlot = (char *)&pmsg->buf[0];
pmsg->lastMessageSlot = (char *)&pmsg->buf[(capacity - 1) * slotLongs];
pmsg->slotSize = slotBytes;
pmsg->full = false;
pmsg->mutex = epicsMutexMustCreate();
ellInit(&pmsg->sendQueue);
ellInit(&pmsg->receiveQueue);
ellInit(&pmsg->eventFreeList);
return pmsg;
}
epicsShareFunc void epicsShareAPI
epicsMessageQueueDestroy(epicsMessageQueueId pmsg)
{
struct eventNode *evp;
while ((evp = (struct eventNode *)ellGet(&pmsg->eventFreeList)) != NULL) {
epicsEventDestroy(evp->event);
free(evp);
}
epicsMutexDestroy(pmsg->mutex);
free(pmsg->buf);
free(pmsg);
}
static struct eventNode *
getEventNode(epicsMessageQueueId pmsg)
{
struct eventNode *evp;
evp = (struct eventNode *)ellGet(&pmsg->eventFreeList);
if (evp == NULL) {
evp = (struct eventNode *)callocMustSucceed(1, sizeof(*evp), "epicsMessageQueueGetEventNode");
evp->event = epicsEventMustCreate(epicsEventEmpty);
}
return evp;
}
static int
mySend(epicsMessageQueueId pmsg, void *message, unsigned int size, bool wait, bool haveTimeout, double timeout)
{
char *myInPtr, *myOutPtr, *nextPtr;
struct threadNode *pthr;
if(size > pmsg->maxMessageSize)
return -1;
epicsMutexLock(pmsg->mutex);
/*
* See if message can be sent
*/
if ((pmsg->numberOfSendersWaiting > 0) || (pmsg->full)) {
/*
* Return error if not waiting
*/
if (!wait) {
epicsMutexUnlock(pmsg->mutex);
return -1;
}
/*
* Wait
*/
struct threadNode threadNode;
threadNode.evp = getEventNode(pmsg);
threadNode.eventSent = false;
ellAdd(&pmsg->sendQueue, &threadNode.link);
pmsg->numberOfSendersWaiting++;
epicsMutexUnlock(pmsg->mutex);
if(haveTimeout)
epicsEventWaitWithTimeout(threadNode.evp->event, timeout);
else
epicsEventWait(threadNode.evp->event);
epicsMutexLock(pmsg->mutex);
if(!threadNode.eventSent)
ellDelete(&pmsg->sendQueue, &threadNode.link);
pmsg->numberOfSendersWaiting--;
ellAdd(&pmsg->eventFreeList, &threadNode.evp->link);
if (pmsg->full) {
epicsMutexUnlock(pmsg->mutex);
return -1;
}
}
/*
* Copy message to waiting receiver
*/
if ((pthr = (struct threadNode *)ellGet(&pmsg->receiveQueue)) != NULL) {
memcpy(pthr->buf, message, size);
pthr->size = size;
pthr->eventSent = true;
epicsEventSignal(pthr->evp->event);
epicsMutexUnlock(pmsg->mutex);
return 0;
}
/*
* Copy to queue
*/
myInPtr = (char *)pmsg->inPtr;
if (myInPtr == pmsg->lastMessageSlot)
nextPtr = pmsg->firstMessageSlot;
else
nextPtr = myInPtr + pmsg->slotSize;
if (nextPtr == (char *)pmsg->outPtr)
pmsg->full = true;
*(volatile unsigned long *)myInPtr = size;
memcpy((unsigned long *)myInPtr + 1, message, size);
pmsg->inPtr = nextPtr;
epicsMutexUnlock(pmsg->mutex);
return 0;
}
epicsShareFunc int epicsShareAPI
epicsMessageQueueTrySend(epicsMessageQueueId pmsg, void *message, unsigned int size)
{
return mySend(pmsg, message, size, false, false, 0.0);
}
epicsShareFunc int epicsShareAPI
epicsMessageQueueSend(epicsMessageQueueId pmsg, void *message, unsigned int size)
{
return mySend(pmsg, message, size, true, false, 0.0);
}
epicsShareFunc int epicsShareAPI
epicsMessageQueueSendWithTimeout(epicsMessageQueueId pmsg, void *message, unsigned int size, double timeout)
{
return mySend(pmsg, message, size, true, true, timeout);
}
static int
myReceive(epicsMessageQueueId pmsg, void *message, bool wait, bool haveTimeout, double timeout)
{
char *myOutPtr;
unsigned long l;
struct threadNode *pthr;
epicsMutexLock(pmsg->mutex);
/*
* If there's a message on the queue, copy it
*/
myOutPtr = (char *)pmsg->outPtr;
if ((myOutPtr != pmsg->inPtr) || pmsg->full) {
l = *(unsigned long *)myOutPtr;
memcpy(message, (unsigned long *)myOutPtr + 1, l);
if (myOutPtr == pmsg->lastMessageSlot)
pmsg->outPtr = pmsg->firstMessageSlot;
else
pmsg->outPtr += pmsg->slotSize;
pmsg->full = false;
/*
* Wake up the oldest task waiting to send
*/
if ((pthr = (struct threadNode *)ellGet(&pmsg->sendQueue)) != NULL) {
pthr->eventSent = true;
epicsEventSignal(pthr->evp->event);
}
epicsMutexUnlock(pmsg->mutex);
return l;
}
/*
* Return if not waiting
*/
if (!wait) {
epicsMutexUnlock(pmsg->mutex);
return -1;
}
/*
* Wait for message to arrive
*/
struct threadNode threadNode;
threadNode.evp = getEventNode(pmsg);
threadNode.buf = message;
threadNode.eventSent = false;
ellAdd(&pmsg->receiveQueue, &threadNode.link);
epicsMutexUnlock(pmsg->mutex);
if(haveTimeout)
epicsEventWaitWithTimeout(threadNode.evp->event, timeout);
else
epicsEventWait(threadNode.evp->event);
epicsMutexLock(pmsg->mutex);
if(!threadNode.eventSent)
ellDelete(&pmsg->receiveQueue, &threadNode.link);
ellAdd(&pmsg->eventFreeList, &threadNode.evp->link);
epicsMutexUnlock(pmsg->mutex);
if(threadNode.eventSent)
return threadNode.size;
return -1;
}
epicsShareFunc int epicsShareAPI
epicsMessageQueueTryReceive(epicsMessageQueueId pmsg, void *message)
{
return myReceive(pmsg, message, false, false, 0.0);
}
epicsShareFunc int epicsShareAPI
epicsMessageQueueReceive(epicsMessageQueueId pmsg, void *message)
{
return myReceive(pmsg, message, true, false, 0.0);
}
epicsShareFunc int epicsShareAPI
epicsMessageQueueReceiveWithTimeout(epicsMessageQueueId pmsg, void *message, double timeout)
{
return myReceive(pmsg, message, true, true, timeout);
}
epicsShareFunc int epicsShareAPI
epicsMessageQueuePending(epicsMessageQueueId pmsg)
{
char *myInPtr, *myOutPtr;
int nmsg;
epicsMutexLock(pmsg->mutex);
myInPtr = (char *)pmsg->inPtr;
myOutPtr = (char *)pmsg->outPtr;
if (pmsg->full)
nmsg = pmsg->capacity;
else if (myInPtr >= myOutPtr)
nmsg = (myInPtr - myOutPtr) / pmsg->slotSize;
else
nmsg = pmsg->capacity - (myOutPtr - myInPtr) / pmsg->slotSize;
epicsMutexUnlock(pmsg->mutex);
return nmsg;
}
epicsShareFunc void epicsShareAPI
epicsMessageQueueShow(epicsMessageQueueId pmsg, int level)
{
printf("Message Queue Used:%d Slots:%lu", epicsMessageQueuePending(pmsg), pmsg->capacity);
if (level >= 1)
printf(" Maximum size:%lu", pmsg->maxMessageSize);
printf("\n");
}
-218
View File
@@ -1,218 +0,0 @@
/*************************************************************************\
* Copyright (c) 2002 The University of Chicago, as Operator of Argonne
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*
* $Id$
*
* Author W. Eric Norum
* norume@aps.anl.gov
* 630 252 4793
*/
/*
* Interthread message passing
*
* Machines with 'traditional' memory access semantics could get by without
* the queueMutex for single producer/consumers, but architectures, such as
* symmetric-multiprocessing machines with out-of-order writes, require
* mutex locks to ensure correct operation. I decided to always use the
* mutex rather having two versions of this code.
*/
#include <stdexcept>
#include <string.h>
#define epicsExportSharedSymbols
#include <epicsAssert.h>
#include "epicsMessageQueue.h"
epicsMessageQueue::epicsMessageQueue(unsigned int aCapacity,
unsigned int aMaxMessageSize)
{
assert(aCapacity != 0);
assert(aMaxMessageSize != 0);
capacity = aCapacity;
maxMessageSize = aMaxMessageSize;
slotSize = (sizeof(unsigned long)
+ maxMessageSize + sizeof(unsigned long) - 1)
/ sizeof(unsigned long);
buf = new unsigned long[capacity * slotSize];
inPtr = outPtr = firstMessageSlot = (char *)&buf[0];
lastMessageSlot = (char *)&buf[(capacity - 1) * slotSize];
slotSize *= sizeof(unsigned long);
full = false;
}
epicsMessageQueue::~epicsMessageQueue()
{
delete buf;
}
bool
epicsMessageQueue::send(void *message, unsigned int size)
{
char *myInPtr, *myOutPtr, *nextPtr;
assert(size <= maxMessageSize);
queueMutex.lock();
if (full) {
queueMutex.unlock();
return false;
}
myInPtr = (char *)inPtr;
myOutPtr = (char *)outPtr;
if (myInPtr == lastMessageSlot)
nextPtr = firstMessageSlot;
else
nextPtr = myInPtr + slotSize;
if (nextPtr == myOutPtr)
full = true;
*(volatile unsigned long *)myInPtr = size;
memcpy((unsigned long *)myInPtr + 1, message, size);
this->inPtr = nextPtr;
queueMutex.unlock();
queueEvent.signal();
return true;
}
int
epicsMessageQueue::receive(void *message, bool withTimeout, double timeout)
{
char *myOutPtr;
unsigned long l;
queueMutex.lock();
myOutPtr = (char *)outPtr;
while ((myOutPtr == inPtr) && !full) {
queueMutex.unlock();
if (withTimeout) {
if (queueEvent.wait(timeout) == false)
return -1;
}
else {
queueEvent.wait();
}
queueMutex.lock();
myOutPtr = (char *)outPtr;
}
l = *(unsigned long *)myOutPtr;
memcpy(message, (unsigned long *)myOutPtr + 1, l);
if (myOutPtr == lastMessageSlot)
myOutPtr = firstMessageSlot;
else
myOutPtr = myOutPtr + slotSize;
this->outPtr = myOutPtr;
full = false;
queueMutex.unlock();
return l;
}
int
epicsMessageQueue::receive(void *message )
{
return this->receive(message, false, 0.0);
}
int
epicsMessageQueue::receive(void *message, double timeout)
{
return this->receive(message, true, timeout);
}
unsigned int
epicsMessageQueue::pending() const
{
char *myInPtr, *myOutPtr;
int nmsg;
myInPtr = (char *)inPtr;
myOutPtr = (char *)outPtr;
if (myInPtr >= myOutPtr)
nmsg = (myInPtr - myOutPtr) / slotSize;
else
nmsg = capacity - (myOutPtr - myInPtr) / slotSize;
if (full)
nmsg = capacity;
return nmsg;
}
void
epicsMessageQueue::show(unsigned int level) const
{
char *myInPtr, *myOutPtr;
int nmsg;
myInPtr = (char *)inPtr;
myOutPtr = (char *)outPtr;
if (myInPtr >= myOutPtr)
nmsg = (myInPtr - myOutPtr) / slotSize;
else
nmsg = capacity - (myOutPtr - myInPtr) / slotSize;
if (full)
nmsg = capacity;
printf("Message Queue Used:%d Slots:%d", nmsg, capacity);
if (level >= 1)
printf(" Maximum size:%u", maxMessageSize);
printf("\n");
}
epicsShareFunc epicsMessageQueueId epicsShareAPI epicsMessageQueueCreate(
unsigned int capacity,
unsigned int maxMessageSize)
{
epicsMessageQueue *qid;
try {
qid = new epicsMessageQueue(capacity, maxMessageSize);
}
catch (...) {
return NULL;
}
return qid;
}
epicsShareFunc void epicsShareAPI epicsMessageQueueDestroy(
epicsMessageQueueId id)
{
delete (epicsMessageQueue *)id;
}
epicsShareFunc int epicsShareAPI epicsMessageQueueSend(
epicsMessageQueueId id,
void *message,
unsigned int messageSize)
{
return ((epicsMessageQueue *)id)->send(message, messageSize);
}
epicsShareFunc int epicsShareAPI epicsMessageQueueReceive(
epicsMessageQueueId id,
void *message)
{
return ((epicsMessageQueue *)id)->receive(message);
}
epicsShareFunc int epicsShareAPI epicsMessageQueueReceiveWithTimeout(
epicsMessageQueueId id,
void *message,
double timeout)
{
return ((epicsMessageQueue *)id)->receive(message, timeout);
}
epicsShareFunc int epicsShareAPI epicsMessageQueuePending(
epicsMessageQueueId id)
{
return ((epicsMessageQueue *)id)->pending();
}
epicsShareFunc void epicsShareAPI epicsMessageQueueShow(
epicsMessageQueueId id,
unsigned int level)
{
((epicsMessageQueue *)id)->show(level);
}
+83 -11
View File
@@ -23,6 +23,23 @@
const char *msg1 = "1234567890This is a very long message.";
/*
* In Numerical Recipes in C: The Art of Scientific Computing (William H.
* Press, Brian P. Flannery, Saul A. Teukolsky, William T. Vetterling; New
* York: Cambridge University Press, 1992 (2nd ed., p. 277)), the follow-
* ing comments are made:
* "If you want to generate a random integer between 1 and 10, you
* should always do it by using high-order bits, as in
* j=1+(int) (10.0*rand()/(RAND_MAX+1.0));
* and never by anything resembling
* j=1+(rand() % 10);
*/
static int
randBelow(int n)
{
return (int)((double)n*rand()/(RAND_MAX+1.0));
}
static void
receiver(void *arg)
{
@@ -43,7 +60,7 @@ receiver(void *arg)
if (expectmsg[sender-1] != msgNum)
printf("%s received %d '%.*s' -- expected %d\n", epicsThreadGetNameSelf(), len, len, cbuf, expectmsg[sender-1]);
expectmsg[sender-1] = msgNum + 1;
epicsThreadSleep(0.001 * (rand() % 20));
epicsThreadSleep(0.001 * (randBelow(20)));
}
else {
printf("%s received %d '%.*s'\n", epicsThreadGetNameSelf(), len, len, cbuf);
@@ -65,9 +82,9 @@ sender(void *arg)
for (;;) {
len = sprintf(cbuf, "%s -- %d.", epicsThreadGetNameSelf(), ++i);
while (q->send((void *)cbuf, len) == false)
epicsThreadSleep(0.005 * (rand() % 5));
epicsThreadSleep(0.005 * (rand() % 20));
while (q->trySend((void *)cbuf, len) < 0)
epicsThreadSleep(0.005 * (randBelow(5)));
epicsThreadSleep(0.005 * (randBelow(20)));
}
}
@@ -89,7 +106,7 @@ epicsMessageQueueTest()
i = 0;
used = 0;
assert(q1->pending() == 0);
while (q1->send((void *)msg1, i ) == true) {
while (q1->trySend((void *)msg1, i ) == 0) {
i++;
assert(q1->pending() == i);
printf("Should have %d used -- ", ++used);
@@ -108,14 +125,14 @@ epicsMessageQueueTest()
assert(q1->pending() == 2);
if ((len != want) || (strncmp(msg1, cbuf, len) != 0))
printf("wanted:%d '%.*s' got:%d '%.*s'\n", want, want, msg1, len, len, cbuf);
q1->send((void *)msg1, i++);
q1->trySend((void *)msg1, i++);
want++;
len = q1->receive(cbuf);
assert(q1->pending() == 2);
if ((len != want) || (strncmp(msg1, cbuf, len) != 0))
printf("wanted:%d '%.*s' got:%d '%.*s'\n", want, want, msg1, len, len, cbuf);
q1->send((void *)msg1, i++);
q1->trySend((void *)msg1, i++);
assert(q1->pending() == 3);
i = 3;
@@ -125,7 +142,62 @@ epicsMessageQueueTest()
if ((len != want) || (strncmp(msg1, cbuf, len) != 0))
printf("wanted:%d '%.*s' got:%d '%.*s'\n", want, want, msg1, len, len, cbuf);
}
printf("len:%d i:%d pending:%d\n", len, i, q1->pending());
assert(q1->pending() == 0);
/*
* Sender timeout
*/
i = 0;
used = 0;
assert(q1->pending() == 0);
while (q1->send((void *)msg1, i, 1.0 ) == 0) {
i++;
assert(q1->pending() == i);
printf("Should have %d used -- ", ++used);
q1->show();
}
assert(q1->pending() == 4);
want = 0;
len = q1->receive(cbuf);
assert(q1->pending() == 3);
if ((len != want) || (strncmp(msg1, cbuf, len) != 0))
printf("wanted:%d '%.*s' got:%d '%.*s'\n", want, want, msg1, len, len, cbuf);
want++;
len = q1->receive(cbuf);
assert(q1->pending() == 2);
if ((len != want) || (strncmp(msg1, cbuf, len) != 0))
printf("wanted:%d '%.*s' got:%d '%.*s'\n", want, want, msg1, len, len, cbuf);
q1->send((void *)msg1, i++, 1.0);
want++;
len = q1->receive(cbuf);
assert(q1->pending() == 2);
if ((len != want) || (strncmp(msg1, cbuf, len) != 0))
printf("wanted:%d '%.*s' got:%d '%.*s'\n", want, want, msg1, len, len, cbuf);
q1->send((void *)msg1, i++, 1.0);
assert(q1->pending() == 3);
i = 3;
while ((len = q1->receive(cbuf, 1.0)) >= 0) {
assert(q1->pending() == --i);
want++;
if ((len != want) || (strncmp(msg1, cbuf, len) != 0))
printf("wanted:%d '%.*s' got:%d '%.*s'\n", want, want, msg1, len, len, cbuf);
}
assert(q1->pending() == 0);
/*
* Receiver with timeout
*/
for (i = 0 ; i < 4 ; i++)
assert (q1->send((void *)msg1, i, 1.0) == 0);
assert(q1->pending() == 4);
for (i = 0 ; i < 4 ; i++)
assert (q1->receive((void *)cbuf, 1.0) == (int)i);
assert(q1->pending() == 0);
assert (q1->receive((void *)cbuf, 1.0) < 0);
assert(q1->pending() == 0);
/*
@@ -142,7 +214,7 @@ epicsMessageQueueTest()
case 3: printf ("Should send/receive 10 messages (sender pauses after sending).\n"); break;
}
for (i = 0 ; i < 10 ; i++) {
if (q1->send((void *)msg1, i) == false)
if (q1->trySend((void *)msg1, i) < 0)
break;
if (pass >= 3)
epicsThreadSleep(0.5);
@@ -154,7 +226,7 @@ epicsMessageQueueTest()
/*
* Single receiver, multiple sender tests
*/
printf("This test takes another 5 minutes to finish.\n");
printf("This test takes 5 minutes to run.\n");
printf("Test has succeeded if nothing appears between here....\n");
epicsThreadCreate("Sender 1", epicsThreadPriorityLow, epicsThreadStackMedium, sender, q1);
epicsThreadCreate("Sender 2", epicsThreadPriorityMedium, epicsThreadStackMedium, sender, q1);
@@ -167,6 +239,6 @@ epicsMessageQueueTest()
* Force out summaries
*/
printf("......and here.\n");
q1->send((void *)msg1, 0);
q1->trySend((void *)msg1, 0);
epicsThreadSleep(1.0);
}