diff --git a/src/AsynDriverInterface.cc b/src/AsynDriverInterface.cc index 9af8ece..2783bdc 100644 --- a/src/AsynDriverInterface.cc +++ b/src/AsynDriverInterface.cc @@ -765,7 +765,7 @@ readHandler() { // In AsyncRead mode just poll // and read as much as possible - pasynUser->timeout = readTimeout; + pasynUser->timeout = 0.0; bytesToRead = buffersize; } else @@ -786,7 +786,7 @@ readHandler() status = pasynOctet->read(pvtOctet, pasynUser, buffer, bytesToRead, (size_t*)&received, &eomReason); - if (ioAction != AsyncRead || status != asynTimeout) + if (ioAction == Read || status != asynTimeout) { debug("AsynDriverInterface::readHandler(%s): " "read(..., bytesToRead=%d, ...) [timeout=%f seconds] = %s\n", @@ -798,7 +798,7 @@ readHandler() switch (status) { case asynSuccess: - if (ioAction == AsyncRead) + if (ioAction != Read) { #ifndef NO_TEMPORARY debug("AsynDriverInterface::readHandler(%s): " @@ -950,6 +950,8 @@ void intrCallbackOctet(void* /*pvt*/, asynUser *pasynUser, // internal buffer of asynDriver. if (!interruptAccept) return; // too early to process records + debug("AsynDriverInterface::intrCallbackOctet(%s) ioAction = %s\n", + interface->clientName(), ioActionStr[interface->ioAction]); if (interface->ioAction == AsyncRead || interface->ioAction == AsyncReadMore) { @@ -1287,8 +1289,8 @@ void handleRequest(asynUser* pasynUser) { AsynDriverInterface* interface = static_cast(pasynUser->userPvt); - debug("AsynDriverInterface::handleRequest(%s)\n", - interface->clientName()); + debug("AsynDriverInterface::handleRequest(%s) %s\n", + interface->clientName(), ioActionStr[interface->ioAction]); switch (interface->ioAction) { case None: diff --git a/src/StreamEpics.cc b/src/StreamEpics.cc index 67bef3d..54a21d5 100644 --- a/src/StreamEpics.cc +++ b/src/StreamEpics.cc @@ -41,6 +41,7 @@ extern "C" { #include #include +#include extern DBBASE *pdbbase; @@ -52,6 +53,7 @@ extern DBBASE *pdbbase; #include #include #include +#include #include #include @@ -76,10 +78,12 @@ epicsShareFunc int epicsShareAPI iocshCmd(const char *command); enum MoreFlags { // 0x00FFFFFF used by StreamCore InDestructor = 0x0100000, - ValueReceived = 0x0200000 + ValueReceived = 0x0200000, + Aborted = 0x0400000 }; extern "C" void streamExecuteCommand(CALLBACK *pcallback); +extern "C" void streamRecordProcessCallback(CALLBACK *pcallback); extern "C" long streamReload(char* recordname); class Stream : protected StreamCore @@ -108,6 +112,7 @@ class Stream : protected StreamCore long currentValueLength; IOSCANPVT ioscanpvt; CALLBACK commandCallback; + CALLBACK processCallback; #ifdef EPICS_3_14 @@ -118,7 +123,7 @@ class Stream : protected StreamCore #endif // StreamCore methods - // void protocolStartHook(); // Nothing to do here? + void protocolStartHook(); void protocolFinishHook(ProtocolResult); void startTimer(unsigned long timeout); bool getFieldAddress(const char* fieldname, @@ -131,6 +136,7 @@ class Stream : protected StreamCore void releaseMutex(); bool execute(); friend void streamExecuteCommand(CALLBACK *pcallback); + friend void streamRecordProcessCallback(CALLBACK *pcallback); // Stream Epics methods long initRecord(); @@ -280,12 +286,15 @@ epicsExportAddress(drvet, stream); void streamEpicsPrintTimestamp(char* buffer, int size) { + int tlen; epicsTime tm = epicsTime::getCurrent(); - tm.strftime(buffer, size, "%Y/%m/%d %H:%M:%S.%03f"); + tlen = tm.strftime(buffer, size, "%Y/%m/%d %H:%M:%S.%03f"); + sprintf(buffer+tlen, " %.*s", size-tlen-2, epicsThreadGetNameSelf()); } #else void streamEpicsPrintTimestamp(char* buffer, int size) { + int tlen; char* c; TS_STAMP tm; tsLocalTime (&tm); @@ -294,6 +303,8 @@ void streamEpicsPrintTimestamp(char* buffer, int size) if (c) { c[4] = 0; } + tlen = strlen(buffer); + sprintf(buffer+tlen, " %.*s", size-tlen-2, taskName(0)); } #endif @@ -310,7 +321,7 @@ report(int interest) printf(" %s\n", interface.name()); ++interface; } - + if (interest < 1) return OK; printf(" registered converters:\n"); StreamFormatConverter* converter; @@ -323,7 +334,7 @@ report(int interest) printf(" %%%c %s\n", c, converter->name()); } } - + Stream* pstream; printf(" connected records:\n"); for (pstream = static_cast(first); pstream; @@ -525,6 +536,8 @@ Stream(dbCommon* _record, struct link *ioLink, #endif callbackSetCallback(streamExecuteCommand, &commandCallback); callbackSetUser(this, &commandCallback); + callbackSetCallback(streamRecordProcessCallback, &processCallback); + callbackSetUser(this, &processCallback); status = ERROR; convert = DO_NOT_CONVERT; ioscanpvt = NULL; @@ -561,14 +574,14 @@ initRecord() // scan link parameters: filename protocol busname addr busparam // It is safe to call this function again with different // link text or different protocol file. - + char filename[80]; char protocol[80]; char busname[80]; int addr = -1; char busparam[80]; int n; - + if (ioLink->type != INST_IO) { error("%s: Wrong link type %s\n", name(), @@ -790,6 +803,12 @@ expire(CALLBACK *pcallback) // StreamCore virtual methods //////////////////////////////////////////// +void Stream:: +protocolStartHook() +{ + flags &= ~Aborted; +} + void Stream:: protocolFinishHook(ProtocolResult result) { @@ -824,6 +843,7 @@ protocolFinishHook(ProtocolResult result) status = CALC_ALARM; break; case Abort: + flags |= Aborted; case Fault: status = UDF_ALARM; if (record->pact || record->scan == SCAN_IO_EVENT) @@ -845,28 +865,46 @@ protocolFinishHook(ProtocolResult result) #endif return; } - if (record->pact || record->scan == SCAN_IO_EVENT) - { - debug("Stream::protocolFinishHook(stream=%s,result=%d) " - "processing record\n", name(), result); - // process record - // This will call streamReadWrite. - dbScanLock(record); - ((DEVSUPFUN)record->rset->process)(record); - dbScanUnlock(record); - debug("Stream::protocolFinishHook(stream=%s,result=%d) done\n", - name(), result); - } if (result != Abort && record->scan == SCAN_IO_EVENT) + { + // re-enable early input + flags |= AcceptInput; + } + + if (record->pact || record->scan == SCAN_IO_EVENT) + { + // process record in callback thread to break possible recursion + callbackSetPriority(priority(), &processCallback); + callbackRequest(&processCallback); + } + +} + +void streamRecordProcessCallback(CALLBACK *pcallback) +{ + Stream* pstream = static_cast(pcallback->user); + dbCommon* record = pstream->record; + + // process record + // This will call streamReadWrite. + debug("streamRecordProcessCallback(%s) processing record\n", + pstream->name()); + dbScanLock(record); + ((DEVSUPFUN)record->rset->process)(record); + dbScanUnlock(record); + debug("streamRecordProcessCallback(%s) processing record done\n", + pstream->name()); + + if (record->scan == SCAN_IO_EVENT && !(pstream->flags & Aborted)) { // restart protocol for next turn - debug("Stream::process(%s) restart async protocol\n", - name()); - if (!startProtocol(StartAsync)) + debug("streamRecordProcessCallback(%s) restart async protocol\n", + pstream->name()); + if (!pstream->startProtocol(Stream::StartAsync)) { error("%s: Can't restart \"I/O Intr\" protocol\n", - name()); + pstream->name()); } } } @@ -1003,7 +1041,7 @@ matchValue(const StreamFormat& format, const void* fieldaddress) char* buffer; int status; const char* putfunc; - + if (fieldaddress) { // Format like "%([record.]field)..." has requested to put value @@ -1161,7 +1199,7 @@ noMoreElements: void streamExecuteCommand(CALLBACK *pcallback) { Stream* pstream = static_cast(pcallback->user); - + if (iocshCmd(pstream->outputLine()) != OK) { pstream->execCallback(StreamIoFault); @@ -1178,7 +1216,7 @@ extern "C" int execute(const char *cmd); void streamExecuteCommand(CALLBACK *pcallback) { Stream* pstream = static_cast(pcallback->user); - + if (execute(pstream->outputLine()) != OK) { pstream->execCallback(StreamIoFault);