diff --git a/src/StreamCore.cc b/src/StreamCore.cc index ea19cec..d598a2e 100644 --- a/src/StreamCore.cc +++ b/src/StreamCore.cc @@ -390,6 +390,22 @@ compileCommand(StreamProtocolParser::Protocol* protocol, // Run the protocol +// Input and events may come asynchonously from the bus driver +// Especially in the sequence 'out "request"; in "reply";' the +// reply can come before the 'in' command has actually started. +// For asyncronous protocols, input can come at any time. +// Thus, we must always accept input and event while the protocol +// is running or when asyncronous mode is active. +// Early input and event must be buffered until 'in' or 'event' +// start. An 'out' command must discard any early input to avoid +// problems with late input from aborted protocols. +// Async mode ends on Abort or Error or if another command comes +// after the asynchronous 'in' command. +// Input can be discarded when it is not accepted any more, i.e. +// at the end of syncronous protocols and when an asynchronous +// mode ends (but not when 'in'-only protocol finishes normally). + + bool StreamCore:: startProtocol(StartMode startMode) { @@ -443,7 +459,7 @@ finishProtocol(ProtocolResult status) flags & WaitPending ? "timerCallback()" : ""); status = Fault; } - flags &= ~(AcceptInput|AcceptEvent); +//// flags &= ~(AcceptInput|AcceptEvent); if (runningHandler) { // get original error status @@ -518,6 +534,7 @@ finishProtocol(ProtocolResult status) flags &= ~BusOwner; } busFinish(); + flags &= ~(AcceptInput|AcceptEvent); protocolFinishHook(status); } @@ -539,16 +556,16 @@ evalCommand() switch (*commandIndex++) { case out_cmd: - flags &= ~(AcceptInput|AcceptEvent); + //// flags &= ~(AcceptInput|AcceptEvent); return evalOut(); case in_cmd: - flags &= ~AcceptEvent; + //// flags &= ~AcceptEvent; return evalIn(); case wait_cmd: - flags &= ~(AcceptInput|AcceptEvent); + //// flags &= ~(AcceptInput|AcceptEvent); return evalWait(); case event_cmd: - flags &= ~AcceptInput; + //// flags &= ~AcceptInput; return evalEvent(); case exec_cmd: return evalExec(); @@ -787,6 +804,8 @@ lockCallback(StreamIoStatus status) flags |= BusOwner; if (status != StreamIoSuccess) { + error("%s: Lock timeout\n", + name()); finishProtocol(LockTimeout); return; } @@ -907,7 +926,7 @@ readCallback(StreamIoStatus status, #endif return 0; } - flags &= ~AcceptInput; +//// flags &= ~AcceptInput; unparsedInput = false; switch (status) { @@ -966,9 +985,15 @@ readCallback(StreamIoStatus status, { // look for terminator end = inputBuffer.find(inTerminator); - if (end >= 0) termlen = inTerminator.length(); - debug("StreamCore::readCallback(%s) inTerminator %sfound\n", - name(), end >= 0 ? "" : "not "); + if (end >= 0) + { + termlen = inTerminator.length(); + debug("StreamCore::readCallback(%s) inTerminator %s at position %ld\n", + name(), inTerminator.expand()(), end); + } else { + debug("StreamCore::readCallback(%s) inTerminator %s not found\n", + name(), inTerminator.expand()()); + } } if (status == StreamIoEnd && end < 0) { @@ -990,6 +1015,12 @@ readCallback(StreamIoStatus status, end = maxInput; termlen = 0; } + if (end >= 0) + { + // be forgiving with timeout because end is found + if (status == StreamIoTimeout) + status = StreamIoEnd; + } if (end < 0) { // no end found @@ -1006,23 +1037,22 @@ readCallback(StreamIoStatus status, } // try to parse what we got end = inputBuffer.length(); - if (!(flags & AsyncMode)) + if (flags & AsyncMode) + { + debug("StreamCore::readCallback(%s) async timeout: just restart\n", + name()); + inputBuffer.clear(); + commandIndex = commandStart; + evalIn(); + return 0; + } + else { error("%s: Timeout after reading %ld byte%s \"%s%s\"\n", name(), end, end==1 ? "" : "s", end > 20 ? "..." : "", inputBuffer.expand(-20)()); } } - - if (status == StreamIoTimeout && (flags & AsyncMode)) - { - debug("StreamCore::readCallback(%s) async timeout: just restart\n", - name()); - inputBuffer.clear(); - commandIndex = commandStart; - evalIn(); - return 0; - } inputLine.set(inputBuffer(), end); debug("StreamCore::readCallback(%s) input line: \"%s\"\n", @@ -1063,7 +1093,7 @@ readCallback(StreamIoStatus status, return 0; } // end input mode and do next command - flags &= ~(AsyncMode|AcceptInput); + //// flags &= ~(AsyncMode|AcceptInput); // -- should we tell someone that input has finished? -- evalCommand(); return 0;