diff --git a/src/libCom/Makefile b/src/libCom/Makefile index c34f4971f..a4c7ecd86 100644 --- a/src/libCom/Makefile +++ b/src/libCom/Makefile @@ -28,10 +28,10 @@ SRC_DIRS += $(LIBCOM)/ring #following needed for locating epicsRingPointer.h and epicsRingBytes.h INC += epicsRingPointer.h INC += epicsRingBytes.h -INC += epicsReadline.h +INC += epicsMessageQueue.h SRCS += epicsRingPointer.cpp SRCS += epicsRingBytes.c -SRCS += epicsReadline.c +SRCS += epicsMessageQueue.cpp SRC_DIRS += $(LIBCOM)/calc #following needed for locating postfixPvt.h and sCalcPostfixPvt.h @@ -165,6 +165,7 @@ INC += epicsSignal.h INC += osiProcess.h INC += osiUnistd.h INC += osiWireFormat.h +INC += epicsReadline.h SRCS += epicsThread.cpp SRCS += epicsMutex.cpp @@ -179,6 +180,8 @@ SRCS += osdInterrupt.c SRCS += osdPoolStatus.c SRCS += osdSignal.cpp SRCS += osdEnv.c +SRCS += epicsReadline.c + osdEnv_CFLAGS_WIN32= -U__STDC__ SRCS += osdThread.c diff --git a/src/libCom/ring/epicsMessageQueue.cpp b/src/libCom/ring/epicsMessageQueue.cpp new file mode 100644 index 000000000..1c2215a71 --- /dev/null +++ b/src/libCom/ring/epicsMessageQueue.cpp @@ -0,0 +1,229 @@ +/*************************************************************************\ +* Copyright (c) 2002 The University of Chicago, as Operator of Argonne +* National Laboratory. +* Copyright (c) 2002 The Regents of the University of California, as +* Operator of Los Alamos National Laboratory. +* EPICS BASE Versions 3.13.7 +* and higher are distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. +\*************************************************************************/ +/* + * $Id$ + * + * Author W. Eric Norum + * norume@aps.anl.gov + * 630 252 4793 + */ + +/* + * Interthread message passing + * + * Machines with 'traditional' memory access semantics could get by without + * the queueMutex for single producer/consumers, but architectures, such as + * symmetric-multiprocessing machines with out-of-order writes, require + * mutex locks to ensure correct operation. I decided to always use the + * mutex rather having two versions of this code. + */ +#include "epicsMessageQueue.h" +#include +#include +#include + +epicsMessageQueue::epicsMessageQueue(unsigned int capacity, + unsigned int maxMessageSize) +{ + assert(capacity != 0); + assert(maxMessageSize != 0); + this->capacity = capacity; + this->maxMessageSize = maxMessageSize; + slotSize = (sizeof(unsigned long) + + maxMessageSize + sizeof(unsigned long) - 1) + / sizeof(unsigned long); + buf = new unsigned long[capacity * slotSize]; + inPtr = outPtr = firstMessageSlot = (char *)&buf[0]; + lastMessageSlot = (char *)&buf[(capacity - 1) * slotSize]; + slotSize *= sizeof(unsigned long); + full = false; +} + +epicsMessageQueue::~epicsMessageQueue() +{ + delete buf; +} + +bool +epicsMessageQueue::send(void *message, unsigned int size) +{ + char *inPtr, *outPtr, *nextPtr; + + assert(size <= maxMessageSize); + queueMutex.lock(); + if (full) { + queueMutex.unlock(); + return false; + } + inPtr = (char *)this->inPtr; + outPtr = (char *)this->outPtr; + if (inPtr == lastMessageSlot) + nextPtr = firstMessageSlot; + else + nextPtr = inPtr + slotSize; + if (nextPtr == outPtr) + full = true; + *(volatile unsigned long *)inPtr = size; + memcpy((unsigned long *)inPtr + 1, message, size); + this->inPtr = nextPtr; + queueMutex.unlock(); + queueEvent.signal(); + return true; +} + +bool +epicsMessageQueue::receive(void *message, unsigned int *size, bool withTimeout, double timeout) +{ + char *outPtr; + unsigned long l; + + queueMutex.lock(); + outPtr = (char *)this->outPtr; + while ((outPtr == inPtr) && !full) { + queueMutex.unlock(); + if (withTimeout) { + if (queueEvent.wait(timeout) == false) + return false; + } + else { + queueEvent.wait(); + } + queueMutex.lock(); + outPtr = (char *)this->outPtr; + } + *size = l = *(unsigned long *)outPtr; + memcpy(message, (unsigned long *)outPtr + 1, l); + if (outPtr == lastMessageSlot) + outPtr = firstMessageSlot; + else + outPtr = outPtr + slotSize; + this->outPtr = outPtr; + full = false; + queueMutex.unlock(); + return true; +} + +void +epicsMessageQueue::receive(void *message, unsigned int *size) +{ + this->receive(message, size, false, 0.0); +} + +bool +epicsMessageQueue::receive(void *message, unsigned int *size, double timeout) +{ + return this->receive(message, size, true, timeout); +} + +bool +epicsMessageQueue::isFull() const +{ + return full; +} + +bool +epicsMessageQueue::isEmpty() +{ + bool empty; + + queueMutex.lock(); + empty = ((outPtr == inPtr) && !full); + queueMutex.unlock(); + return empty; +} + +void +epicsMessageQueue::show(unsigned int level) const +{ + char *inPtr, *outPtr; + int nmsg; + + inPtr = (char *)this->inPtr; + outPtr = (char *)this->outPtr; + if (inPtr >= outPtr) + nmsg = (inPtr - outPtr) / slotSize; + else + nmsg = capacity - (outPtr - inPtr) / slotSize; + if (full) + nmsg = capacity; + printf("Message Queue Used:%d Slots:%d", nmsg, capacity); + if (level >= 1) + printf(" Maximum size:%u", maxMessageSize); + printf("\n"); +} + +extern "C" { + +epicsShareFunc epicsMessageQueueId epicsShareAPI epicsMessageQueueCreate( + unsigned int capacity, + unsigned int maximumMessageSize) +{ + epicsMessageQueue *id; + + try { + id = new epicsMessageQueue::epicsMessageQueue(capacity, maximumMessageSize); + } + catch (...) { + return NULL; + } + return id; +} + +epicsShareFunc void epicsShareAPI epicsMessageQueueDestroy( + epicsMessageQueueId id) +{ + delete (epicsMessageQueue *)id; +} + +epicsShareFunc int epicsShareAPI epicsMessageQueueSend( + epicsMessageQueueId id, + void *message, + unsigned int messageSize) +{ + return ((epicsMessageQueue *)id)->send(message, messageSize); +} + +epicsShareFunc void epicsShareAPI epicsMessageQueueReceive( + epicsMessageQueueId id, + void *message, + unsigned int *messageSize) +{ + ((epicsMessageQueue *)id)->receive(message, messageSize); +} + +epicsShareFunc int epicsShareAPI epicsMessageQueueReceiveWithTimeout( + epicsMessageQueueId id, + void *message, + unsigned int *messageSize, + double timeout) +{ + return ((epicsMessageQueue *)id)->receive(message, messageSize, timeout); +} + +epicsShareFunc int epicsShareAPI epicsMessageQueueIsFull( + epicsMessageQueueId id) +{ + return ((epicsMessageQueue *)id)->isFull(); +} + +epicsShareFunc int epicsShareAPI epicsMessageQueueIsEmpty( + epicsMessageQueueId id) +{ + return ((epicsMessageQueue *)id)->isEmpty(); +} + +epicsShareFunc void epicsShareAPI epicsMessageQueueShow( + epicsMessageQueueId id, + unsigned int level) +{ + ((epicsMessageQueue *)id)->show(level); +} + +} /* extern "C" */ diff --git a/src/libCom/ring/epicsMessageQueue.h b/src/libCom/ring/epicsMessageQueue.h new file mode 100644 index 000000000..0515f098e --- /dev/null +++ b/src/libCom/ring/epicsMessageQueue.h @@ -0,0 +1,100 @@ +/*************************************************************************\ +* Copyright (c) 2002 The University of Chicago, as Operator of Argonne +* National Laboratory. +* Copyright (c) 2002 The Regents of the University of California, as +* Operator of Los Alamos National Laboratory. +* EPICS BASE Versions 3.13.7 +* and higher are distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. +\*************************************************************************/ +/* + * $Id$ + * + * Author W. Eric Norum + * norume@aps.anl.gov + * 630 252 4793 + */ + +/* + * Interthread message passing + */ +#ifndef epicsMessageQueueh +#define epicsMessageQueueh + +#include "epicsAssert.h" +#include "epicsEvent.h" +#include "epicsMutex.h" +#include "shareLib.h" + +#ifdef __cplusplus + +class epicsShareClass epicsMessageQueue { +public: + epicsMessageQueue ( unsigned int capacity, + unsigned int maximumMessageSize ); + ~epicsMessageQueue (); + bool send ( void *message, unsigned int messageSize ); + void receive ( void *message, unsigned int *messageSize ); + bool receive ( void *message, unsigned int *messageSize, double timeout ); + void show ( unsigned int level = 0 ) const; + bool isFull () const; + bool isEmpty (); + +private: // Prevent compiler-generated member functions + // default constructor, copy constructor, assignment operator + epicsMessageQueue(); + epicsMessageQueue(const epicsMessageQueue &); + epicsMessageQueue& operator=(const epicsMessageQueue &); + +private: + bool receive ( void *message, unsigned int *messageSize, bool withTimeout, double timeout ); + + volatile char *inPtr; + volatile char *outPtr; + volatile bool full; + unsigned int capacity; + unsigned int maxMessageSize; + unsigned int slotSize; + unsigned long *buf; + char *firstMessageSlot; + char *lastMessageSlot; + epicsEvent queueEvent; + epicsMutex queueMutex; +}; + +extern "C" { +#endif /*__cplusplus */ + +typedef void *epicsMessageQueueId; + +epicsShareFunc epicsMessageQueueId epicsShareAPI epicsMessageQueueCreate( + unsigned int capacity, + unsigned int maximumMessageSize); +epicsShareFunc void epicsShareAPI epicsMessageQueueDestroy( + epicsMessageQueueId id); +epicsShareFunc int epicsShareAPI epicsMessageQueueSend( + epicsMessageQueueId id, + void *message, + unsigned int messageSize); +epicsShareFunc void epicsShareAPI epicsMessageQueueReceive( + epicsMessageQueueId id, + void *message, + unsigned int *messageSize); +epicsShareFunc int epicsShareAPI epicsMessageQueueReceiveWithTimeout( + epicsMessageQueueId id, + void *message, + unsigned int *messageSize, + double timeout); +epicsShareFunc int epicsShareAPI epicsMessageQueueIsFull( + epicsMessageQueueId id); +epicsShareFunc int epicsShareAPI epicsMessageQueueIsEmpty( + epicsMessageQueueId id); +epicsShareFunc void epicsShareAPI epicsMessageQueueShow( + epicsMessageQueueId id, + unsigned int level); + +#ifdef __cplusplus +} +#endif /*__cplusplus */ + +#endif /* epicsMessageQueueh */ diff --git a/src/libCom/test/Makefile b/src/libCom/test/Makefile index 7209a022c..a4b944c5d 100644 --- a/src/libCom/test/Makefile +++ b/src/libCom/test/Makefile @@ -63,6 +63,10 @@ epicsExceptionTestHost_SRCS += epicsExceptionTestMain.cpp epicsExceptionTest.cpp PROD_HOST += epicsExceptionTestHost OBJS_IOC += epicsExceptionTest +epicsMessageQueueTestHost_SRCS += epicsMessageQueueTestMain.cpp epicsMessageQueueTest.cpp +PROD_HOST += epicsMessageQueueTestHost +OBJS_IOC += epicsMessageQueueTest + #fdmgrTest_SRCS += fdmgrTest.c #PROD_HOST += fdmgrTest diff --git a/src/libCom/test/epicsMessageQueueTest.cpp b/src/libCom/test/epicsMessageQueueTest.cpp new file mode 100644 index 000000000..e7bff9770 --- /dev/null +++ b/src/libCom/test/epicsMessageQueueTest.cpp @@ -0,0 +1,182 @@ +/*************************************************************************\ +* Copyright (c) 2002 The University of Chicago, as Operator of Argonne +* National Laboratory. +* Copyright (c) 2002 The Regents of the University of California, as +* Operator of Los Alamos National Laboratory. +* EPICS BASE Versions 3.13.7 +* and higher are distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. +\*************************************************************************/ +/* + * $Id$ + * + * Author W. Eric Norum + * norume@aps.anl.gov + * 630 252 4793 + */ +#include +#include +#include +#include +#include +#include + +const char *msg1 = "1234567890This is a very long message."; + +static void +receiver(void *arg) +{ + epicsMessageQueue *q = (epicsMessageQueue *)arg; + char cbuf[80]; + int expectmsg[4]; + unsigned int len; + int sender, msgNum; + + for (sender = 1 ; sender <= 4 ; sender++) + expectmsg[sender-1] = 1; + for (;;) { + cbuf[0] = '\0'; + q->receive(cbuf, &len); + if ((sscanf(cbuf, "Sender %d -- %d", &sender, &msgNum) == 2) + && (sender >= 1) + && (sender <= 4)) { + if (expectmsg[sender-1] != msgNum) + printf("%s received %d '%.*s' -- expected %d\n", epicsThreadGetNameSelf(), len, len, cbuf, expectmsg[sender-1]); + expectmsg[sender-1] = msgNum + 1; + epicsThreadSleep(0.001 * (random() % 20)); + } + else { + printf("%s received %d '%.*s'\n", epicsThreadGetNameSelf(), len, len, cbuf); + for (sender = 1 ; sender <= 4 ; sender++) { + if (expectmsg[sender-1] > 1) + printf("Sender %d -- %d messages\n", sender, expectmsg[sender-1]-1); + } + } + } +} + +static void +sender(void *arg) +{ + epicsMessageQueue *q = (epicsMessageQueue *)arg; + char cbuf[80]; + unsigned int len; + int i = 0; + + for (;;) { + len = sprintf(cbuf, "%s -- %d.", epicsThreadGetNameSelf(), ++i); + while (q->send((void *)cbuf, len) == false) + epicsThreadSleep(0.005 * (random() % 5)); + epicsThreadSleep(0.005 * (random() % 20)); + } +} + +void +epicsMessageQueueTest() +{ + int i; + char cbuf[80]; + unsigned int len; + int pass; + int used; + unsigned int want; + + epicsMessageQueue *q1 = new epicsMessageQueue(4, 20); + + /* + * Simple single-thread tests + */ + i = 0; + used = 0; + assert(q1->isEmpty()); + assert(!q1->isFull()); + while (q1->send((void *)msg1, i ) == true) { + i++; + printf("Should have %d used -- ", ++used); + assert(!q1->isEmpty()); + q1->show(); + } + assert(!q1->isEmpty()); + assert(q1->isFull()); + + want = 0; + q1->receive(cbuf, &len); + assert(!q1->isEmpty()); + assert(!q1->isFull()); + if ((len != want) || (strncmp(msg1, cbuf, len) != 0)) + printf("wanted:%d '%.*s' got:%d '%.*s'\n", want, want, msg1, len, len, cbuf); + printf("Should have %d used -- ", --used); + q1->show(); + + want++; + q1->receive(cbuf, &len); + if ((len != want) || (strncmp(msg1, cbuf, len) != 0)) + printf("wanted:%d '%.*s' got:%d '%.*s'\n", want, want, msg1, len, len, cbuf); + printf("Should have %d used -- ", --used); + q1->show(); + q1->send((void *)msg1, i++); + printf("Should have %d used -- ", ++used); + q1->show(); + + want++; + q1->receive(cbuf, &len); + if ((len != want) || (strncmp(msg1, cbuf, len) != 0)) + printf("wanted:%d '%.*s' got:%d '%.*s'\n", want, want, msg1, len, len, cbuf); + printf("Should have %d used -- ", --used); + q1->show(); + q1->send((void *)msg1, i++); + printf("Should have %d used -- ", ++used); + q1->show(); + + while (q1->receive(cbuf, &len, 1.0) == true) { + printf("Should have %d used -- ", --used); + q1->show(); + want++; + if ((len != want) || (strncmp(msg1, cbuf, len) != 0)) + printf("wanted:%d '%.*s' got:%d '%.*s'\n", want, want, msg1, len, len, cbuf); + } + assert(q1->isEmpty()); + assert(!q1->isFull()); + + /* + * Single receiver, single sender tests + */ + epicsThreadCreate("Receiver one", epicsThreadPriorityMedium, epicsThreadStackMedium, receiver, q1); + for (pass = 1 ; pass <= 3 ; pass++) { + epicsThreadSetPriority(epicsThreadGetIdSelf(), pass == 1 ? + epicsThreadPriorityHigh : + epicsThreadPriorityLow); + switch (pass) { + case 1: printf ("Should send/receive only 4 messages (sender priority > receiver priority).\n"); break; + case 2: printf ("Should send/receive 4 to 10 messages (depends on how host handles thread priorities).\n"); break; + case 3: printf ("Should send/receive 10 messages (sender pauses after sending).\n"); break; + } + for (i = 0 ; i < 10 ; i++) { + if (q1->send((void *)msg1, i) == false) + break; + if (pass >= 3) + epicsThreadSleep(0.5); + } + printf ("Sent %d messages.\n", i); + epicsThreadSleep(1.0); + } + + /* + * Single receiver, multiple sender tests + */ + printf("This test takes another 5 minutes to finish.\n"); + printf("Test has succeeded if nothing appears between here....\n"); + epicsThreadCreate("Sender 1", epicsThreadPriorityLow, epicsThreadStackMedium, sender, q1); + epicsThreadCreate("Sender 2", epicsThreadPriorityMedium, epicsThreadStackMedium, sender, q1); + epicsThreadCreate("Sender 3", epicsThreadPriorityHigh, epicsThreadStackMedium, sender, q1); + epicsThreadCreate("Sender 4", epicsThreadPriorityHigh, epicsThreadStackMedium, sender, q1); + + epicsThreadSleep(300.0); + + /* + * Force out summaries + */ + printf("......and here.\n"); + q1->send((void *)msg1, 0); + epicsThreadSleep(1.0); +} diff --git a/src/libCom/test/epicsMessageQueueTestMain.cpp b/src/libCom/test/epicsMessageQueueTestMain.cpp new file mode 100644 index 000000000..1ca36886f --- /dev/null +++ b/src/libCom/test/epicsMessageQueueTestMain.cpp @@ -0,0 +1,24 @@ +/*************************************************************************\ +* Copyright (c) 2002 The University of Chicago, as Operator of Argonne +* National Laboratory. +* Copyright (c) 2002 The Regents of the University of California, as +* Operator of Los Alamos National Laboratory. +* EPICS BASE Versions 3.13.7 +* and higher are distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. +\*************************************************************************/ +/* + * $Id$ + * + * Author W. Eric Norum + * norume@aps.anl.gov + * 630 252 4793 + */ + +void epicsMessageQueueTest ( void ); + +int main ( int /* argc */, char /* *argv[] */ ) +{ + epicsMessageQueueTest (); + return 0; +}