do record processing in callback tread instead of port thread

This commit is contained in:
zimoch
2008-08-04 15:46:39 +00:00
parent 706c59d069
commit 80477709aa
2 changed files with 71 additions and 31 deletions

View File

@ -765,7 +765,7 @@ readHandler()
{ {
// In AsyncRead mode just poll // In AsyncRead mode just poll
// and read as much as possible // and read as much as possible
pasynUser->timeout = readTimeout; pasynUser->timeout = 0.0;
bytesToRead = buffersize; bytesToRead = buffersize;
} }
else else
@ -786,7 +786,7 @@ readHandler()
status = pasynOctet->read(pvtOctet, pasynUser, status = pasynOctet->read(pvtOctet, pasynUser,
buffer, bytesToRead, (size_t*)&received, &eomReason); buffer, bytesToRead, (size_t*)&received, &eomReason);
if (ioAction != AsyncRead || status != asynTimeout) if (ioAction == Read || status != asynTimeout)
{ {
debug("AsynDriverInterface::readHandler(%s): " debug("AsynDriverInterface::readHandler(%s): "
"read(..., bytesToRead=%d, ...) [timeout=%f seconds] = %s\n", "read(..., bytesToRead=%d, ...) [timeout=%f seconds] = %s\n",
@ -798,7 +798,7 @@ readHandler()
switch (status) switch (status)
{ {
case asynSuccess: case asynSuccess:
if (ioAction == AsyncRead) if (ioAction != Read)
{ {
#ifndef NO_TEMPORARY #ifndef NO_TEMPORARY
debug("AsynDriverInterface::readHandler(%s): " debug("AsynDriverInterface::readHandler(%s): "
@ -950,6 +950,8 @@ void intrCallbackOctet(void* /*pvt*/, asynUser *pasynUser,
// internal buffer of asynDriver. // internal buffer of asynDriver.
if (!interruptAccept) return; // too early to process records if (!interruptAccept) return; // too early to process records
debug("AsynDriverInterface::intrCallbackOctet(%s) ioAction = %s\n",
interface->clientName(), ioActionStr[interface->ioAction]);
if (interface->ioAction == AsyncRead || if (interface->ioAction == AsyncRead ||
interface->ioAction == AsyncReadMore) interface->ioAction == AsyncReadMore)
{ {
@ -1287,8 +1289,8 @@ void handleRequest(asynUser* pasynUser)
{ {
AsynDriverInterface* interface = AsynDriverInterface* interface =
static_cast<AsynDriverInterface*>(pasynUser->userPvt); static_cast<AsynDriverInterface*>(pasynUser->userPvt);
debug("AsynDriverInterface::handleRequest(%s)\n", debug("AsynDriverInterface::handleRequest(%s) %s\n",
interface->clientName()); interface->clientName(), ioActionStr[interface->ioAction]);
switch (interface->ioAction) switch (interface->ioAction)
{ {
case None: case None:

View File

@ -41,6 +41,7 @@ extern "C" {
#include <semLib.h> #include <semLib.h>
#include <wdLib.h> #include <wdLib.h>
#include <taskLib.h>
extern DBBASE *pdbbase; extern DBBASE *pdbbase;
@ -52,6 +53,7 @@ extern DBBASE *pdbbase;
#include <epicsMutex.h> #include <epicsMutex.h>
#include <epicsEvent.h> #include <epicsEvent.h>
#include <epicsTime.h> #include <epicsTime.h>
#include <epicsThread.h>
#include <registryFunction.h> #include <registryFunction.h>
#include <iocsh.h> #include <iocsh.h>
@ -76,10 +78,12 @@ epicsShareFunc int epicsShareAPI iocshCmd(const char *command);
enum MoreFlags { enum MoreFlags {
// 0x00FFFFFF used by StreamCore // 0x00FFFFFF used by StreamCore
InDestructor = 0x0100000, InDestructor = 0x0100000,
ValueReceived = 0x0200000 ValueReceived = 0x0200000,
Aborted = 0x0400000
}; };
extern "C" void streamExecuteCommand(CALLBACK *pcallback); extern "C" void streamExecuteCommand(CALLBACK *pcallback);
extern "C" void streamRecordProcessCallback(CALLBACK *pcallback);
extern "C" long streamReload(char* recordname); extern "C" long streamReload(char* recordname);
class Stream : protected StreamCore class Stream : protected StreamCore
@ -108,6 +112,7 @@ class Stream : protected StreamCore
long currentValueLength; long currentValueLength;
IOSCANPVT ioscanpvt; IOSCANPVT ioscanpvt;
CALLBACK commandCallback; CALLBACK commandCallback;
CALLBACK processCallback;
#ifdef EPICS_3_14 #ifdef EPICS_3_14
@ -118,7 +123,7 @@ class Stream : protected StreamCore
#endif #endif
// StreamCore methods // StreamCore methods
// void protocolStartHook(); // Nothing to do here? void protocolStartHook();
void protocolFinishHook(ProtocolResult); void protocolFinishHook(ProtocolResult);
void startTimer(unsigned long timeout); void startTimer(unsigned long timeout);
bool getFieldAddress(const char* fieldname, bool getFieldAddress(const char* fieldname,
@ -131,6 +136,7 @@ class Stream : protected StreamCore
void releaseMutex(); void releaseMutex();
bool execute(); bool execute();
friend void streamExecuteCommand(CALLBACK *pcallback); friend void streamExecuteCommand(CALLBACK *pcallback);
friend void streamRecordProcessCallback(CALLBACK *pcallback);
// Stream Epics methods // Stream Epics methods
long initRecord(); long initRecord();
@ -280,12 +286,15 @@ epicsExportAddress(drvet, stream);
void streamEpicsPrintTimestamp(char* buffer, int size) void streamEpicsPrintTimestamp(char* buffer, int size)
{ {
int tlen;
epicsTime tm = epicsTime::getCurrent(); epicsTime tm = epicsTime::getCurrent();
tm.strftime(buffer, size, "%Y/%m/%d %H:%M:%S.%03f"); tlen = tm.strftime(buffer, size, "%Y/%m/%d %H:%M:%S.%03f");
sprintf(buffer+tlen, " %.*s", size-tlen-2, epicsThreadGetNameSelf());
} }
#else #else
void streamEpicsPrintTimestamp(char* buffer, int size) void streamEpicsPrintTimestamp(char* buffer, int size)
{ {
int tlen;
char* c; char* c;
TS_STAMP tm; TS_STAMP tm;
tsLocalTime (&tm); tsLocalTime (&tm);
@ -294,6 +303,8 @@ void streamEpicsPrintTimestamp(char* buffer, int size)
if (c) { if (c) {
c[4] = 0; c[4] = 0;
} }
tlen = strlen(buffer);
sprintf(buffer+tlen, " %.*s", size-tlen-2, taskName(0));
} }
#endif #endif
@ -310,7 +321,7 @@ report(int interest)
printf(" %s\n", interface.name()); printf(" %s\n", interface.name());
++interface; ++interface;
} }
if (interest < 1) return OK; if (interest < 1) return OK;
printf(" registered converters:\n"); printf(" registered converters:\n");
StreamFormatConverter* converter; StreamFormatConverter* converter;
@ -323,7 +334,7 @@ report(int interest)
printf(" %%%c %s\n", c, converter->name()); printf(" %%%c %s\n", c, converter->name());
} }
} }
Stream* pstream; Stream* pstream;
printf(" connected records:\n"); printf(" connected records:\n");
for (pstream = static_cast<Stream*>(first); pstream; for (pstream = static_cast<Stream*>(first); pstream;
@ -525,6 +536,8 @@ Stream(dbCommon* _record, struct link *ioLink,
#endif #endif
callbackSetCallback(streamExecuteCommand, &commandCallback); callbackSetCallback(streamExecuteCommand, &commandCallback);
callbackSetUser(this, &commandCallback); callbackSetUser(this, &commandCallback);
callbackSetCallback(streamRecordProcessCallback, &processCallback);
callbackSetUser(this, &processCallback);
status = ERROR; status = ERROR;
convert = DO_NOT_CONVERT; convert = DO_NOT_CONVERT;
ioscanpvt = NULL; ioscanpvt = NULL;
@ -561,14 +574,14 @@ initRecord()
// scan link parameters: filename protocol busname addr busparam // scan link parameters: filename protocol busname addr busparam
// It is safe to call this function again with different // It is safe to call this function again with different
// link text or different protocol file. // link text or different protocol file.
char filename[80]; char filename[80];
char protocol[80]; char protocol[80];
char busname[80]; char busname[80];
int addr = -1; int addr = -1;
char busparam[80]; char busparam[80];
int n; int n;
if (ioLink->type != INST_IO) if (ioLink->type != INST_IO)
{ {
error("%s: Wrong link type %s\n", name(), error("%s: Wrong link type %s\n", name(),
@ -790,6 +803,12 @@ expire(CALLBACK *pcallback)
// StreamCore virtual methods //////////////////////////////////////////// // StreamCore virtual methods ////////////////////////////////////////////
void Stream::
protocolStartHook()
{
flags &= ~Aborted;
}
void Stream:: void Stream::
protocolFinishHook(ProtocolResult result) protocolFinishHook(ProtocolResult result)
{ {
@ -824,6 +843,7 @@ protocolFinishHook(ProtocolResult result)
status = CALC_ALARM; status = CALC_ALARM;
break; break;
case Abort: case Abort:
flags |= Aborted;
case Fault: case Fault:
status = UDF_ALARM; status = UDF_ALARM;
if (record->pact || record->scan == SCAN_IO_EVENT) if (record->pact || record->scan == SCAN_IO_EVENT)
@ -845,28 +865,46 @@ protocolFinishHook(ProtocolResult result)
#endif #endif
return; return;
} }
if (record->pact || record->scan == SCAN_IO_EVENT)
{
debug("Stream::protocolFinishHook(stream=%s,result=%d) "
"processing record\n", name(), result);
// process record
// This will call streamReadWrite.
dbScanLock(record);
((DEVSUPFUN)record->rset->process)(record);
dbScanUnlock(record);
debug("Stream::protocolFinishHook(stream=%s,result=%d) done\n",
name(), result);
}
if (result != Abort && record->scan == SCAN_IO_EVENT) if (result != Abort && record->scan == SCAN_IO_EVENT)
{
// re-enable early input
flags |= AcceptInput;
}
if (record->pact || record->scan == SCAN_IO_EVENT)
{
// process record in callback thread to break possible recursion
callbackSetPriority(priority(), &processCallback);
callbackRequest(&processCallback);
}
}
void streamRecordProcessCallback(CALLBACK *pcallback)
{
Stream* pstream = static_cast<Stream*>(pcallback->user);
dbCommon* record = pstream->record;
// process record
// This will call streamReadWrite.
debug("streamRecordProcessCallback(%s) processing record\n",
pstream->name());
dbScanLock(record);
((DEVSUPFUN)record->rset->process)(record);
dbScanUnlock(record);
debug("streamRecordProcessCallback(%s) processing record done\n",
pstream->name());
if (record->scan == SCAN_IO_EVENT && !(pstream->flags & Aborted))
{ {
// restart protocol for next turn // restart protocol for next turn
debug("Stream::process(%s) restart async protocol\n", debug("streamRecordProcessCallback(%s) restart async protocol\n",
name()); pstream->name());
if (!startProtocol(StartAsync)) if (!pstream->startProtocol(Stream::StartAsync))
{ {
error("%s: Can't restart \"I/O Intr\" protocol\n", error("%s: Can't restart \"I/O Intr\" protocol\n",
name()); pstream->name());
} }
} }
} }
@ -1003,7 +1041,7 @@ matchValue(const StreamFormat& format, const void* fieldaddress)
char* buffer; char* buffer;
int status; int status;
const char* putfunc; const char* putfunc;
if (fieldaddress) if (fieldaddress)
{ {
// Format like "%([record.]field)..." has requested to put value // Format like "%([record.]field)..." has requested to put value
@ -1161,7 +1199,7 @@ noMoreElements:
void streamExecuteCommand(CALLBACK *pcallback) void streamExecuteCommand(CALLBACK *pcallback)
{ {
Stream* pstream = static_cast<Stream*>(pcallback->user); Stream* pstream = static_cast<Stream*>(pcallback->user);
if (iocshCmd(pstream->outputLine()) != OK) if (iocshCmd(pstream->outputLine()) != OK)
{ {
pstream->execCallback(StreamIoFault); pstream->execCallback(StreamIoFault);
@ -1178,7 +1216,7 @@ extern "C" int execute(const char *cmd);
void streamExecuteCommand(CALLBACK *pcallback) void streamExecuteCommand(CALLBACK *pcallback)
{ {
Stream* pstream = static_cast<Stream*>(pcallback->user); Stream* pstream = static_cast<Stream*>(pcallback->user);
if (execute(pstream->outputLine()) != OK) if (execute(pstream->outputLine()) != OK)
{ {
pstream->execCallback(StreamIoFault); pstream->execCallback(StreamIoFault);