default osdMessageQueue avoid cantProceed

This commit is contained in:
Michael Davidsaver
2011-08-26 16:33:58 -07:00
committed by Michael Davidsaver
parent 34fc8f8251
commit e41bd3994e

View File

@@ -74,24 +74,46 @@ epicsShareFunc epicsMessageQueueId epicsShareAPI epicsMessageQueueCreate(
epicsMessageQueueId pmsg;
unsigned int slotBytes, slotLongs;
assert(capacity != 0);
pmsg = (epicsMessageQueueId)callocMustSucceed(1, sizeof(*pmsg), "epicsMessageQueueCreate");
if(capacity == 0)
return NULL;
pmsg = (epicsMessageQueueId)calloc(1, sizeof(*pmsg));
if(!pmsg)
return NULL;
pmsg->capacity = capacity;
pmsg->maxMessageSize = maxMessageSize;
slotLongs = 1 + ((maxMessageSize + sizeof(unsigned long) - 1) / sizeof(unsigned long));
slotBytes = slotLongs * sizeof(unsigned long);
pmsg->buf = (unsigned long *)callocMustSucceed(pmsg->capacity, slotBytes, "epicsMessageQueueCreate");
pmsg->mutex = epicsMutexCreate();
pmsg->buf = (unsigned long*)calloc(pmsg->capacity, slotBytes);
if(!pmsg->buf || !pmsg->mutex) {
if(pmsg->mutex)
epicsMutexDestroy(pmsg->mutex);
free(pmsg->buf);
free(pmsg);
return NULL;
}
pmsg->inPtr = pmsg->outPtr = pmsg->firstMessageSlot = (char *)&pmsg->buf[0];
pmsg->lastMessageSlot = (char *)&pmsg->buf[(capacity - 1) * slotLongs];
pmsg->full = false;
pmsg->slotSize = slotBytes;
pmsg->mutex = epicsMutexMustCreate();
ellInit(&pmsg->sendQueue);
ellInit(&pmsg->receiveQueue);
ellInit(&pmsg->eventFreeList);
return pmsg;
}
static void
freeEventNode(struct eventNode *enode)
{
epicsEventDestroy(enode->event);
free(enode);
}
epicsShareFunc void epicsShareAPI
epicsMessageQueueDestroy(epicsMessageQueueId pmsg)
{
@@ -99,8 +121,7 @@ epicsMessageQueueDestroy(epicsMessageQueueId pmsg)
while ((evp = reinterpret_cast < struct eventNode * >
( ellGet(&pmsg->eventFreeList) ) ) != NULL) {
epicsEventDestroy(evp->event);
free(evp);
freeEventNode(evp);
}
epicsMutexDestroy(pmsg->mutex);
free(pmsg->buf);
@@ -114,9 +135,15 @@ getEventNode(epicsMessageQueueId pmsg)
evp = reinterpret_cast < struct eventNode * > ( ellGet(&pmsg->eventFreeList) );
if (evp == NULL) {
evp = (struct eventNode *) callocMustSucceed(1, sizeof(*evp),
"epicsMessageQueueGetEventNode");
evp->event = epicsEventMustCreate(epicsEventEmpty);
epicsEventId eid = epicsEventCreate(epicsEventEmpty);
evp = (struct eventNode *) calloc(1, sizeof(*evp));
if(!evp || !eid) {
free(evp);
if(eid)
epicsEventDestroy(eid);
return NULL;
}
evp->event = eid;
}
return evp;
}
@@ -133,7 +160,9 @@ mySend(epicsMessageQueueId pmsg, void *message, unsigned int size, bool wait, bo
/*
* See if message can be sent
*/
epicsMutexLock(pmsg->mutex);
if(epicsMutexLock(pmsg->mutex)!=epicsMutexLockOK)
return -1;
if ((pmsg->numberOfSendersWaiting > 0)
|| (pmsg->full && (ellFirst(&pmsg->receiveQueue) == NULL))) {
/*
@@ -150,18 +179,32 @@ mySend(epicsMessageQueueId pmsg, void *message, unsigned int size, bool wait, bo
struct threadNode threadNode;
threadNode.evp = getEventNode(pmsg);
threadNode.eventSent = false;
if (!threadNode.evp) {
epicsMutexUnlock(pmsg->mutex);
return -1;
}
ellAdd(&pmsg->sendQueue, &threadNode.link);
pmsg->numberOfSendersWaiting++;
epicsMutexUnlock(pmsg->mutex);
if(haveTimeout)
epicsEventWaitWithTimeout(threadNode.evp->event, timeout);
else
epicsEventWait(threadNode.evp->event);
epicsMutexLock(pmsg->mutex);
if(epicsMutexLock(pmsg->mutex)!=epicsMutexLockOK){
freeEventNode(threadNode.evp);
return -1;
}
if(!threadNode.eventSent)
ellDelete(&pmsg->sendQueue, &threadNode.link);
pmsg->numberOfSendersWaiting--;
ellAdd(&pmsg->eventFreeList, &threadNode.evp->link);
if (pmsg->full && (ellFirst(&pmsg->receiveQueue) == NULL)) {
epicsMutexUnlock(pmsg->mutex);
return -1;
@@ -227,7 +270,9 @@ myReceive(epicsMessageQueueId pmsg, void *message, unsigned int size, bool wait,
/*
* If there's a message on the queue, copy it
*/
epicsMutexLock(pmsg->mutex);
if(epicsMutexLock(pmsg->mutex)!=epicsMutexLockOK)
return -1;
myOutPtr = (char *)pmsg->outPtr;
if ((myOutPtr != pmsg->inPtr) || pmsg->full) {
int ret;
@@ -282,17 +327,31 @@ myReceive(epicsMessageQueueId pmsg, void *message, unsigned int size, bool wait,
threadNode.buf = message;
threadNode.size = size;
threadNode.eventSent = false;
if(!threadNode.evp) {
epicsMutexUnlock(pmsg->mutex);
return -1;
}
ellAdd(&pmsg->receiveQueue, &threadNode.link);
epicsMutexUnlock(pmsg->mutex);
if(haveTimeout)
epicsEventWaitWithTimeout(threadNode.evp->event, timeout);
else
epicsEventWait(threadNode.evp->event);
epicsMutexLock(pmsg->mutex);
if(epicsMutexLock(pmsg->mutex)!=epicsMutexLockOK){
freeEventNode(threadNode.evp);
return -1;
}
if(!threadNode.eventSent)
ellDelete(&pmsg->receiveQueue, &threadNode.link);
ellAdd(&pmsg->eventFreeList, &threadNode.evp->link);
epicsMutexUnlock(pmsg->mutex);
if(threadNode.eventSent && (threadNode.size <= size))
return threadNode.size;
return -1;
@@ -322,7 +381,7 @@ epicsMessageQueuePending(epicsMessageQueueId pmsg)
char *myInPtr, *myOutPtr;
int nmsg;
epicsMutexLock(pmsg->mutex);
epicsMutexMustLock(pmsg->mutex);
myInPtr = (char *)pmsg->inPtr;
myOutPtr = (char *)pmsg->outPtr;
if (pmsg->full)