Changed from epicsRingPointer to epicsMessageQueue for message passing, much better performance

This commit is contained in:
MarkRivers
2003-09-17 16:43:05 +00:00
parent 637f7d1bc4
commit d37d0eaa5b
3 changed files with 64 additions and 97 deletions
+1 -1
View File
@@ -3,7 +3,7 @@ TOP = ../..
include $(TOP)/configure/CONFIG
# The following are used for debugging messages.
#USR_CXXFLAGS += -DDEBUG
USR_CXXFLAGS += -DDEBUG
INC += gpibIO.h serialIO.h
+4 -4
View File
@@ -2,9 +2,9 @@
FILENAME... serialIO.h
USAGE... .
Version: $Revision: 1.4 $
Modified By: $Author: sluiter $
Last Modified: $Date: 2003-05-27 21:48:45 $
Version: $Revision: 1.5 $
Modified By: $Author: rivers $
Last Modified: $Date: 2003-09-17 16:43:05 $
*/
/*****************************************************************
@@ -41,7 +41,7 @@ public:
static void serialIOCallback(Message *, void *);
private:
MessageClient* pMessageClient;
epicsRingPointer<void *> *msgQId;
epicsMessageQueue *msgQId;
};
#else /* For C just define serialInfo as a dummy structure since it can't
understand the include files which define what it really is */
+59 -92
View File
@@ -2,9 +2,9 @@
FILENAME... serialIOMPF.cc
USAGE... Interface between MPF and motor record device drivers.
Version: $Revision: 1.10 $
Version: $Revision: 1.11 $
Modified By: $Author: rivers $
Last Modified: $Date: 2003-09-02 05:10:01 $
Last Modified: $Date: 2003-09-17 16:43:05 $
*/
/*
@@ -30,14 +30,14 @@ Last Modified: $Date: 2003-09-02 05:10:01 $
#include <string.h>
#include <epicsThread.h>
#include <epicsRingPointer.h>
#include <epicsMessageQueue.h>
#include "serialIO.h"
#include "ConnectMessage.h"
#include "Char8ArrayMessage.h"
#include "serialServer.h"
// Minimum wait for server reply; in 0.1 second increments.
#define MIN_MSGQ_WAIT (int) (1/0.1)
// Minimum wait for server reply;
#define MIN_MSGQ_WAIT 0.1
#ifdef __GNUG__
@@ -54,12 +54,12 @@ Last Modified: $Date: 2003-09-02 05:10:01 $
serialIO::serialIO(int card, char *serverName, bool *createdOK)
{
int status, itera;
int status;
Message *pmess;
*createdOK = 1;
// Create a message queue for the callback
msgQId = new epicsRingPointer<void *>(4);
// Create a message queue for the callback. Size = 4 messages, each message is a pointer to an MPF message.
msgQId = new epicsMessageQueue(4, sizeof(Message *));
Debug(5, "serialIOInit: message queue created, ID=%p\n", msgQId);
pMessageClient = new MessageClient(serialIOCallback,(void *)this);
Debug(5, "serialIOInit: message client created=%p\n", pMessageClient);
@@ -72,22 +72,16 @@ serialIO::serialIO(int card, char *serverName, bool *createdOK)
Debug(1, "serialIOInit: Bound to MPF server %s\n", serverName);
// Wait for connect message to be received, 2 second timeout
for (itera = 0; msgQId->isEmpty() == true; itera++)
{
epicsThreadSleep(0.1);
if (itera >= MIN_MSGQ_WAIT)
{
epicsPrintf("serialIO: error calling msgQReceive, status = %d\n", status);
*createdOK = 0;
return;
}
status = msgQId->receive((char *) &pmess, sizeof(pmess), 2.0);
if (status < 0) {
epicsPrintf("serialIO: error calling msgQ->receive, status = %d\n",
status);
*createdOK = 0;
return;
}
pmess = (Message *) msgQId->pop();
if (pmess->getType() != messageTypeConnect)
{
epicsPrintf("serialIO: incorrect message type received = %d\n", pmess->getType());
if (pmess->getType() != messageTypeConnect) {
epicsPrintf("serialIO: incorrect message type received = %d\n",
pmess->getType());
*createdOK = 0;
}
}
@@ -98,7 +92,7 @@ int serialIO::serialIOSend(char const *buffer, int buffer_len, int timeout)
Char8ArrayMessage *psm = new Char8ArrayMessage;
Char8ArrayMessage *prm = NULL;
Message *pmess;
int itera, timeout_itera;
double wait;
psm->allocValue(buffer_len);
psm->setSize(buffer_len);
@@ -113,36 +107,24 @@ int serialIO::serialIOSend(char const *buffer, int buffer_len, int timeout)
}
Debug(2, "serialIOSend: sent message %s\n", buffer);
/* Wait for 2x the timeout or MIN_MSGQ_WAIT; which ever is greater. */
timeout_itera = psm->timeout * 20;
if (timeout_itera < MIN_MSGQ_WAIT)
timeout_itera = MIN_MSGQ_WAIT;
// Wait for response back from server
for (itera = 0; msgQId->isEmpty() == true; itera++)
{
if (itera > timeout_itera)
{
epicsPrintf("serialIOSend: error calling msgQReceive=%d\n", status);
goto finish;
}
epicsThreadSleep(0.1);
wait = 2*timeout/1000.;
if (wait < MIN_MSGQ_WAIT) wait = MIN_MSGQ_WAIT;
status = msgQId->receive((char *)&pmess, sizeof(pmess), wait);
if (status < 0) {
epicsPrintf("serialIOSend: error calling msgQ->receive=%d\n", status);
goto finish;
}
pmess = (Message *) msgQId->pop();
Debug(5, "serialIOSend: got message, pmess=%p\n", pmess);
if (pmess->getType() == messageTypeChar8Array)
{
prm = (Char8ArrayMessage *)pmess;
status = prm->status;
if (status)
Debug(1, "serialIOSend: error receiving message, status=%d\n", status);
Debug(4, "serialIOSend: received message, status=%d\n", status);
}
else
{
epicsPrintf("serialIOInit: incorrect message type received = %d\n",
pmess->getType());
if (pmess->getType() == messageTypeChar8Array) {
prm = (Char8ArrayMessage *)pmess;
status = prm->status;
if (status) Debug(1, "serialIOSend: error receiving message, status=%d\n",
status);
Debug(4, "serialIOSend: received message, status=%d\n", status);
} else {
epicsPrintf("serialIOInit: incorrect message type received = %d\n",
pmess->getType());
}
delete prm;
@@ -159,7 +141,7 @@ int serialIO::serialIORecv(char *buffer, int buffer_len, char *terminator,
Char8ArrayMessage *psm = new Char8ArrayMessage;
Char8ArrayMessage *prm = NULL;
Message *pmess;
int itera, timeout_itera;
double wait;
psm->timeout = timeout/1000;
// MPF uses seconds, not milliseconds for timeout. If the desired timeout
@@ -182,23 +164,15 @@ int serialIO::serialIORecv(char *buffer, int buffer_len, char *terminator,
}
Debug(5, "serialIORecv: sent message status = %d, timeout=%d\n", status, timeout);
/* Wait for 2x the timeout or MIN_MSGQ_WAIT; which ever is greater. */
timeout_itera = psm->timeout * 20;
if (timeout_itera < MIN_MSGQ_WAIT)
timeout_itera = MIN_MSGQ_WAIT;
for (itera = 0; msgQId->isEmpty() == true; itera++)
{
if (itera > timeout_itera)
{
epicsPrintf("serialIORecv: error calling msgQReceive, status = %d\n", status);
goto done;
}
epicsThreadSleep(0.1);
wait = 2*timeout/1000.;
if (wait < MIN_MSGQ_WAIT) wait = MIN_MSGQ_WAIT;
status = msgQId->receive((char *)&pmess, sizeof(pmess), wait);
if (status < 0) {
epicsPrintf("serialIORecv: error calling msgQ->receive, status = %d\n",
status);
goto done;
}
pmess = (Message *) msgQId->pop();
if (pmess->getType() != messageTypeChar8Array) {
epicsPrintf("serialIOInit: incorrect message type received = %d\n",
pmess->getType());
@@ -212,11 +186,10 @@ int serialIO::serialIORecv(char *buffer, int buffer_len, char *terminator,
nrec = prm->getSize();
if (nrec > buffer_len) nrec=buffer_len;
memcpy(buffer, prm->value, nrec);
Debug(2,"serialIORecv: Received %d bytes\n", nrec);
// Append a NULL byte to the response if there is room
if (nrec < buffer_len) buffer[nrec] = '\0';
Debug(2,"serialIORecv: Received %d bytes, message = \n%s\n", nrec, buffer);
Debug(2,"serialIORecv: received %d bytes: \n%s\n", nrec, buffer);
cleanup:
delete prm;
@@ -232,7 +205,7 @@ int serialIO::serialIOSendRecv(char const *outbuff, int outbuff_len,
Char8ArrayMessage *psm = new Char8ArrayMessage;
Char8ArrayMessage *prm = NULL;
Message *pmess;
int itera, timeout_itera;
double wait;
psm->allocValue(outbuff_len);
psm->setSize(outbuff_len);
@@ -260,24 +233,15 @@ int serialIO::serialIOSendRecv(char const *outbuff, int outbuff_len,
Debug(2, "serialIOSendRecv: sent: %s\n", outbuff);
/* Wait for 2x the timeout or MIN_MSGQ_WAIT; which ever is greater. */
timeout_itera = psm->timeout * 20;
if (timeout_itera < MIN_MSGQ_WAIT)
timeout_itera = MIN_MSGQ_WAIT;
for (itera = 0; msgQId->isEmpty() == true; itera++)
{
if (itera > timeout_itera)
{
epicsPrintf("serialIOSendRecv: error calling msgQReceive, status = %d\n", status);
goto done;
}
epicsThreadSleep(0.1);
}
pmess = (Message *) msgQId->pop();
// Wait for response back from server
wait = 2*timeout/1000.;
if (wait < MIN_MSGQ_WAIT) wait = MIN_MSGQ_WAIT;
status = msgQId->receive((char *)&pmess, sizeof(pmess), wait);
if (status < 0) {
epicsPrintf("serialIOSendRecv: error calling msgQ->receive, status = %d\n",
status);
goto done;
}
if (pmess->getType() != messageTypeChar8Array) {
epicsPrintf("serialIOSendRecv: incorrect message type received = %d\n",
pmess->getType());
@@ -302,10 +266,11 @@ done:
return (nrec);
}
void serialIO::serialIOCallback(Message *message, void *pointer)
{
serialIO *psi = (serialIO *)pointer;
bool rtnval;
int status;
// If this is a Connect message or a Char8ArrayMessage then send it to
// the message queue.
@@ -324,9 +289,11 @@ void serialIO::serialIOCallback(Message *message, void *pointer)
return;
}
rtnval = psi->msgQId->push((void **) message);
if (rtnval == false)
epicsPrintf("serialIOCallback: error from msgQId->push\n");
status = psi->msgQId->send((char *)&message, sizeof(message));
if (status < 0) {
epicsPrintf("serialIOCallback: error from Send, status = %d\n",
status);
}
}
extern "C"