From 7bacc716ccbd4384a53f0e2f7a4a8c362d1a01bb Mon Sep 17 00:00:00 2001 From: Edward Wall Date: Fri, 31 Oct 2025 19:10:59 +0100 Subject: [PATCH] adds elapsed time and time based preset --- db/channels.db | 17 ++-- db/daq_common.db | 40 ++++----- src/asynStreamGeneratorDriver.cpp | 130 ++++++++++++++++++++++++------ src/asynStreamGeneratorDriver.h | 7 ++ 4 files changed, 141 insertions(+), 53 deletions(-) diff --git a/db/channels.db b/db/channels.db index 7073cef..c2cde1f 100644 --- a/db/channels.db +++ b/db/channels.db @@ -104,11 +104,12 @@ record(longin, "$(INSTR)$(NAME):M$(CHANNEL)") field(PINI, "YES") } -# record(ai, "$(INSTR)$(NAME):R$(CHANNEL)") -# { -# field(DESC, "Rate of DAQ CH$(CHANNEL)") -# field(INP, "@... readRate($(INSTR)$(NAME):, $(CHANNEL)) $(PORT)") -# field(DTYP, "stream") -# field(EGU, "cts/sec") -# field(SCAN, "1 second") -# } +record(ai, "$(INSTR)$(NAME):R$(CHANNEL)") +{ + field(DESC, "Rate of DAQ CH$(CHANNEL)") + field(EGU, "cts/sec") + field(DTYP, "asynInt32") + field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) RATE$(CHANNEL)") + field(SCAN, "I/O Intr") + field(PINI, "YES") +} diff --git a/db/daq_common.db b/db/daq_common.db index a5e5501..7c91111 100644 --- a/db/daq_common.db +++ b/db/daq_common.db @@ -89,17 +89,16 @@ record(ao,"$(INSTR)$(NAME):PRESET-COUNT") field(PREC, 2) } -# record(ao,"$(INSTR)$(NAME):PRESET-TIME") -# { -# field(DESC, "Count for specified time") -# field(DTYP, "stream") -# field(OUT, "@... startWithTimePreset$(CHANNELS)($(INSTR)$(NAME):) $(PORT)") -# field(VAL, 0) -# field(PREC, 2) -# field(EGU, "seconds") -# field(FLNK, "$(INSTR)$(NAME):RAW-STATUS") -# } -# +record(ao,"$(INSTR)$(NAME):PRESET-TIME") +{ + field(DESC, "Count for specified time") + field(EGU, "seconds") + field(DTYP, "asynInt32") + field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) P_TIME") + field(VAL, 0) + field(PREC, 2) +} + # record(bo,"$(INSTR)$(NAME):PAUSE") # { # field(DESC, "Pause the current count") @@ -168,10 +167,10 @@ record(longin, "$(INSTR)$(NAME):MONITOR-CHANNEL_RBV") # record(ai,"$(INSTR)$(NAME):THRESHOLD_RBV") # { # field(DESC, "Minimum rate for counting to proceed") +# field(EGU, "cts/sec") # field(INP, "@... readMinRate($(INSTR)$(NAME):) $(PORT)") # field(DTYP, "stream") # field(SCAN, "1 second") -# field(EGU, "cts/sec") # } # # record(longout,"$(INSTR)$(NAME):THRESHOLD-MONITOR") @@ -187,10 +186,10 @@ record(longin, "$(INSTR)$(NAME):MONITOR-CHANNEL_RBV") # record(longin,"$(INSTR)$(NAME):THRESHOLD-MONITOR_RBV") # { # field(DESC, "Channel monitored for minimum rate") +# field(EGU, "CH") # field(INP, "@... readRateMonitor($(INSTR)$(NAME):) $(PORT)") # field(DTYP, "stream") # field(SCAN, "1 second") -# field(EGU, "CH") # } # # record(longout, "$(INSTR)$(NAME):CT") @@ -204,9 +203,12 @@ record(longin, "$(INSTR)$(NAME):MONITOR-CHANNEL_RBV") ################################################################################ # Read all monitors values -# record(ai,"$(INSTR)$(NAME):ELAPSED-TIME") -# { -# field(DESC, "DAQ Measured Time") -# field(EGU, "sec") -# field(FLNK, "$(INSTR)$(NAME):ETO") -# } +record(ai,"$(INSTR)$(NAME):ELAPSED-TIME") +{ + field(DESC, "DAQ Measured Time") + field(EGU, "sec") + field(DTYP, "asynInt32") + field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) TIME") + field(SCAN, "I/O Intr") + field(PINI, "YES") +} diff --git a/src/asynStreamGeneratorDriver.cpp b/src/asynStreamGeneratorDriver.cpp index 96a52b1..0414a0e 100644 --- a/src/asynStreamGeneratorDriver.cpp +++ b/src/asynStreamGeneratorDriver.cpp @@ -3,6 +3,7 @@ #include #include #include +#include // Just for printing #define __STDC_FORMAT_MACROS @@ -86,10 +87,8 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName, num_channels(numChannels + 1), monitorQueue(1000, false), detectorQueue(1000, false) { const char *functionName = "asynStreamGeneratorDriver"; - // Parameter Setup - char pv_name_buffer[100]; - P_Counts = new int[this->num_channels]; + // Parameter Setup asynStatus status = asynSuccess; status = (asynStatus)(status | createParam(P_StatusString, asynParamInt32, @@ -104,12 +103,23 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName, asynParamInt32, &P_CountPreset)); status = (asynStatus)(status | setIntegerParam(P_CountPreset, 0)); + status = (asynStatus)(status | createParam(P_TimePresetString, + asynParamInt32, &P_TimePreset)); + status = (asynStatus)(status | setIntegerParam(P_TimePreset, 0)); + + status = (asynStatus)(status | createParam(P_ElapsedTimeString, + asynParamInt32, &P_ElapsedTime)); + status = (asynStatus)(status | setIntegerParam(P_ElapsedTime, 0)); + status = (asynStatus)(status | createParam(P_MonitorChannelString, asynParamInt32, &P_MonitorChannel)); status = (asynStatus)(status | setIntegerParam(P_MonitorChannel, 0)); - // Create PVs templated on Channel Number + // Create Parameters templated on Channel Number + char pv_name_buffer[100]; + P_Counts = new int[this->num_channels]; + P_Rates = new int[this->num_channels]; for (size_t i = 0; i < this->num_channels; ++i) { memset(pv_name_buffer, 0, 100); epicsSnprintf(pv_name_buffer, 100, P_CountsString, i); @@ -117,6 +127,13 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName, (asynStatus)(status | createParam(pv_name_buffer, asynParamInt32, P_Counts + i)); status = (asynStatus)(status | setIntegerParam(P_Counts[i], 0)); + + memset(pv_name_buffer, 0, 100); + epicsSnprintf(pv_name_buffer, 100, P_RateString, i); + status = + (asynStatus)(status | createParam(pv_name_buffer, asynParamInt32, + P_Rates + i)); + status = (asynStatus)(status | setIntegerParam(P_Rates[i], 0)); } if (status) { @@ -128,8 +145,8 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName, // Create Events this->pausedEventId = epicsEventCreate(epicsEventEmpty); - this->monitorProducer = create_kafka_producer(); - this->detectorProducer = create_kafka_producer(); + // this->monitorProducer = create_kafka_producer(); + // this->detectorProducer = create_kafka_producer(); // Setup for Thread Producing Monitor Kafka Events status = @@ -180,6 +197,7 @@ asynStreamGeneratorDriver::~asynStreamGeneratorDriver() { // should make sure queues are empty and freed // and that the kafka producers are flushed and freed delete[] P_Counts; + delete[] P_Rates; // TODO add exit should perhaps ensure the queue is flushed // rd_kafka_poll(producer, 0); @@ -205,6 +223,13 @@ asynStatus asynStreamGeneratorDriver::writeInt32(asynUser *pasynUser, // } if (function == P_CountPreset) { + // TODO should block setting a preset when already set + setIntegerParam(function, value); + setIntegerParam(P_Status, STATUS_COUNTING); + status = (asynStatus)callParamCallbacks(); + epicsEventSignal(this->pausedEventId); + } else if (function == P_TimePreset) { + // TODO should block setting a preset when already set setIntegerParam(function, value); setIntegerParam(P_Status, STATUS_COUNTING); status = (asynStatus)callParamCallbacks(); @@ -247,7 +272,8 @@ void asynStreamGeneratorDriver::receiveUDP() { epicsInt32 val; epicsInt32 currentStatus; epicsInt32 countPreset = 0; - epicsInt32 presetChannel = 1; + epicsInt32 timePreset = 0; + epicsInt32 presetChannel = 0; const char *functionName = "receiveUDP"; @@ -255,6 +281,10 @@ void asynStreamGeneratorDriver::receiveUDP() { // uint32. It does support int64 though.. so we start with that epicsInt32 *counts = new epicsInt32[this->num_channels]; + uint64_t start_time = std::numeric_limits::max(); + uint64_t current_time = 0; + epicsInt32 elapsedTime = 0; + while (true) { status = getIntegerParam(this->P_Status, ¤tStatus); @@ -263,6 +293,7 @@ void asynStreamGeneratorDriver::receiveUDP() { epicsEventWait(this->pausedEventId); getIntegerParam(this->P_CountPreset, &countPreset); + getIntegerParam(this->P_TimePreset, &timePreset); getIntegerParam(this->P_MonitorChannel, &presetChannel); // memset doesn't work with epicsInt32 @@ -270,10 +301,15 @@ void asynStreamGeneratorDriver::receiveUDP() { counts[i] = 0; } + start_time = std::numeric_limits::max(); + current_time = 0; + elapsedTime = 0; + lock(); for (size_t i = 0; i < num_channels; ++i) { setIntegerParam(P_Counts[i], counts[i]); } + setIntegerParam(P_ElapsedTime, 0); callParamCallbacks(); unlock(); @@ -296,6 +332,12 @@ void asynStreamGeneratorDriver::receiveUDP() { size_t total_events = (header->BufferLength - 21) / 3; + start_time = + std::min(start_time, (uint64_t)(header->nanosecs() / 1e9)); + // This is maybe safer, in case the time wraps back around? + // if (start_time == std::numeric_limits::max()) + // start_time = header->nanosecs() /1e9; + // TODO lots of checks and validation missing everywhere here if (received == total_events * 6 + 42) { // asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, @@ -324,6 +366,12 @@ void asynStreamGeneratorDriver::receiveUDP() { nme->DataID = m_event->DataID; this->monitorQueue.push(nme); + current_time = std::max( + current_time, + (uint64_t)((header->nanosecs() + + (uint64_t)m_event->nanosecs()) / + 1e9)); + } else { // Detector Event DetectorEvent *d_event = (DetectorEvent *)event; counts[0] += 1; @@ -334,6 +382,12 @@ void asynStreamGeneratorDriver::receiveUDP() { header->nanosecs() + (uint64_t)d_event->nanosecs(); nde->PixID = d_event->pixelId(header->McpdID); this->detectorQueue.push(nde); + + current_time = std::max( + current_time, + (uint64_t)((header->nanosecs() + + (uint64_t)d_event->nanosecs()) / + 1e9)); } } @@ -341,6 +395,8 @@ void asynStreamGeneratorDriver::receiveUDP() { for (size_t i = 0; i < num_channels; ++i) { setIntegerParam(P_Counts[i], counts[i]); } + elapsedTime = current_time - start_time; + setIntegerParam(P_ElapsedTime, elapsedTime); callParamCallbacks(); unlock(); } else { @@ -349,10 +405,11 @@ void asynStreamGeneratorDriver::receiveUDP() { functionName); } - if (countPreset && counts[presetChannel] >= countPreset) { + if ((countPreset && counts[presetChannel] >= countPreset) || (timePreset && elapsedTime >= timePreset)) { lock(); setIntegerParam(P_Status, STATUS_IDLE); setIntegerParam(P_CountPreset, 0); + setIntegerParam(P_TimePreset, 0); callParamCallbacks(); unlock(); } @@ -412,7 +469,6 @@ void asynStreamGeneratorDriver::produceMonitor() { &tof, &did); builder.Finish(message, "ev42"); - // printf("buffer size: %d\n", builder.GetSize()); rd_kafka_resp_err_t err = rd_kafka_producev( monitorProducer, RD_KAFKA_V_TOPIC("NEWEFU_TEST"), @@ -429,15 +485,8 @@ void asynStreamGeneratorDriver::produceMonitor() { // rd_kafka_err2str(err)); } - // epicsStdoutPrintf("Kafka Queue Size %d\n", - // rd_kafka_outq_len(monitorProducer)); - rd_kafka_poll(monitorProducer, 0); - // printf("Monitor Events Queued before sending %d\n", - // this->monitorQueue.getHighWaterMark()); - // this->monitorQueue.resetHighWaterMark(); - tof.clear(); did.clear(); } @@ -447,19 +496,37 @@ void asynStreamGeneratorDriver::produceMonitor() { void asynStreamGeneratorDriver::produceDetector() { + static const size_t bufferSize = 9000; flatbuffers::FlatBufferBuilder builder(1024); std::vector tof; - tof.reserve(9000); + tof.reserve(bufferSize); std::vector did; - did.reserve(9000); + did.reserve(bufferSize); int total = 0; epicsTimeStamp last_sent = epicsTime::getCurrent(); uint64_t message_id = 0; + struct { + bool operator()(const uint64_t l, const uint64_t r) const { + return l > r; + } + } smallestToLargest; + + // This should never be used. It is just instantiated to reserve a buffer + // of specific size. + std::vector queueBuffer; + queueBuffer.reserve(bufferSize); + + std::priority_queue, + decltype(smallestToLargest)> + timeQueue(smallestToLargest, std::move(queueBuffer)); + + uint64_t newest = 0; + while (true) { if (!this->detectorQueue.isEmpty()) { @@ -468,11 +535,30 @@ void asynStreamGeneratorDriver::produceDetector() { auto nde = this->detectorQueue.pop(); tof.push_back(nde->TimeStamp); did.push_back(nde->PixID); + + newest = std::max(newest, nde->TimeStamp); + timeQueue.push(nde->TimeStamp); + delete nde; } else { epicsThreadSleep(0.001); // seconds } + while (!timeQueue.empty() && + (timeQueue.size() >= 8192 || + (newest - timeQueue.top()) > 5'000'000'000ull)) + timeQueue.pop(); + epicsInt32 rate = 0; + if (timeQueue.size() > 1) { + rate = ((double)timeQueue.size() / + ((double)(newest - timeQueue.top()) * 1e-9)); + } + + lock(); + setIntegerParam(P_Rates[0], rate); + callParamCallbacks(); + unlock(); + epicsTimeStamp now = epicsTime::getCurrent(); // At least every 0.2 seconds @@ -492,7 +578,6 @@ void asynStreamGeneratorDriver::produceDetector() { &tof, &did); builder.Finish(message, "ev42"); - // printf("buffer size: %d\n", builder.GetSize()); rd_kafka_resp_err_t err = rd_kafka_producev( detectorProducer, RD_KAFKA_V_TOPIC("NEWEFU_TEST2"), @@ -509,15 +594,8 @@ void asynStreamGeneratorDriver::produceDetector() { // rd_kafka_err2str(err)); } - // epicsStdoutPrintf("Kafka Queue Size %d\n", - // rd_kafka_outq_len(monitorProducer)); - rd_kafka_poll(detectorProducer, 0); - // printf("Detector Events Queued before sending %d\n", - // this->detectorQueue.getHighWaterMark()); - // this->detectorQueue.resetHighWaterMark(); - tof.clear(); did.clear(); } diff --git a/src/asynStreamGeneratorDriver.h b/src/asynStreamGeneratorDriver.h index f3d2ac4..f770cf1 100644 --- a/src/asynStreamGeneratorDriver.h +++ b/src/asynStreamGeneratorDriver.h @@ -83,8 +83,12 @@ struct __attribute__((__packed__)) NormalisedDetectorEvent { #define P_StatusString "STATUS" #define P_ResetString "RESET" #define P_CountPresetString "P_CNT" +#define P_TimePresetString "P_TIME" +#define P_ElapsedTimeString "TIME" #define P_MonitorChannelString "MONITOR" + #define P_CountsString "COUNTS%d" +#define P_RateString "RATE%d" /******************************************************************************* * Stream Generator Coordinating Class @@ -106,8 +110,11 @@ class asynStreamGeneratorDriver : public asynPortDriver { int P_Status; int P_Reset; int P_CountPreset; + int P_TimePreset; + int P_ElapsedTime; int P_MonitorChannel; int *P_Counts; + int *P_Rates; private: asynUser *pasynUDPUser;