Add epicsMessageQueue support.

This commit is contained in:
W. Eric Norum
2003-02-21 18:40:31 +00:00
parent 337e45d36c
commit 4c980ea26f
6 changed files with 544 additions and 2 deletions
+5 -2
View File
@@ -28,10 +28,10 @@ SRC_DIRS += $(LIBCOM)/ring
#following needed for locating epicsRingPointer.h and epicsRingBytes.h
INC += epicsRingPointer.h
INC += epicsRingBytes.h
INC += epicsReadline.h
INC += epicsMessageQueue.h
SRCS += epicsRingPointer.cpp
SRCS += epicsRingBytes.c
SRCS += epicsReadline.c
SRCS += epicsMessageQueue.cpp
SRC_DIRS += $(LIBCOM)/calc
#following needed for locating postfixPvt.h and sCalcPostfixPvt.h
@@ -165,6 +165,7 @@ INC += epicsSignal.h
INC += osiProcess.h
INC += osiUnistd.h
INC += osiWireFormat.h
INC += epicsReadline.h
SRCS += epicsThread.cpp
SRCS += epicsMutex.cpp
@@ -179,6 +180,8 @@ SRCS += osdInterrupt.c
SRCS += osdPoolStatus.c
SRCS += osdSignal.cpp
SRCS += osdEnv.c
SRCS += epicsReadline.c
osdEnv_CFLAGS_WIN32= -U__STDC__
SRCS += osdThread.c
+229
View File
@@ -0,0 +1,229 @@
/*************************************************************************\
* Copyright (c) 2002 The University of Chicago, as Operator of Argonne
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*
* $Id$
*
* Author W. Eric Norum
* norume@aps.anl.gov
* 630 252 4793
*/
/*
* Interthread message passing
*
* Machines with 'traditional' memory access semantics could get by without
* the queueMutex for single producer/consumers, but architectures, such as
* symmetric-multiprocessing machines with out-of-order writes, require
* mutex locks to ensure correct operation. I decided to always use the
* mutex rather having two versions of this code.
*/
#include "epicsMessageQueue.h"
#include <epicsAssert.h>
#include <memory.h>
#include <stdexcept>
epicsMessageQueue::epicsMessageQueue(unsigned int capacity,
unsigned int maxMessageSize)
{
assert(capacity != 0);
assert(maxMessageSize != 0);
this->capacity = capacity;
this->maxMessageSize = maxMessageSize;
slotSize = (sizeof(unsigned long)
+ maxMessageSize + sizeof(unsigned long) - 1)
/ sizeof(unsigned long);
buf = new unsigned long[capacity * slotSize];
inPtr = outPtr = firstMessageSlot = (char *)&buf[0];
lastMessageSlot = (char *)&buf[(capacity - 1) * slotSize];
slotSize *= sizeof(unsigned long);
full = false;
}
epicsMessageQueue::~epicsMessageQueue()
{
delete buf;
}
bool
epicsMessageQueue::send(void *message, unsigned int size)
{
char *inPtr, *outPtr, *nextPtr;
assert(size <= maxMessageSize);
queueMutex.lock();
if (full) {
queueMutex.unlock();
return false;
}
inPtr = (char *)this->inPtr;
outPtr = (char *)this->outPtr;
if (inPtr == lastMessageSlot)
nextPtr = firstMessageSlot;
else
nextPtr = inPtr + slotSize;
if (nextPtr == outPtr)
full = true;
*(volatile unsigned long *)inPtr = size;
memcpy((unsigned long *)inPtr + 1, message, size);
this->inPtr = nextPtr;
queueMutex.unlock();
queueEvent.signal();
return true;
}
bool
epicsMessageQueue::receive(void *message, unsigned int *size, bool withTimeout, double timeout)
{
char *outPtr;
unsigned long l;
queueMutex.lock();
outPtr = (char *)this->outPtr;
while ((outPtr == inPtr) && !full) {
queueMutex.unlock();
if (withTimeout) {
if (queueEvent.wait(timeout) == false)
return false;
}
else {
queueEvent.wait();
}
queueMutex.lock();
outPtr = (char *)this->outPtr;
}
*size = l = *(unsigned long *)outPtr;
memcpy(message, (unsigned long *)outPtr + 1, l);
if (outPtr == lastMessageSlot)
outPtr = firstMessageSlot;
else
outPtr = outPtr + slotSize;
this->outPtr = outPtr;
full = false;
queueMutex.unlock();
return true;
}
void
epicsMessageQueue::receive(void *message, unsigned int *size)
{
this->receive(message, size, false, 0.0);
}
bool
epicsMessageQueue::receive(void *message, unsigned int *size, double timeout)
{
return this->receive(message, size, true, timeout);
}
bool
epicsMessageQueue::isFull() const
{
return full;
}
bool
epicsMessageQueue::isEmpty()
{
bool empty;
queueMutex.lock();
empty = ((outPtr == inPtr) && !full);
queueMutex.unlock();
return empty;
}
void
epicsMessageQueue::show(unsigned int level) const
{
char *inPtr, *outPtr;
int nmsg;
inPtr = (char *)this->inPtr;
outPtr = (char *)this->outPtr;
if (inPtr >= outPtr)
nmsg = (inPtr - outPtr) / slotSize;
else
nmsg = capacity - (outPtr - inPtr) / slotSize;
if (full)
nmsg = capacity;
printf("Message Queue Used:%d Slots:%d", nmsg, capacity);
if (level >= 1)
printf(" Maximum size:%u", maxMessageSize);
printf("\n");
}
extern "C" {
epicsShareFunc epicsMessageQueueId epicsShareAPI epicsMessageQueueCreate(
unsigned int capacity,
unsigned int maximumMessageSize)
{
epicsMessageQueue *id;
try {
id = new epicsMessageQueue::epicsMessageQueue(capacity, maximumMessageSize);
}
catch (...) {
return NULL;
}
return id;
}
epicsShareFunc void epicsShareAPI epicsMessageQueueDestroy(
epicsMessageQueueId id)
{
delete (epicsMessageQueue *)id;
}
epicsShareFunc int epicsShareAPI epicsMessageQueueSend(
epicsMessageQueueId id,
void *message,
unsigned int messageSize)
{
return ((epicsMessageQueue *)id)->send(message, messageSize);
}
epicsShareFunc void epicsShareAPI epicsMessageQueueReceive(
epicsMessageQueueId id,
void *message,
unsigned int *messageSize)
{
((epicsMessageQueue *)id)->receive(message, messageSize);
}
epicsShareFunc int epicsShareAPI epicsMessageQueueReceiveWithTimeout(
epicsMessageQueueId id,
void *message,
unsigned int *messageSize,
double timeout)
{
return ((epicsMessageQueue *)id)->receive(message, messageSize, timeout);
}
epicsShareFunc int epicsShareAPI epicsMessageQueueIsFull(
epicsMessageQueueId id)
{
return ((epicsMessageQueue *)id)->isFull();
}
epicsShareFunc int epicsShareAPI epicsMessageQueueIsEmpty(
epicsMessageQueueId id)
{
return ((epicsMessageQueue *)id)->isEmpty();
}
epicsShareFunc void epicsShareAPI epicsMessageQueueShow(
epicsMessageQueueId id,
unsigned int level)
{
((epicsMessageQueue *)id)->show(level);
}
} /* extern "C" */
+100
View File
@@ -0,0 +1,100 @@
/*************************************************************************\
* Copyright (c) 2002 The University of Chicago, as Operator of Argonne
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*
* $Id$
*
* Author W. Eric Norum
* norume@aps.anl.gov
* 630 252 4793
*/
/*
* Interthread message passing
*/
#ifndef epicsMessageQueueh
#define epicsMessageQueueh
#include "epicsAssert.h"
#include "epicsEvent.h"
#include "epicsMutex.h"
#include "shareLib.h"
#ifdef __cplusplus
class epicsShareClass epicsMessageQueue {
public:
epicsMessageQueue ( unsigned int capacity,
unsigned int maximumMessageSize );
~epicsMessageQueue ();
bool send ( void *message, unsigned int messageSize );
void receive ( void *message, unsigned int *messageSize );
bool receive ( void *message, unsigned int *messageSize, double timeout );
void show ( unsigned int level = 0 ) const;
bool isFull () const;
bool isEmpty ();
private: // Prevent compiler-generated member functions
// default constructor, copy constructor, assignment operator
epicsMessageQueue();
epicsMessageQueue(const epicsMessageQueue &);
epicsMessageQueue& operator=(const epicsMessageQueue &);
private:
bool receive ( void *message, unsigned int *messageSize, bool withTimeout, double timeout );
volatile char *inPtr;
volatile char *outPtr;
volatile bool full;
unsigned int capacity;
unsigned int maxMessageSize;
unsigned int slotSize;
unsigned long *buf;
char *firstMessageSlot;
char *lastMessageSlot;
epicsEvent queueEvent;
epicsMutex queueMutex;
};
extern "C" {
#endif /*__cplusplus */
typedef void *epicsMessageQueueId;
epicsShareFunc epicsMessageQueueId epicsShareAPI epicsMessageQueueCreate(
unsigned int capacity,
unsigned int maximumMessageSize);
epicsShareFunc void epicsShareAPI epicsMessageQueueDestroy(
epicsMessageQueueId id);
epicsShareFunc int epicsShareAPI epicsMessageQueueSend(
epicsMessageQueueId id,
void *message,
unsigned int messageSize);
epicsShareFunc void epicsShareAPI epicsMessageQueueReceive(
epicsMessageQueueId id,
void *message,
unsigned int *messageSize);
epicsShareFunc int epicsShareAPI epicsMessageQueueReceiveWithTimeout(
epicsMessageQueueId id,
void *message,
unsigned int *messageSize,
double timeout);
epicsShareFunc int epicsShareAPI epicsMessageQueueIsFull(
epicsMessageQueueId id);
epicsShareFunc int epicsShareAPI epicsMessageQueueIsEmpty(
epicsMessageQueueId id);
epicsShareFunc void epicsShareAPI epicsMessageQueueShow(
epicsMessageQueueId id,
unsigned int level);
#ifdef __cplusplus
}
#endif /*__cplusplus */
#endif /* epicsMessageQueueh */
+4
View File
@@ -63,6 +63,10 @@ epicsExceptionTestHost_SRCS += epicsExceptionTestMain.cpp epicsExceptionTest.cpp
PROD_HOST += epicsExceptionTestHost
OBJS_IOC += epicsExceptionTest
epicsMessageQueueTestHost_SRCS += epicsMessageQueueTestMain.cpp epicsMessageQueueTest.cpp
PROD_HOST += epicsMessageQueueTestHost
OBJS_IOC += epicsMessageQueueTest
#fdmgrTest_SRCS += fdmgrTest.c
#PROD_HOST += fdmgrTest
+182
View File
@@ -0,0 +1,182 @@
/*************************************************************************\
* Copyright (c) 2002 The University of Chicago, as Operator of Argonne
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*
* $Id$
*
* Author W. Eric Norum
* norume@aps.anl.gov
* 630 252 4793
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <epicsMessageQueue.h>
#include <epicsThread.h>
#include <epicsAssert.h>
const char *msg1 = "1234567890This is a very long message.";
static void
receiver(void *arg)
{
epicsMessageQueue *q = (epicsMessageQueue *)arg;
char cbuf[80];
int expectmsg[4];
unsigned int len;
int sender, msgNum;
for (sender = 1 ; sender <= 4 ; sender++)
expectmsg[sender-1] = 1;
for (;;) {
cbuf[0] = '\0';
q->receive(cbuf, &len);
if ((sscanf(cbuf, "Sender %d -- %d", &sender, &msgNum) == 2)
&& (sender >= 1)
&& (sender <= 4)) {
if (expectmsg[sender-1] != msgNum)
printf("%s received %d '%.*s' -- expected %d\n", epicsThreadGetNameSelf(), len, len, cbuf, expectmsg[sender-1]);
expectmsg[sender-1] = msgNum + 1;
epicsThreadSleep(0.001 * (random() % 20));
}
else {
printf("%s received %d '%.*s'\n", epicsThreadGetNameSelf(), len, len, cbuf);
for (sender = 1 ; sender <= 4 ; sender++) {
if (expectmsg[sender-1] > 1)
printf("Sender %d -- %d messages\n", sender, expectmsg[sender-1]-1);
}
}
}
}
static void
sender(void *arg)
{
epicsMessageQueue *q = (epicsMessageQueue *)arg;
char cbuf[80];
unsigned int len;
int i = 0;
for (;;) {
len = sprintf(cbuf, "%s -- %d.", epicsThreadGetNameSelf(), ++i);
while (q->send((void *)cbuf, len) == false)
epicsThreadSleep(0.005 * (random() % 5));
epicsThreadSleep(0.005 * (random() % 20));
}
}
void
epicsMessageQueueTest()
{
int i;
char cbuf[80];
unsigned int len;
int pass;
int used;
unsigned int want;
epicsMessageQueue *q1 = new epicsMessageQueue(4, 20);
/*
* Simple single-thread tests
*/
i = 0;
used = 0;
assert(q1->isEmpty());
assert(!q1->isFull());
while (q1->send((void *)msg1, i ) == true) {
i++;
printf("Should have %d used -- ", ++used);
assert(!q1->isEmpty());
q1->show();
}
assert(!q1->isEmpty());
assert(q1->isFull());
want = 0;
q1->receive(cbuf, &len);
assert(!q1->isEmpty());
assert(!q1->isFull());
if ((len != want) || (strncmp(msg1, cbuf, len) != 0))
printf("wanted:%d '%.*s' got:%d '%.*s'\n", want, want, msg1, len, len, cbuf);
printf("Should have %d used -- ", --used);
q1->show();
want++;
q1->receive(cbuf, &len);
if ((len != want) || (strncmp(msg1, cbuf, len) != 0))
printf("wanted:%d '%.*s' got:%d '%.*s'\n", want, want, msg1, len, len, cbuf);
printf("Should have %d used -- ", --used);
q1->show();
q1->send((void *)msg1, i++);
printf("Should have %d used -- ", ++used);
q1->show();
want++;
q1->receive(cbuf, &len);
if ((len != want) || (strncmp(msg1, cbuf, len) != 0))
printf("wanted:%d '%.*s' got:%d '%.*s'\n", want, want, msg1, len, len, cbuf);
printf("Should have %d used -- ", --used);
q1->show();
q1->send((void *)msg1, i++);
printf("Should have %d used -- ", ++used);
q1->show();
while (q1->receive(cbuf, &len, 1.0) == true) {
printf("Should have %d used -- ", --used);
q1->show();
want++;
if ((len != want) || (strncmp(msg1, cbuf, len) != 0))
printf("wanted:%d '%.*s' got:%d '%.*s'\n", want, want, msg1, len, len, cbuf);
}
assert(q1->isEmpty());
assert(!q1->isFull());
/*
* Single receiver, single sender tests
*/
epicsThreadCreate("Receiver one", epicsThreadPriorityMedium, epicsThreadStackMedium, receiver, q1);
for (pass = 1 ; pass <= 3 ; pass++) {
epicsThreadSetPriority(epicsThreadGetIdSelf(), pass == 1 ?
epicsThreadPriorityHigh :
epicsThreadPriorityLow);
switch (pass) {
case 1: printf ("Should send/receive only 4 messages (sender priority > receiver priority).\n"); break;
case 2: printf ("Should send/receive 4 to 10 messages (depends on how host handles thread priorities).\n"); break;
case 3: printf ("Should send/receive 10 messages (sender pauses after sending).\n"); break;
}
for (i = 0 ; i < 10 ; i++) {
if (q1->send((void *)msg1, i) == false)
break;
if (pass >= 3)
epicsThreadSleep(0.5);
}
printf ("Sent %d messages.\n", i);
epicsThreadSleep(1.0);
}
/*
* Single receiver, multiple sender tests
*/
printf("This test takes another 5 minutes to finish.\n");
printf("Test has succeeded if nothing appears between here....\n");
epicsThreadCreate("Sender 1", epicsThreadPriorityLow, epicsThreadStackMedium, sender, q1);
epicsThreadCreate("Sender 2", epicsThreadPriorityMedium, epicsThreadStackMedium, sender, q1);
epicsThreadCreate("Sender 3", epicsThreadPriorityHigh, epicsThreadStackMedium, sender, q1);
epicsThreadCreate("Sender 4", epicsThreadPriorityHigh, epicsThreadStackMedium, sender, q1);
epicsThreadSleep(300.0);
/*
* Force out summaries
*/
printf("......and here.\n");
q1->send((void *)msg1, 0);
epicsThreadSleep(1.0);
}
@@ -0,0 +1,24 @@
/*************************************************************************\
* Copyright (c) 2002 The University of Chicago, as Operator of Argonne
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*
* $Id$
*
* Author W. Eric Norum
* norume@aps.anl.gov
* 630 252 4793
*/
void epicsMessageQueueTest ( void );
int main ( int /* argc */, char /* *argv[] */ )
{
epicsMessageQueueTest ();
return 0;
}