diff --git a/src/asynStreamGeneratorDriver.cpp b/src/asynStreamGeneratorDriver.cpp index cb9a24a..9704262 100644 --- a/src/asynStreamGeneratorDriver.cpp +++ b/src/asynStreamGeneratorDriver.cpp @@ -511,6 +511,8 @@ void asynStreamGeneratorDriver::receiveUDP() { const std::size_t minDataBufferHeaderLength = 42; const std::size_t minCmdBufferHeaderLength = 20; + // For some reason the data buffers also have bit 15 set.... + // so they also appear as command buffers if (received >= minDataBufferHeaderLength && received == bufferLength * 2) { @@ -771,19 +773,21 @@ void asynStreamGeneratorDriver::processEvents() { epicsRingBytesGet(this->sortedQueue, (char *)newStartPtr, queuedEvents * sizeof(NormalisedEvent)); + std::sort(newStartPtr, newStartPtr + queuedEvents, oldestEventsFirst); + // std::size_t toProcess = // eventsBLastEnd - eventsBLastStart + queuedEvents * 4 / 5; std::size_t toProcess = eventsBLastEnd - eventsBLastStart + queuedEvents * 10 / 20; - asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, - "newStartPtr %" PRIu64 " queuedEvents %" PRIu64 " toProcess %" PRIu64 "\n", - newStartPtr - eventsA, queuedEvents, toProcess); + // asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, + // "newStartPtr %" PRIu64 " queuedEvents %" PRIu64 " toProcess %" PRIu64 "\n", + // newStartPtr - eventsA, queuedEvents, toProcess); - asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, - "eventsBLastStart %" PRIu64 " eventsBLastEnd %" PRIu64"\n", - eventsBLastStart - eventsB, eventsBLastEnd - eventsB); + // asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, + // "eventsBLastStart %" PRIu64 " eventsBLastEnd %" PRIu64"\n", + // eventsBLastStart - eventsB, eventsBLastEnd - eventsB); // TODO could also consider an in-place merge eventsBLastEnd = std::merge(newStartPtr, newStartPtr + queuedEvents, @@ -816,9 +820,9 @@ void asynStreamGeneratorDriver::processEvents() { driverName, functionName, countPreset, timePreset, presetChannel, startTimestamp); - asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, - "%s:%s: starting count: %" PRIu64 " %" PRIu64 " %" PRIu64 "\n", - driverName, functionName, eventsA[0].timestamp, eventsA[1].timestamp, eventsA[2].timestamp); + // asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, + // "%s:%s: starting count: %" PRIu64 " %" PRIu64 " %" PRIu64 "\n", + // driverName, functionName, eventsA[0].timestamp, eventsA[1].timestamp, eventsA[2].timestamp); } //if (eventsA[std::max((std::size_t) 0, toProcess - 1)].timestamp < eventsA[0].timestamp) @@ -826,24 +830,24 @@ void asynStreamGeneratorDriver::processEvents() { // "%s:%s: time-span: %" PRIu64 " %" PRIu64 "\n", // driverName, functionName, eventsA[0].timestamp, eventsA[std::max((std::size_t) 0, toProcess - 1)].timestamp); - if (!std::is_sorted( eventsA, eventsA + toProcess, oldestEventsFirst )) { - // asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, - // "%s:%s: not sorted: %" PRIu64 " %" PRIu64 "\n", - // driverName, functionName, eventsA[0].timestamp, eventsA[std::max((std::size_t) 0, toProcess - 1)].timestamp); - - - - for (size_t j = 1; j < toProcess; ++j) { - if (eventsA[j-1].timestamp > eventsA[j].timestamp) { - asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, - "%s:%s: not sorted %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 "\n", - driverName, functionName, b_cnt, j, eventsA[j-1].timestamp, eventsA[j].timestamp); - } - } - ++b_cnt; + // if (!std::is_sorted( eventsA, eventsA + toProcess, oldestEventsFirst )) { + // // asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, + // // "%s:%s: not sorted: %" PRIu64 " %" PRIu64 "\n", + // // driverName, functionName, eventsA[0].timestamp, eventsA[std::max((std::size_t) 0, toProcess - 1)].timestamp); + // + // + // + // for (size_t j = 1; j < toProcess; ++j) { + // if (eventsA[j-1].timestamp > eventsA[j].timestamp) { + // asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, + // "%s:%s: not sorted %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 "\n", + // driverName, functionName, b_cnt, j, eventsA[j-1].timestamp, eventsA[j].timestamp); + // } + // } + // ++b_cnt; - std::sort(eventsA, eventsA + toProcess, oldestEventsFirst); - } + // std::sort(eventsA, eventsA + toProcess, oldestEventsFirst); + // } // if (!std::is_sorted( eventsA, eventsA + toProcess, oldestEventsFirst )) // asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, @@ -898,7 +902,8 @@ void asynStreamGeneratorDriver::processEvents() { (timePreset && elapsedSeconds >= (double)timePreset)) { asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, "%s:%s: finished count\n", driverName, functionName); - setIntegerParam(this->P_Status, STATUS_IDLE); + currStatus = STATUS_IDLE; + setIntegerParam(this->P_Status, currStatus); setIntegerParam(this->P_CountPreset, 0); setIntegerParam(this->P_TimePreset, 0); }