diff --git a/src/libCom/osi/epicsMessageQueue.cpp b/src/libCom/osi/epicsMessageQueue.cpp index 6b0c9e34a..16e6b1df5 100644 --- a/src/libCom/osi/epicsMessageQueue.cpp +++ b/src/libCom/osi/epicsMessageQueue.cpp @@ -52,21 +52,21 @@ epicsMessageQueue::send(void *message, unsigned int size, double timeout) } int -epicsMessageQueue::tryReceive(void *message ) +epicsMessageQueue::tryReceive(void *message, unsigned int size ) { - return epicsMessageQueueTryReceive(id, message); + return epicsMessageQueueTryReceive(id, message, size); } int -epicsMessageQueue::receive(void *message ) +epicsMessageQueue::receive(void *message, unsigned int size ) { - return epicsMessageQueueReceive(id, message); + return epicsMessageQueueReceive(id, message, size); } int -epicsMessageQueue::receive(void *message, double timeout) +epicsMessageQueue::receive(void *message, unsigned int size, double timeout) { - return epicsMessageQueueReceiveWithTimeout(id, message, timeout); + return epicsMessageQueueReceiveWithTimeout(id, message, size, timeout); } unsigned int diff --git a/src/libCom/osi/epicsMessageQueue.h b/src/libCom/osi/epicsMessageQueue.h index 0c766a7b7..053cc60bd 100644 --- a/src/libCom/osi/epicsMessageQueue.h +++ b/src/libCom/osi/epicsMessageQueue.h @@ -38,9 +38,9 @@ public: int trySend ( void *message, unsigned int messageSize ); int send ( void *message, unsigned int messageSize); int send ( void *message, unsigned int messageSize, double timeout ); - int tryReceive ( void *message ); - int receive ( void *message ); - int receive ( void *message, double timeout ); + int tryReceive ( void *message, unsigned int size ); + int receive ( void *message, unsigned int size ); + int receive ( void *message, unsigned int size, double timeout ); void show ( unsigned int level = 0 ); unsigned int pending (); @@ -76,13 +76,16 @@ epicsShareFunc int epicsShareAPI epicsMessageQueueSendWithTimeout( double timeout); epicsShareFunc int epicsShareAPI epicsMessageQueueTryReceive( epicsMessageQueueId id, - void *message); + void *message, + unsigned int size); epicsShareFunc int epicsShareAPI epicsMessageQueueReceive( epicsMessageQueueId id, - void *message); + void *message, + unsigned int size); epicsShareFunc int epicsShareAPI epicsMessageQueueReceiveWithTimeout( epicsMessageQueueId id, void *message, + unsigned int size, double timeout); epicsShareFunc int epicsShareAPI epicsMessageQueuePending( epicsMessageQueueId id); diff --git a/src/libCom/osi/os/RTEMS/osdMessageQueue.c b/src/libCom/osi/os/RTEMS/osdMessageQueue.c index 3c0fd92c5..eecfb01e0 100644 --- a/src/libCom/osi/os/RTEMS/osdMessageQueue.c +++ b/src/libCom/osi/os/RTEMS/osdMessageQueue.c @@ -27,6 +27,7 @@ #include #include #include +#include #include "epicsMessageQueue.h" #include "errlog.h" @@ -34,7 +35,7 @@ epicsShareFunc epicsMessageQueueId epicsShareAPI epicsMessageQueueCreate(unsigned int capacity, unsigned int maximumMessageSize) { rtems_status_code sc; - rtems_id qid; + epicsMessageQueueId id = (epicsMessageQueueId)callocMustSucceed(1, sizeof(*id), "epicsMessageQueueCreate"); rtems_interrupt_level level; static char c1 = 'a'; static char c2 = 'a'; @@ -44,11 +45,13 @@ epicsMessageQueueCreate(unsigned int capacity, unsigned int maximumMessageSize) capacity, maximumMessageSize, RTEMS_FIFO|RTEMS_LOCAL, - &qid); + &id->id); if (sc != RTEMS_SUCCESSFUL) { errlogPrintf ("Can't create message queue: %s\n", rtems_status_text (sc)); return NULL; } + id->maxSize = maximumMessageSize; + id->localBuf = NULL; rtems_interrupt_disable (level); if (c1 == 'z') { if (c2 == 'z') { @@ -69,10 +72,10 @@ epicsMessageQueueCreate(unsigned int capacity, unsigned int maximumMessageSize) c1++; } rtems_interrupt_enable (level); - return (epicsMessageQueueId)qid; + return id; } -rtems_status_code rtems_message_queue_send_timeout( +static rtems_status_code rtems_message_queue_send_timeout( rtems_id id, void *buffer, rtems_unsigned32 size, @@ -123,7 +126,7 @@ epicsShareFunc int epicsShareAPI epicsMessageQueueSend( void *message, unsigned int messageSize) { - if (rtems_message_queue_send_timeout((rtems_id)id, message, messageSize, RTEMS_NO_TIMEOUT) == RTEMS_SUCCESSFUL) + if (rtems_message_queue_send_timeout(id->id, message, messageSize, RTEMS_NO_TIMEOUT) == RTEMS_SUCCESSFUL) return 0; else return -1; @@ -136,7 +139,6 @@ epicsShareFunc int epicsShareAPI epicsMessageQueueSendWithTimeout( double timeout) { rtems_interval delay; - rtems_unsigned32 wait; extern double rtemsTicksPerSecond_double; /* @@ -144,47 +146,67 @@ epicsShareFunc int epicsShareAPI epicsMessageQueueSendWithTimeout( */ if (timeout <= 0.0) return epicsMessageQueueTrySend(id, message, messageSize); - wait = RTEMS_WAIT; delay = (int)(timeout * rtemsTicksPerSecond_double); if (delay == 0) delay++; - if (rtems_message_queue_send_timeout((rtems_id)id, message, messageSize, delay) == RTEMS_SUCCESSFUL) + if (rtems_message_queue_send_timeout(id->id, message, messageSize, delay) == RTEMS_SUCCESSFUL) return 0; else return -1; } +static int receiveMessage( + epicsMessageQueueId id, + void *buffer, + rtems_unsigned32 size, + rtems_unsigned32 wait, + rtems_interval delay) +{ + rtems_unsigned32 rsize; + rtems_status_code sc; + + if (size < id->maxSize) { + if (id->localBuf == NULL) { + id->localBuf = malloc(id->maxSize); + if (id->localBuf == NULL) + return -1; + } + rsize = receiveMessage(id, id->localBuf, id->maxSize, wait, delay); + if ((rsize < 0) || (rsize > size)) + return -1; + memcpy(buffer, id->localBuf, rsize); + } + else { + sc = rtems_message_queue_receive(id->id, buffer, &rsize, wait, delay); + if (sc != RTEMS_SUCCESSFUL) + return -1; + } + return rsize; +} + epicsShareFunc int epicsShareAPI epicsMessageQueueTryReceive( epicsMessageQueueId id, - void *message) + void *message, + unsigned int size) { - rtems_unsigned32 size; - - if (rtems_message_queue_receive((rtems_id)id, message, &size, RTEMS_NO_WAIT, 0) == RTEMS_SUCCESSFUL) - return size; - else - return -1; + return receiveMesssage(id, message, size, RTEMS_NO_WAIT, 0); } epicsShareFunc int epicsShareAPI epicsMessageQueueReceive( epicsMessageQueueId id, - void *message) + void *message, + unsigned int size) { - rtems_unsigned32 size; - - if (rtems_message_queue_receive((rtems_id)id, message, &size, RTEMS_WAIT, RTEMS_NO_TIMEOUT) == RTEMS_SUCCESSFUL) - return size; - else - return -1; + return receiveMesssage(id, message, size, RTEMS_WAIT, RTEMS_NO_TIMEOUT); } epicsShareFunc int epicsShareAPI epicsMessageQueueReceiveWithTimeout( epicsMessageQueueId id, void *message, + unsigned int size, double timeout) { rtems_interval delay; - rtems_unsigned32 size; rtems_unsigned32 wait; extern double rtemsTicksPerSecond_double; @@ -201,10 +223,7 @@ epicsShareFunc int epicsShareAPI epicsMessageQueueReceiveWithTimeout( if (delay == 0) delay++; } - if (rtems_message_queue_receive((rtems_id)id, message, &size, wait, delay) == RTEMS_SUCCESSFUL) - return size; - else - return -1; + return receiveMesssage(id, message, size, wait, delay); } epicsShareFunc int epicsShareAPI epicsMessageQueuePending( diff --git a/src/libCom/osi/os/RTEMS/osdMessageQueue.h b/src/libCom/osi/os/RTEMS/osdMessageQueue.h index 71dc47f87..8a4f7b24e 100644 --- a/src/libCom/osi/os/RTEMS/osdMessageQueue.h +++ b/src/libCom/osi/os/RTEMS/osdMessageQueue.h @@ -20,6 +20,12 @@ */ #include -#define epicsMessageQueueDestroy(q) (rtems_message_queue_delete((rtems_id)(q))) +struct epicsMessageQueueOSD { + rtems_id id; + unsigned int maxSize; + void *localBuf; -#define epicsMessageQueueTrySend(q,m,l) (rtems_message_queue_send((rtems_id)(q), (m), (l)) == RTEMS_SUCCESSFUL ? 0 : -1) +}; +#define epicsMessageQueueDestroy(q) (rtems_message_queue_delete((q)->id)) + +#define epicsMessageQueueTrySend(q,m,l) (rtems_message_queue_send((q)->id, (m), (l)) == RTEMS_SUCCESSFUL ? 0 : -1) diff --git a/src/libCom/osi/os/default/osdMessageQueue.cpp b/src/libCom/osi/os/default/osdMessageQueue.cpp index 582b26078..6f0dddc92 100644 --- a/src/libCom/osi/os/default/osdMessageQueue.cpp +++ b/src/libCom/osi/os/default/osdMessageQueue.cpp @@ -172,7 +172,8 @@ mySend(epicsMessageQueueId pmsg, void *message, unsigned int size, bool wait, bo */ if ((pthr = reinterpret_cast < struct threadNode * > ( ellGet(&pmsg->receiveQueue) ) ) != NULL) { - memcpy(pthr->buf, message, size); + if(size <= pthr->size) + memcpy(pthr->buf, message, size); pthr->size = size; pthr->eventSent = true; epicsEventSignal(pthr->evp->event); @@ -216,7 +217,7 @@ epicsMessageQueueSendWithTimeout(epicsMessageQueueId pmsg, void *message, unsign } static int -myReceive(epicsMessageQueueId pmsg, void *message, bool wait, bool haveTimeout, double timeout) +myReceive(epicsMessageQueueId pmsg, void *message, unsigned int size, bool wait, bool haveTimeout, double timeout) { char *myOutPtr; unsigned long l; @@ -228,8 +229,15 @@ myReceive(epicsMessageQueueId pmsg, void *message, bool wait, bool haveTimeout, epicsMutexLock(pmsg->mutex); myOutPtr = (char *)pmsg->outPtr; if ((myOutPtr != pmsg->inPtr) || pmsg->full) { + int ret; l = *(unsigned long *)myOutPtr; - memcpy(message, (unsigned long *)myOutPtr + 1, l); + if (l <= size) { + memcpy(message, (unsigned long *)myOutPtr + 1, l); + ret = l; + } + else { + ret = -1; + } if (myOutPtr == pmsg->lastMessageSlot) pmsg->outPtr = pmsg->firstMessageSlot; else @@ -245,7 +253,7 @@ myReceive(epicsMessageQueueId pmsg, void *message, bool wait, bool haveTimeout, epicsEventSignal(pthr->evp->event); } epicsMutexUnlock(pmsg->mutex); - return l; + return ret; } /* @@ -271,6 +279,7 @@ myReceive(epicsMessageQueueId pmsg, void *message, bool wait, bool haveTimeout, struct threadNode threadNode; threadNode.evp = getEventNode(pmsg); threadNode.buf = message; + threadNode.size = size; threadNode.eventSent = false; ellAdd(&pmsg->receiveQueue, &threadNode.link); epicsMutexUnlock(pmsg->mutex); @@ -283,27 +292,27 @@ myReceive(epicsMessageQueueId pmsg, void *message, bool wait, bool haveTimeout, ellDelete(&pmsg->receiveQueue, &threadNode.link); ellAdd(&pmsg->eventFreeList, &threadNode.evp->link); epicsMutexUnlock(pmsg->mutex); - if(threadNode.eventSent) + if(threadNode.eventSent && (threadNode.size <= size)) return threadNode.size; return -1; } epicsShareFunc int epicsShareAPI -epicsMessageQueueTryReceive(epicsMessageQueueId pmsg, void *message) +epicsMessageQueueTryReceive(epicsMessageQueueId pmsg, void *message, unsigned int size) { - return myReceive(pmsg, message, false, false, 0.0); + return myReceive(pmsg, message, size, false, false, 0.0); } epicsShareFunc int epicsShareAPI -epicsMessageQueueReceive(epicsMessageQueueId pmsg, void *message) +epicsMessageQueueReceive(epicsMessageQueueId pmsg, void *message, unsigned int size) { - return myReceive(pmsg, message, true, false, 0.0); + return myReceive(pmsg, message, size, true, false, 0.0); } epicsShareFunc int epicsShareAPI -epicsMessageQueueReceiveWithTimeout(epicsMessageQueueId pmsg, void *message, double timeout) +epicsMessageQueueReceiveWithTimeout(epicsMessageQueueId pmsg, void *message, unsigned int size, double timeout) { - return myReceive(pmsg, message, true, true, timeout); + return myReceive(pmsg, message, size, true, true, timeout); } epicsShareFunc int epicsShareAPI diff --git a/src/libCom/osi/os/vxWorks/osdMessageQueue.cpp b/src/libCom/osi/os/vxWorks/osdMessageQueue.cpp index d541f9f05..697058093 100644 --- a/src/libCom/osi/os/vxWorks/osdMessageQueue.cpp +++ b/src/libCom/osi/os/vxWorks/osdMessageQueue.cpp @@ -21,25 +21,6 @@ extern "C" int sysClkRateGet(void); -epicsShareFunc epicsMessageQueueId epicsShareAPI epicsMessageQueueCreate( - unsigned int capacity, - unsigned int maximumMessageSize) -{ - epicsMessageQueueId id = (epicsMessageQueueId)callocMustSucceed(1, sizeof(*id), "epicsMessageQueueCreate"); - if ((id->msgq = msgQCreate(capacity, maximumMessageSize, MSG_Q_FIFO)) == NULL) { - free(id); - return NULL; - } - id->nBytes = maximumMessageSize; - return id; -} - -epicsShareFunc void epicsShareAPI epicsMessageQueueDestroy(epicsMessageQueueId id) -{ - msgQDelete(id->msgq); - free(id); -} - epicsShareFunc int epicsShareAPI epicsMessageQueueSendWithTimeout( epicsMessageQueueId id, void *message, @@ -54,12 +35,13 @@ epicsShareFunc int epicsShareAPI epicsMessageQueueSendWithTimeout( ticks = (int)(timeout*sysClkRateGet()); if(ticks<=0) ticks = 1; } - return msgQSend(id->msgq, (char *)message, messageSize, ticks, MSG_PRI_NORMAL); + return msgQSend((MSG_Q_ID)id, (char *)message, messageSize, ticks, MSG_PRI_NORMAL); } epicsShareFunc int epicsShareAPI epicsMessageQueueReceiveWithTimeout( epicsMessageQueueId id, void *message, + unsigned int size, double timeout) { int ticks; @@ -70,5 +52,5 @@ epicsShareFunc int epicsShareAPI epicsMessageQueueReceiveWithTimeout( ticks = (int)(timeout*sysClkRateGet()); if(ticks<=0) ticks = 1; } - return msgQReceive(id->msgq, (char *)message, id->nBytes, ticks); + return msgQReceive((MSG_Q_ID)id, (char *)message, size, ticks); } diff --git a/src/libCom/osi/os/vxWorks/osdMessageQueue.h b/src/libCom/osi/os/vxWorks/osdMessageQueue.h index 2e8604469..8260c2b46 100644 --- a/src/libCom/osi/os/vxWorks/osdMessageQueue.h +++ b/src/libCom/osi/os/vxWorks/osdMessageQueue.h @@ -21,16 +21,14 @@ #include #include -struct epicsMessageQueueOSD { - MSG_Q_ID msgq; - unsigned int nBytes; -}; +#define epicsMessageQueueCreate(c,s) ((epicsMessageQueueId)msgQCreate((c),(s),MSG_Q_FIFO)) +#define epicsMessageQueueDestroy(q) (msgQDestroy((MSG_Q_ID)(q))) -#define epicsMessageQueueTrySend(q,m,l) (msgQSend((q)->msgq, (char*)(m), (l), NO_WAIT, MSG_PRI_NORMAL)) -#define epicsMessageQueueSend(q,m,l) (msgQSend((q)->msgq, (char*)(m), (l), WAIT_FOREVER, MSG_PRI_NORMAL)) +#define epicsMessageQueueTrySend(q,m,l) (msgQSend((MSG_Q_ID)(q), (char*)(m), (l), NO_WAIT, MSG_PRI_NORMAL)) +#define epicsMessageQueueSend(q,m,l) (msgQSend((MSG_Q_ID)(q), (char*)(m), (l), WAIT_FOREVER, MSG_PRI_NORMAL)) -#define epicsMessageQueueTryReceive(q,m) (msgQReceive((q)->msgq, (char*)(m), (q)->nBytes, NO_WAIT)) -#define epicsMessageQueueReceive(q,m) (msgQReceive((q)->msgq, (char*)(m), (q)->nBytes, WAIT_FOREVER)) +#define epicsMessageQueueTryReceive(q,m,s) (msgQReceive((MSG_Q_ID)(q), (char*)(m), (s), NO_WAIT)) +#define epicsMessageQueueReceive(q,m,s) (msgQReceive((MSG_Q_ID)(q), (char*)(m), (s), WAIT_FOREVER)) -#define epicsMessageQueuePending(q) (msgQNumMsgs((q)->msgq)) -#define epicsMessageQueueShow(q,l) (msgQShow((q)->msgq,(l))) +#define epicsMessageQueuePending(q) (msgQNumMsgs((MSG_Q_ID)(q))) +#define epicsMessageQueueShow(q,l) (msgQShow((MSG_Q_ID)(q),(l))) diff --git a/src/libCom/test/epicsMessageQueueTest.cpp b/src/libCom/test/epicsMessageQueueTest.cpp index 0cb20af56..7bb9060f7 100644 --- a/src/libCom/test/epicsMessageQueueTest.cpp +++ b/src/libCom/test/epicsMessageQueueTest.cpp @@ -42,14 +42,15 @@ randBelow(int n) } extern "C" void -receiver0(void *arg) +badReceiver(void *arg) { epicsMessageQueue *q = (epicsMessageQueue *)arg; char cbuf[80]; - assert(q->receive(cbuf) == 5); - assert(q->receive(cbuf) == 0); - delete q; + cbuf[0] = '\0'; + assert((q->receive(cbuf, 1) == -1) && (cbuf[0] == '\0')); + epicsThreadSleep(5.0); + assert((q->receive(cbuf, 1) == -1) && (cbuf[0] == '\0')); } extern "C" void @@ -65,7 +66,7 @@ receiver(void *arg) expectmsg[sender-1] = 1; for (;;) { cbuf[0] = '\0'; - len = q->receive(cbuf); + len = q->receive(cbuf, sizeof cbuf); if ((sscanf(cbuf, "Sender %d -- %d", &sender, &msgNum) == 2) && (sender >= 1) && (sender <= 4)) { @@ -124,20 +125,20 @@ extern "C" void epicsMessageQueueTest() assert(q1->pending() == 4); want = 0; - len = q1->receive(cbuf); + len = q1->receive(cbuf, sizeof cbuf); assert(q1->pending() == 3); if ((len != want) || (strncmp(msg1, cbuf, len) != 0)) printf("wanted:%d '%.*s' got:%d '%.*s'\n", want, want, msg1, len, len, cbuf); want++; - len = q1->receive(cbuf); + len = q1->receive(cbuf, sizeof cbuf); assert(q1->pending() == 2); if ((len != want) || (strncmp(msg1, cbuf, len) != 0)) printf("wanted:%d '%.*s' got:%d '%.*s'\n", want, want, msg1, len, len, cbuf); q1->trySend((void *)msg1, i++); want++; - len = q1->receive(cbuf); + len = q1->receive(cbuf, sizeof cbuf); assert(q1->pending() == 2); if ((len != want) || (strncmp(msg1, cbuf, len) != 0)) printf("wanted:%d '%.*s' got:%d '%.*s'\n", want, want, msg1, len, len, cbuf); @@ -145,7 +146,7 @@ extern "C" void epicsMessageQueueTest() assert(q1->pending() == 3); i = 3; - while ((len = q1->receive(cbuf, 1.0)) >= 0) { + while ((len = q1->receive(cbuf, sizeof cbuf, 1.0)) >= 0) { assert(q1->pending() == --i); want++; if ((len != want) || (strncmp(msg1, cbuf, len) != 0)) @@ -166,20 +167,20 @@ extern "C" void epicsMessageQueueTest() assert(q1->pending() == 4); want = 0; - len = q1->receive(cbuf); + len = q1->receive(cbuf, sizeof cbuf); assert(q1->pending() == 3); if ((len != want) || (strncmp(msg1, cbuf, len) != 0)) printf("wanted:%d '%.*s' got:%d '%.*s'\n", want, want, msg1, len, len, cbuf); want++; - len = q1->receive(cbuf); + len = q1->receive(cbuf, sizeof cbuf); assert(q1->pending() == 2); if ((len != want) || (strncmp(msg1, cbuf, len) != 0)) printf("wanted:%d '%.*s' got:%d '%.*s'\n", want, want, msg1, len, len, cbuf); q1->send((void *)msg1, i++, 1.0); want++; - len = q1->receive(cbuf); + len = q1->receive(cbuf, sizeof cbuf); assert(q1->pending() == 2); if ((len != want) || (strncmp(msg1, cbuf, len) != 0)) printf("wanted:%d '%.*s' got:%d '%.*s'\n", want, want, msg1, len, len, cbuf); @@ -187,7 +188,7 @@ extern "C" void epicsMessageQueueTest() assert(q1->pending() == 3); i = 3; - while ((len = q1->receive(cbuf, 1.0)) >= 0) { + while ((len = q1->receive(cbuf, sizeof cbuf, 1.0)) >= 0) { assert(q1->pending() == --i); want++; if ((len != want) || (strncmp(msg1, cbuf, len) != 0)) @@ -200,11 +201,19 @@ extern "C" void epicsMessageQueueTest() assert (q1->send((void *)msg1, i, 1.0) == 0); assert(q1->pending() == 4); for (i = 0 ; i < 4 ; i++) - assert (q1->receive((void *)cbuf, 1.0) == (int)i); + assert (q1->receive((void *)cbuf, sizeof cbuf, 1.0) == (int)i); assert(q1->pending() == 0); - assert (q1->receive((void *)cbuf, 1.0) < 0); + assert (q1->receive((void *)cbuf, sizeof cbuf, 1.0) < 0); assert(q1->pending() == 0); + printf("Single receiver with invalid size, single sender tests.\n"); + epicsThreadCreate("Bad Receiver", epicsThreadPriorityMedium, epicsThreadGetStackSize(epicsThreadStackMedium), badReceiver, q1); + epicsThreadSleep(5.0); + assert (q1->send((void *)msg1, 10) == 0); /* Send with waiting receiver */ + epicsThreadSleep(2.0); + assert (q1->send((void *)msg1, 10) == 0); /* Send with no receiver */ + epicsThreadSleep(10.0); + printf("Single receiver, single sender tests.\n"); epicsThreadSetPriority(epicsThreadGetIdSelf(), epicsThreadPriorityHigh); epicsThreadCreate("Receiver one", epicsThreadPriorityMedium, epicsThreadGetStackSize(epicsThreadStackMedium), receiver, q1); @@ -229,8 +238,7 @@ extern "C" void epicsMessageQueueTest() * Single receiver, multiple sender tests */ printf("Single receiver, multiple sender tests.\n"); - printf("This test takes 5 minutes to run.\n"); - printf("Test has succeeded if nothing appears between here....\n"); + printf("The following test takes 5 minutes to run and has succeeded\nif nothing appears between here....\n"); epicsThreadCreate("Sender 1", epicsThreadPriorityLow, epicsThreadGetStackSize(epicsThreadStackMedium), sender, q1); epicsThreadCreate("Sender 2", epicsThreadPriorityMedium, epicsThreadGetStackSize(epicsThreadStackMedium), sender, q1); epicsThreadCreate("Sender 3", epicsThreadPriorityHigh, epicsThreadGetStackSize(epicsThreadStackMedium), sender, q1);