From ba07a8af9b270ffec997b86ae40e27fef5be9d70 Mon Sep 17 00:00:00 2001 From: Edward Wall Date: Fri, 7 Nov 2025 14:28:01 +0100 Subject: [PATCH] shows queue usage as a percentage --- db/daq_common.db | 10 +++++----- src/asynStreamGeneratorDriver.cpp | 19 ++++++++++++------- src/asynStreamGeneratorDriver.h | 5 ++++- 3 files changed, 21 insertions(+), 13 deletions(-) diff --git a/db/daq_common.db b/db/daq_common.db index f564816..2c5f7f4 100644 --- a/db/daq_common.db +++ b/db/daq_common.db @@ -213,7 +213,7 @@ record(ai,"$(INSTR)$(NAME):ELAPSED-TIME") record(longin,"$(INSTR)$(NAME):UDP_DROPPED") { - field(DESC, "Max Events in Queue") + field(DESC, "UDP Packets Missed") field(EGU, "Events") field(DTYP, "asynInt32") field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) DROP") @@ -224,8 +224,8 @@ record(longin,"$(INSTR)$(NAME):UDP_DROPPED") record(longin,"$(INSTR)$(NAME):UDP_WATERMARK") { - field(DESC, "Max Events in Queue") - field(EGU, "Events") + field(DESC, "UDP Queue Usage") + field(EGU, "%") field(DTYP, "asynInt32") field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) UDP") # field(SCAN, "I/O Intr") @@ -235,8 +235,8 @@ record(longin,"$(INSTR)$(NAME):UDP_WATERMARK") record(longin,"$(INSTR)$(NAME):SORTED_WATERMARK") { - field(DESC, "Max Events in Queue") - field(EGU, "Events") + field(DESC, "Partial Sort Queue Usage") + field(EGU, "%") field(DTYP, "asynInt32") field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) SORT") # field(SCAN, "I/O Intr") diff --git a/src/asynStreamGeneratorDriver.cpp b/src/asynStreamGeneratorDriver.cpp index 767a108..208584e 100644 --- a/src/asynStreamGeneratorDriver.cpp +++ b/src/asynStreamGeneratorDriver.cpp @@ -96,8 +96,10 @@ asynStatus asynStreamGeneratorDriver::createInt32Param( setIntegerParam(*variable, initialValue)); } -asynStatus asynStreamGeneratorDriver::createFloat64Param( - asynStatus status, char *name, int *variable, double initialValue) { +asynStatus asynStreamGeneratorDriver::createFloat64Param(asynStatus status, + char *name, + int *variable, + double initialValue) { // TODO should show error if there is one return (asynStatus)(status | createParam(name, asynParamFloat64, variable) | setDoubleParam(*variable, initialValue)); @@ -114,7 +116,7 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver( const int kafkaMaxPacketSize) : asynPortDriver(portName, 1, /* maxAddr */ asynInt32Mask | asynFloat64Mask | - asynDrvUserMask, /* Interface mask */ + asynDrvUserMask, /* Interface mask */ asynInt32Mask, // | asynFloat64Mask, /* Interrupt mask */ 0, /* asynFlags. This driver does not block and it is not multi-device, but has a @@ -125,6 +127,7 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver( 0), /* Default stack size*/ num_channels(numChannels + 1), kafkaEnabled(enableKafkaStream), monitorTopic(monitorTopic), detectorTopic(detectorTopic), + udpQueueSize(udpQueueSize), kafkaQueueSize(kafkaQueueSize), // measured in max packet sizes udpQueue( epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))), @@ -308,15 +311,17 @@ asynStatus asynStreamGeneratorDriver::readInt32(asynUser *pasynUser, getParamName(function, ¶mName); if (function == P_UdpQueueHighWaterMark) { - *value = epicsRingBytesHighWaterMark(this->udpQueue) / - sizeof(NormalisedEvent); + const double toPercent = 100. / (243. * udpQueueSize); + *value = (epicsInt32)(epicsRingBytesHighWaterMark(this->udpQueue) / + sizeof(NormalisedEvent) * toPercent); // Aparently resetting the watermark causes problems... // at least concurrently :D // epicsRingBytesResetHighWaterMark(this->udpQueue); return asynSuccess; } else if (function == P_SortedQueueHighWaterMark) { - *value = epicsRingBytesHighWaterMark(this->sortedQueue) / - sizeof(NormalisedEvent); + const double toPercent = 100. / (243. * udpQueueSize); + *value = (epicsInt32)(epicsRingBytesHighWaterMark(this->sortedQueue) / + sizeof(NormalisedEvent) * toPercent); // epicsRingBytesResetHighWaterMark(this->sortedQueue); return asynSuccess; } diff --git a/src/asynStreamGeneratorDriver.h b/src/asynStreamGeneratorDriver.h index d986b02..ac8ce5e 100644 --- a/src/asynStreamGeneratorDriver.h +++ b/src/asynStreamGeneratorDriver.h @@ -150,8 +150,10 @@ class asynStreamGeneratorDriver : public asynPortDriver { const int num_channels; const bool kafkaEnabled; + const int kafkaQueueSize; const int kafkaMaxPacketSize; + const int udpQueueSize; epicsRingBytesId udpQueue; epicsRingBytesId sortedQueue; @@ -168,7 +170,8 @@ class asynStreamGeneratorDriver : public asynPortDriver { asynStatus createInt32Param(asynStatus status, char *name, int *variable, epicsInt32 initialValue = 0); - asynStatus createFloat64Param(asynStatus status, char *name, int *variable, double initialValue = 0); + asynStatus createFloat64Param(asynStatus status, char *name, int *variable, + double initialValue = 0); inline void queueForKafka(NormalisedEvent &ne);