diff --git a/db/correlation_unit.db b/db/correlation_unit.db index 771ae09..94499e1 100644 --- a/db/correlation_unit.db +++ b/db/correlation_unit.db @@ -27,3 +27,47 @@ record(bi, "$(INSTR)$(NAME):Enable_RBV") field(ONAM, "ON") field(SCAN, ".5 second") } + +record(longin,"$(INSTR)$(NAME):UDP_DROPPED") +{ + field(DESC, "UDP Packets Missed") + field(EGU, "Events") + field(DTYP, "asynInt32") + field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) DROP") + # field(SCAN, "I/O Intr") + field(SCAN, "1 second") + field(PINI, "YES") +} + +record(longin,"$(INSTR)$(NAME):UDP_WATERMARK") +{ + field(DESC, "UDP Queue Usage") + field(EGU, "%") + field(DTYP, "asynInt32") + field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) UDP") + # field(SCAN, "I/O Intr") + field(SCAN, "1 second") + field(PINI, "YES") +} + +record(longin,"$(INSTR)$(NAME):NORMALISED_WATERMARK") +{ + field(DESC, "Normalised Queue Usage") + field(EGU, "%") + field(DTYP, "asynInt32") + field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) NORM") + # field(SCAN, "I/O Intr") + field(SCAN, "1 second") + field(PINI, "YES") +} + +record(longin,"$(INSTR)$(NAME):SORTED_WATERMARK") +{ + field(DESC, "Sort Queue Usage") + field(EGU, "%") + field(DTYP, "asynInt32") + field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) SORT") + # field(SCAN, "I/O Intr") + field(SCAN, "1 second") + field(PINI, "YES") +} diff --git a/db/daq_common.db b/db/daq_common.db index fbbd9c9..6fcbe9a 100644 --- a/db/daq_common.db +++ b/db/daq_common.db @@ -236,39 +236,3 @@ record(ai, "$(INSTR)$(NAME):ELAPSED-TIME") field(PINI, "YES") # field(FLNK, "$(INSTR)$(NAME):ETO") } - -################################################################################ -# Stream Generator Status PVs - -record(longin,"$(INSTR)$(NAME):UDP_DROPPED") -{ - field(DESC, "UDP Packets Missed") - field(EGU, "Events") - field(DTYP, "asynInt32") - field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) DROP") - # field(SCAN, "I/O Intr") - field(SCAN, "1 second") - field(PINI, "YES") -} - -record(longin,"$(INSTR)$(NAME):UDP_WATERMARK") -{ - field(DESC, "UDP Queue Usage") - field(EGU, "%") - field(DTYP, "asynInt32") - field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) UDP") - # field(SCAN, "I/O Intr") - field(SCAN, "1 second") - field(PINI, "YES") -} - -record(longin,"$(INSTR)$(NAME):SORTED_WATERMARK") -{ - field(DESC, "Partial Sort Queue Usage") - field(EGU, "%") - field(DTYP, "asynInt32") - field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) SORT") - # field(SCAN, "I/O Intr") - field(SCAN, "1 second") - field(PINI, "YES") -} diff --git a/src/asynStreamGeneratorDriver.cpp b/src/asynStreamGeneratorDriver.cpp index 638ea69..25da2ed 100644 --- a/src/asynStreamGeneratorDriver.cpp +++ b/src/asynStreamGeneratorDriver.cpp @@ -201,6 +201,8 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver( status = createInt32Param(status, P_UdpDroppedString, &P_UdpDropped); status = createInt32Param(status, P_UdpQueueHighWaterMarkString, &P_UdpQueueHighWaterMark); + status = createInt32Param(status, P_NormalisedQueueHighWaterMarkString, + &P_NormalisedQueueHighWaterMark); status = createInt32Param(status, P_SortedQueueHighWaterMarkString, &P_SortedQueueHighWaterMark); @@ -346,15 +348,26 @@ asynStatus asynStreamGeneratorDriver::readInt32(asynUser *pasynUser, if (function == P_UdpQueueHighWaterMark) { const double toPercent = 100. / (243. * udpQueueSize); - *value = (epicsInt32)(epicsRingBytesHighWaterMark(this->udpQueue) / - sizeof(NormalisedEvent) * toPercent); + *value = + (epicsInt32)(((double)epicsRingBytesHighWaterMark(this->udpQueue)) / + sizeof(NormalisedEvent) * toPercent); // Aparently resetting the watermark causes problems... // at least concurrently :D // epicsRingBytesResetHighWaterMark(this->udpQueue); return asynSuccess; + + } else if (function == P_NormalisedQueueHighWaterMark) { + const double toPercent = 100. / (243. * udpQueueSize); + *value = (epicsInt32)(((double)epicsRingBytesHighWaterMark( + this->normalisedQueue)) / + sizeof(NormalisedEvent) * toPercent); + // epicsRingBytesResetHighWaterMark(this->sortedQueue); + return asynSuccess; + } else if (function == P_SortedQueueHighWaterMark) { const double toPercent = 100. / (243. * udpQueueSize); - *value = (epicsInt32)(epicsRingBytesHighWaterMark(this->sortedQueue) / + *value = (epicsInt32)(((double)epicsRingBytesHighWaterMark( + this->sortedQueue)) / sizeof(NormalisedEvent) * toPercent); // epicsRingBytesResetHighWaterMark(this->sortedQueue); return asynSuccess; @@ -781,15 +794,30 @@ void asynStreamGeneratorDriver::processEvents() { "%s:%s: starting count: (%d, %d, %d, %" PRIu64 ")\n", driverName, functionName, countPreset, timePreset, presetChannel, startTimestamp); + + asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, + "%s:%s: starting count: %" PRIu64 " %" PRIu64 " %" PRIu64 "\n", + driverName, functionName, eventsA[0].timestamp, eventsA[1].timestamp, eventsA[2].timestamp); } + if (eventsA[std::max((std::size_t) 0, toProcess - 1)].timestamp < eventsA[0].timestamp) + asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, + "%s:%s: time-span: %" PRIu64 " %" PRIu64 "\n", + driverName, functionName, eventsA[0].timestamp, eventsA[std::max((std::size_t) 0, toProcess - 1)].timestamp); + + if (!std::is_sorted( eventsA, eventsA + toProcess, oldestEventsFirst )) + asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, + "%s:%s: not sorted: %" PRIu64 " %" PRIu64 "\n", + driverName, functionName, eventsA[0].timestamp, eventsA[std::max((std::size_t) 0, toProcess - 1)].timestamp); + if (currStatus == STATUS_COUNTING) { for (std::size_t i = 0; i < toProcess; ++i) { counts[eventsA[i].source == 0 ? eventsA[i].pixelId : this->num_channels - 1] += 1; - elapsedSeconds = (eventsA[i].timestamp - startTimestamp) / 1e9; + elapsedSeconds = + ((double)(eventsA[i].timestamp - startTimestamp)) / 1e9; const bool reachedCount = countPreset && counts[presetChannel] >= countPreset; @@ -798,10 +826,13 @@ void asynStreamGeneratorDriver::processEvents() { if (reachedCount || reachedTime) { - asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, - "%s:%s: reached preset: (%lld, %f)\n", driverName, - functionName, counts[presetChannel], - elapsedSeconds); + asynPrint( + pasynUserSelf, ASYN_TRACE_ERROR, + "%s:%s: reached preset: (%d, %d) (%lld, %f, %" PRIu64 + ")\n", + driverName, functionName, reachedCount, reachedTime, + counts[presetChannel], elapsedSeconds, + eventsA[i].timestamp); // TODO should really check there an no more events with the // same final timestamp diff --git a/src/asynStreamGeneratorDriver.h b/src/asynStreamGeneratorDriver.h index 7bf3df5..7db1a5e 100644 --- a/src/asynStreamGeneratorDriver.h +++ b/src/asynStreamGeneratorDriver.h @@ -141,6 +141,7 @@ constexpr static char P_ClearCountsString[]{"C_%" PRIu64}; constexpr static char P_UdpDroppedString[]{"DROP"}; constexpr static char P_UdpQueueHighWaterMarkString[]{"UDP"}; +constexpr static char P_NormalisedQueueHighWaterMarkString[]{"NORM"}; constexpr static char P_SortedQueueHighWaterMarkString[]{"SORT"}; /******************************************************************************* @@ -188,6 +189,7 @@ class asynStreamGeneratorDriver : public asynPortDriver { // System Status Parameter Identifying IDs int P_UdpDropped; int P_UdpQueueHighWaterMark; + int P_NormalisedQueueHighWaterMark; int P_SortedQueueHighWaterMark; private: