From 5f95e82a3cfbf4333e0f9b1ecfbe49b691866d67 Mon Sep 17 00:00:00 2001 From: Edward Wall Date: Thu, 6 Nov 2025 11:58:19 +0100 Subject: [PATCH] in the process of switching to a more batch processing approach. so far, seems like it can keep up --- db/channels.db | 6 +- db/daq_common.db | 31 +- scripts/st.cmd | 4 + src/asynStreamGeneratorDriver.cpp | 587 ++++++++++++++++++++---------- src/asynStreamGeneratorDriver.h | 12 +- 5 files changed, 452 insertions(+), 188 deletions(-) diff --git a/db/channels.db b/db/channels.db index 0b94f18..9df5d06 100644 --- a/db/channels.db +++ b/db/channels.db @@ -59,7 +59,8 @@ record(longin, "$(INSTR)$(NAME):M$(CHANNEL)") field(DTYP, "asynInt32") field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) COUNTS$(CHANNEL)") # This is probably too fast. We could trigger things the same as sinqDAQ to ensure the db is update in the same order - field(SCAN, "I/O Intr") + # field(SCAN, "I/O Intr") + field(SCAN, ".2 second") field(PINI, "YES") } @@ -69,6 +70,7 @@ record(ai, "$(INSTR)$(NAME):R$(CHANNEL)") field(EGU, "cts/sec") field(DTYP, "asynInt32") field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) RATE$(CHANNEL)") - field(SCAN, "I/O Intr") + field(SCAN, ".2 second") + # field(SCAN, "I/O Intr") field(PINI, "YES") } diff --git a/db/daq_common.db b/db/daq_common.db index 410b80d..ef9f2ee 100644 --- a/db/daq_common.db +++ b/db/daq_common.db @@ -39,7 +39,8 @@ record(mbbi, "$(INSTR)$(NAME):STATUS") field(FRVL, "4") field(FRST, "INVALID") # This is probably too fast. 
We could trigger things the same as sinqDAQ to ensure the db is update in the same order - field(SCAN, "I/O Intr") + #field(SCAN, "I/O Intr") + field(SCAN, ".5 second") field(PINI, "YES") } @@ -201,7 +202,33 @@ record(ai,"$(INSTR)$(NAME):ELAPSED-TIME") field(EGU, "sec") field(DTYP, "asynInt32") field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) TIME") - field(SCAN, "I/O Intr") + # field(SCAN, "I/O Intr") + field(SCAN, ".5 second") field(PINI, "YES") # field(FLNK, "$(INSTR)$(NAME):ETO") } + +################################################################################ +# Stream Generator Status PVs + +record(longin,"$(INSTR)$(NAME):UDP_WATERMARK") +{ + field(DESC, "Max Events in Queue") + field(EGU, "Events") + field(DTYP, "asynInt32") + field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) UDP") + # field(SCAN, "I/O Intr") + field(SCAN, "1 second") + field(PINI, "YES") +} + +record(longin,"$(INSTR)$(NAME):SORTED_WATERMARK") +{ + field(DESC, "Max Events in Queue") + field(EGU, "Events") + field(DTYP, "asynInt32") + field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) SORT") + # field(SCAN, "I/O Intr") + field(SCAN, "1 second") + field(PINI, "YES") +} diff --git a/scripts/st.cmd b/scripts/st.cmd index 489cf84..3ba498b 100755 --- a/scripts/st.cmd +++ b/scripts/st.cmd @@ -9,6 +9,10 @@ epicsEnvSet("INSTR", "SQ:TEST:") epicsEnvSet("NAME", "SG") drvAsynIPPortConfigure("ASYN_IP_PORT", "127.0.0.1:9071:54321 UDP", 0, 0, 1) + +# With a udpQueue and sortQueue size of 10'000 packets, we can hold in memory +# 10'000 * 243 = 2.43e6 events + # asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "linkafka01:9092", "NEWEFU_TEST", "NEWEFU_TEST2", 1000, 8192) asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "", "", "", 0, 0) diff --git a/src/asynStreamGeneratorDriver.cpp b/src/asynStreamGeneratorDriver.cpp index 86a55aa..e7efa35 100644 --- a/src/asynStreamGeneratorDriver.cpp +++ b/src/asynStreamGeneratorDriver.cpp @@ -59,6 +59,11 @@ static void udpPollerTask(void *drvPvt) { pSGD->receiveUDP(); } 
+static void sortTask(void *drvPvt) { + asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt; + pSGD->partialSortEvents(); +} + static void daqTask(void *drvPvt) { asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt; pSGD->processEvents(); @@ -107,7 +112,12 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver( 0), /* Default stack size*/ num_channels(numChannels + 1), kafkaEnabled(enableKafkaStream), monitorTopic(monitorTopic), detectorTopic(detectorTopic), - udpQueue(epicsRingBytesCreate(udpQueueSize * sizeof(NormalisedEvent))), + // so these first to are measured in max packet sizes + udpQueue( + epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))), + // TODO configurable sizes + sortedQueue(epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))), + // and these two are currently measured in event sizes... monitorQueue( epicsRingBytesCreate(kafkaQueueSize * sizeof(NormalisedEvent))), detectorQueue( @@ -151,6 +161,11 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver( status = createInt32Param(status, pv_name_buffer, P_ClearCounts + i); } + status = createInt32Param(status, P_UdpQueueHighWaterMarkString, + &P_UdpQueueHighWaterMark); + status = createInt32Param(status, P_SortedQueueHighWaterMarkString, + &P_SortedQueueHighWaterMark); + if (status) { epicsStdoutPrintf( "%s:%s: failed to create or setup parameters, status=%d\n", @@ -210,10 +225,26 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver( /* Create the thread that orders the events and acts as our sinqDaq stand-in */ - status = (asynStatus)(epicsThreadCreate( - "sinqDAQ", epicsThreadPriorityMax, - epicsThreadGetStackSize(epicsThreadStackMedium), - (EPICSTHREADFUNC)::daqTask, this) == NULL); + status = + (asynStatus)(epicsThreadCreate( + "sinqDAQ", + epicsThreadPriorityMedium, // epicsThreadPriorityMax, + epicsThreadGetStackSize(epicsThreadStackMedium), + (EPICSTHREADFUNC)::daqTask, this) == NULL); + if (status) { + 
epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n", + driverName, functionName, status); + exit(1); + } + + /* Create the thread that orders packets in preparation for our sinqDAQ stand-in + */ + status = + (asynStatus)(epicsThreadCreate( + "partialSort", + epicsThreadPriorityMedium, + epicsThreadGetStackSize(epicsThreadStackMedium), + (EPICSTHREADFUNC)::sortTask, this) == NULL); if (status) { epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName, functionName, status); @@ -254,6 +285,32 @@ asynStreamGeneratorDriver::~asynStreamGeneratorDriver() { // epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer)); } +asynStatus asynStreamGeneratorDriver::readInt32(asynUser *pasynUser, epicsInt32 *value) { + + int function = pasynUser->reason; + asynStatus status = asynSuccess; + const char *paramName; + const char *functionName = "readInt32"; + getParamName(function, &paramName); + + if (function == P_UdpQueueHighWaterMark) { + *value = + epicsRingBytesHighWaterMark(this->udpQueue) / sizeof(NormalisedEvent); + // Apparently resetting the watermark causes problems...
+ // at least concurrently :D + // epicsRingBytesResetHighWaterMark(this->udpQueue); + return asynSuccess; + } else if (function == P_SortedQueueHighWaterMark) { + *value = + epicsRingBytesHighWaterMark(this->sortedQueue) / sizeof(NormalisedEvent); + // epicsRingBytesResetHighWaterMark(this->sortedQueue); + return asynSuccess; + } + + return asynPortDriver::readInt32(pasynUser, value); + +} + asynStatus asynStreamGeneratorDriver::writeInt32(asynUser *pasynUser, epicsInt32 value) { int function = pasynUser->reason; @@ -446,6 +503,57 @@ void asynStreamGeneratorDriver::receiveUDP() { } } + +struct { + bool operator()(const NormalisedEvent l, + const NormalisedEvent r) const { + return l.timestamp > r.timestamp; + } +} reverseSortEventsByTime; + +inline int eventsInQueue(epicsRingBytesId id) { + return epicsRingBytesUsedBytes(id) / sizeof(NormalisedEvent); +} + +void asynStreamGeneratorDriver::partialSortEvents() { + + const char *functionName = "partialSortEvents"; + + // x * number of ids * max events in packet + int bufferedEvents = 5 * 10 * 243; + NormalisedEvent *events = new NormalisedEvent[bufferedEvents]; + + int queuedEvents = 0; + epicsTimeStamp lastSort = epicsTime::getCurrent(); + epicsTimeStamp currentTime = lastSort; + + while (true) { + + queuedEvents = eventsInQueue(this->udpQueue); // in case we can't wait + lastSort = epicsTime::getCurrent(); + currentTime = lastSort; + + // wait for minimum packet frequency or enough packets to ensure we could potentially + // have at least 1 packet per mcpdid + while (queuedEvents < bufferedEvents && epicsTimeDiffInNS(&currentTime, &lastSort) < 250'000'000ull) { + epicsThreadSleep(0.0001); // seconds + currentTime = epicsTime::getCurrent(); + queuedEvents = eventsInQueue(this->udpQueue); + } + + queuedEvents = std::min(queuedEvents, bufferedEvents); + + if (queuedEvents) { + epicsRingBytesGet(this->udpQueue, (char *)events, queuedEvents * sizeof(NormalisedEvent)); + + std::sort(events, events + queuedEvents,
reverseSortEventsByTime); + + epicsRingBytesPut(this->sortedQueue, (char *)events, queuedEvents * sizeof(NormalisedEvent)); + } + } + +} + inline void asynStreamGeneratorDriver::queueForKafka(NormalisedEvent &&ne) { if (this->kafkaEnabled) { if (ne.source == 0) @@ -461,206 +569,319 @@ void asynStreamGeneratorDriver::processEvents() { const char *functionName = "processEvents"; - const size_t queueBufferSize = - 10 * epicsRingBytesSize(this->udpQueue) / sizeof(NormalisedEvent); + // x * number of ids * max events in packet * event size + int bufferedEvents = 5 * 10 * 243; + // we need a little extra space for merge sorting in + int extraBufferedEvents = 1 * 10 * 243; - struct { - bool operator()(const NormalisedEvent l, - const NormalisedEvent r) const { - return l.timestamp > r.timestamp; - } - } smallestToLargest; + // we have two buffers. We alternate between reading data into one of them, + // and then merge sorting into the other + NormalisedEvent *eventsA = new NormalisedEvent[(bufferedEvents + extraBufferedEvents)]; + NormalisedEvent *eventsB = new NormalisedEvent[(bufferedEvents + extraBufferedEvents)]; + NormalisedEvent *eventsBLastStart = eventsB + bufferedEvents; + NormalisedEvent *eventsBLastEnd = eventsBLastStart; - // This should never be used. It is just instantiated to reserve a buffer - // of specific size. - std::vector queueBuffer; - queueBuffer.reserve(queueBufferSize); + int queuedEvents = 0; - std::priority_queue, - decltype(smallestToLargest)> - timeQueue(smallestToLargest, std::move(queueBuffer)); + epicsTimeStamp lastProcess = epicsTime::getCurrent(); + epicsTimeStamp currentTime = lastProcess; - // TODO epics doesn't seem to support uint64, you would need an array of - // uint32. It does support int64 though.. 
so we start with that epicsInt32 *counts = new epicsInt32[this->num_channels]; - const uint64_t minRateSamplePeriod = 100'000'000ll; - const size_t rateAverageWindow = 20; - size_t countDiffsPtr = 0; - epicsInt32 *rates = new epicsInt32[this->num_channels]; - epicsInt32 *countDiff = new epicsInt32[this->num_channels]; - epicsInt32 *countDiffs = - new epicsInt32[this->num_channels * rateAverageWindow]; - uint64_t *timeSpans = new uint64_t[this->num_channels]; - epicsTimeStamp lastRateUpdate = epicsTime::getCurrent(); - - asynStatus status = asynSuccess; - NormalisedEvent ne; - uint64_t newestTimestamp = 0; - uint64_t startTimestamp = std::numeric_limits::max(); - uint64_t currTimestamp; - epicsInt32 elapsedSeconds = 0; - epicsInt32 prevStatus = STATUS_IDLE; - epicsInt32 currStatus = STATUS_IDLE; - epicsInt32 countPreset = 0; - epicsInt32 timePreset = 0; - epicsInt32 presetChannel = 0; + epicsInt32 udpQueueHighWaterMark = 0; + epicsInt32 sortedQueueHighWaterMark = 0; while (true) { - // TODO depending on how this is implemented, I may also need to check - // that there is is enough bytes, in case it does partial writes... - if (epicsRingBytesGet(udpQueue, (char *)&ne, sizeof(NormalisedEvent))) { - // we should reastart this ioc at least every few years, as at ns - // resolution with a uint64_t we will have an overflow after around - // 4 years - newestTimestamp = std::max(newestTimestamp, ne.timestamp); + queuedEvents = eventsInQueue(this->sortedQueue); // in case we can't wait + lastProcess = epicsTime::getCurrent(); + currentTime = lastProcess; - ++countDiff[ne.source == 0 ? 
ne.pixelId + 1 : 0]; - - timeQueue.push(std::move(ne)); + // wait for minimum packet frequency or enough packets to ensure we could potentially + // have at least 1 packet per mcpdid + while (queuedEvents < bufferedEvents && epicsTimeDiffInNS(&currentTime, &lastProcess) < 250'000'000ull) { + epicsThreadSleep(0.0001); // seconds + currentTime = epicsTime::getCurrent(); + queuedEvents = eventsInQueue(this->sortedQueue); } - // idea is to try and guarantee at least 1 packet per id or the min - // frequency for each id without actually checking all ids - if (timeQueue.size() >= 1500 * 10 || - (timeQueue.size() > 0 && - newestTimestamp - timeQueue.top().timestamp >= 200'000'000ull)) { - ne = timeQueue.top(); - timeQueue.pop(); + queuedEvents = std::min(queuedEvents, bufferedEvents); - status = getIntegerParam(this->P_Status, &currStatus); + NormalisedEvent *newStartPtr = eventsA + extraBufferedEvents; - if (currStatus == STATUS_COUNTING && prevStatus == STATUS_IDLE) { - // Starting a new count + // We read into the array, such that we have enough space, that the + // entirety of the leftover from the previous read can fit before this + // new read, in the case that all new events are newer timewise, and + // therefore, all events from eventsB have to be placed in a preceding + // position.
+ epicsRingBytesGet(this->sortedQueue, (char *)newStartPtr, queuedEvents * sizeof(NormalisedEvent)); - // get current count configuration - getIntegerParam(this->P_CountPreset, &countPreset); - getIntegerParam(this->P_TimePreset, &timePreset); - getIntegerParam(this->P_MonitorChannel, &presetChannel); + int toProcess = eventsBLastEnd - eventsBLastStart + queuedEvents * 4 / 5; - // reset status variables - startTimestamp = std::numeric_limits::max(); - for (size_t i = 0; i < this->num_channels; ++i) { - counts[i] = 0; - } + // TODO could also consider an in-place merge + eventsBLastEnd = std::merge( + newStartPtr, newStartPtr + queuedEvents, + eventsBLastStart, eventsBLastEnd, + eventsA, reverseSortEventsByTime + ); - // reset pvs - lock(); - for (size_t i = 0; i < num_channels; ++i) { - setIntegerParam(P_Counts[i], counts[i]); - } - setIntegerParam(P_ElapsedTime, 0); - callParamCallbacks(); - unlock(); + eventsBLastStart = eventsA + toProcess; - // TODO might consider throwing out current buffer as it is - // from before count started? then again, 0.2 ms or whatever is - // set above is quite a small preceeding amount of time, so - // maybe it doesn't matter - } - - prevStatus = currStatus; - - if (currStatus == STATUS_COUNTING) { - startTimestamp = std::min(startTimestamp, ne.timestamp); - currTimestamp = ne.timestamp; - elapsedSeconds = - 0 ? currTimestamp <= startTimestamp - : ((double)(currTimestamp - startTimestamp)) / 1e9; - - // is our count finished? - if ((countPreset && counts[presetChannel] >= countPreset) || - (timePreset && elapsedSeconds >= timePreset)) { - - // filter out events that occured after the specified time - if (ne.timestamp - startTimestamp <= countPreset) { - counts[ne.source == 0 ? 
ne.pixelId + 1 : 0] += 1; - this->queueForKafka(std::move(ne)); - - // add any remaining events with the same timestamp - // we could theoretically have a small overrun if the - // timestamps are identical on the monitor channel - while (!timeQueue.empty() && - !timeQueue.top().timestamp == currTimestamp) { - ne = timeQueue.top(); - timeQueue.pop(); - counts[ne.source == 0 ? ne.pixelId + 1 : 0] += 1; - this->queueForKafka(std::move(ne)); - } - } - - countPreset = 0; - timePreset = 0; - - lock(); - for (size_t i = 0; i < num_channels; ++i) { - setIntegerParam(P_Counts[i], counts[i]); - } - setIntegerParam(P_ElapsedTime, elapsedSeconds); - setIntegerParam(P_CountPreset, countPreset); - setIntegerParam(P_TimePreset, timePreset); - callParamCallbacks(); - setIntegerParam(P_Status, STATUS_IDLE); - callParamCallbacks(); - unlock(); - - } else { - - counts[ne.source == 0 ? ne.pixelId + 1 : 0] += 1; - this->queueForKafka(std::move(ne)); - - lock(); - for (size_t i = 0; i < num_channels; ++i) { - setIntegerParam(P_Counts[i], counts[i]); - } - setIntegerParam(P_ElapsedTime, elapsedSeconds); - callParamCallbacks(); - unlock(); - } - } + for (size_t i = 0; i < toProcess; ++i) { + counts[eventsA[i].source == 0 ? 
eventsA[i].pixelId + 1 : 0] += 1; } - // Careful changing any of these magic numbers until I clean this up - // as you might end up calculating the wrong rate - epicsTimeStamp currentTime = epicsTime::getCurrent(); - if (epicsTimeDiffInNS(&currentTime, &lastRateUpdate) > - minRateSamplePeriod) { - timeSpans[countDiffsPtr] = - epicsTimeDiffInNS(&currentTime, &lastRateUpdate); - - uint64_t totalTime = 0; - for (size_t i = 0; i <= rateAverageWindow; ++i) { - totalTime += timeSpans[i]; - } - - lastRateUpdate = currentTime; - - for (size_t i = 0; i <= this->num_channels; ++i) { - countDiffs[i * rateAverageWindow + countDiffsPtr] = - countDiff[i]; - - uint64_t cnt = 0; - for (size_t j = 0; j <= rateAverageWindow; ++j) { - cnt += countDiffs[i * rateAverageWindow + j]; - } - rates[i] = cnt / (totalTime * 1e-9); - - countDiff[i] = 0; - } - - countDiffsPtr = (countDiffsPtr + 1) % rateAverageWindow; - - if (countDiffsPtr % 5 == 0) { - lock(); - for (size_t i = 0; i < num_channels; ++i) { - setIntegerParam(P_Rates[i], rates[i]); - } - callParamCallbacks(); - unlock(); - } + for (size_t i = 0; i < num_channels; ++i) { + setIntegerParam(P_Counts[i], counts[i]); } + + //setIntegerParam(P_ElapsedTime, elapsedSeconds); + + std::swap(eventsA, eventsB); + } + + // // TODO this is totally decoupled!!! + // const size_t queueBufferSize = + // 10 * epicsRingBytesSize(this->udpQueue) / sizeof(NormalisedEvent); + + // //struct { + // // bool operator()(const NormalisedEvent l, + // // const NormalisedEvent r) const { + // // return l.timestamp > r.timestamp; + // // } + // //} smallestToLargest; + + // //// This should never be used. It is just instantiated to reserve a buffer + // //// of specific size.
+ // //std::vector queueBuffer; + // //queueBuffer.reserve(queueBufferSize); + + // //std::priority_queue, + // // decltype(smallestToLargest)> + // // timeQueue(smallestToLargest, std::move(queueBuffer)); + + // NormalisedEvent* timeQueue = new NormalisedEvent[queueBufferSize]; + + // // TODO epics doesn't seem to support uint64, you would need an array of + // // uint32. It does support int64 though.. so we start with that + // epicsInt32 *counts = new epicsInt32[this->num_channels]; + + // const uint64_t minRateSamplePeriod = 100'000'000ll; + // const size_t rateAverageWindow = 20; + // size_t countDiffsPtr = 0; + // epicsInt32 *rates = new epicsInt32[this->num_channels]; + // epicsInt32 *countDiff = new epicsInt32[this->num_channels]; + // epicsInt32 *countDiffs = + // new epicsInt32[this->num_channels * rateAverageWindow]; + // uint64_t *timeSpans = new uint64_t[this->num_channels]; + // epicsTimeStamp lastRateUpdate = epicsTime::getCurrent(); + + // asynStatus status = asynSuccess; + // NormalisedEvent ne; + // uint64_t newestTimestamp = 0; + // uint64_t startTimestamp = std::numeric_limits::max(); + // uint64_t currTimestamp; + // epicsInt32 elapsedSeconds = 0; + // epicsInt32 prevStatus = STATUS_IDLE; + // epicsInt32 currStatus = STATUS_IDLE; + // epicsInt32 countPreset = 0; + // epicsInt32 timePreset = 0; + // epicsInt32 presetChannel = 0; + // epicsInt32 udpQueueHighWaterMark = 0; + + // while (true) { + + // // I think mostly everything should already by sorted + // // could probably in the other thread guarantee that each packet is sorted + // // but probably it already is... 
+ // // + // // so really we just need to merge sort chunks + + // // idea is to try and guarantee at least 1 packet per id or the min + // // frequency for each id without actually checking all ids + // // size_t timeQueuePtr = 0; + // // while (timeQueuePtr < 1500 * 10) { + + // // // TODO depending on how this is implemented, I may also need to + // // // check that there is is enough bytes, in case it does partial + // // // writes... + // // if (epicsRingBytesGet(udpQueue, (char *)&ne, + // // sizeof(NormalisedEvent))) { + // // // we should restart this ioc at least every few years, as at ns + // // // resolution with a uint64_t we will have an overflow after + // // // around 4 years + // // newestTimestamp = std::max(newestTimestamp, ne.timestamp); + + // // ++countDiff[ne.source == 0 ? ne.pixelId + 1 : 0]; + + // // timeQueue.push(std::move(ne)); + // // } + + // // } + + + // // while (timeQueue.empty() || + // // (timeQueue.size() < 1500 * 10 && + // // newestTimestamp - timeQueue.top().timestamp < 200'000'000ull)) { + + // // // TODO depending on how this is implemented, I may also need to + // // // check that there is is enough bytes, in case it does partial + // // // writes... + // // if (epicsRingBytesGet(udpQueue, (char *)&ne, + // // sizeof(NormalisedEvent))) { + // // // we should restart this ioc at least every few years, as at ns + // // // resolution with a uint64_t we will have an overflow after + // // // around 4 years + // // newestTimestamp = std::max(newestTimestamp, ne.timestamp); + + // // ++countDiff[ne.source == 0 ? 
ne.pixelId + 1 : 0]; + + // // timeQueue.push(std::move(ne)); + // // } + // // } + + // // ne = timeQueue.top(); + // // timeQueue.pop(); + + // // status = getIntegerParam(this->P_Status, &currStatus); + + // // udpQueueHighWaterMark = + // // epicsRingBytesHighWaterMark(udpQueue) / sizeof(NormalisedEvent); + + // // // if (currStatus == STATUS_COUNTING && prevStatus == STATUS_IDLE) { + // // // // Starting a new count + + // // // // get current count configuration + // // // getIntegerParam(this->P_CountPreset, &countPreset); + // // // getIntegerParam(this->P_TimePreset, &timePreset); + // // // getIntegerParam(this->P_MonitorChannel, &presetChannel); + + // // // // reset status variables + // // // startTimestamp = std::numeric_limits::max(); + // // // for (size_t i = 0; i < this->num_channels; ++i) { + // // // counts[i] = 0; + // // // } + + // // // // reset pvs + // // // // lock(); + // // // // for (size_t i = 0; i < num_channels; ++i) { + // // // // setIntegerParam(P_Counts[i], counts[i]); + // // // // } + // // // // setIntegerParam(P_ElapsedTime, 0); + // // // // callParamCallbacks(); + // // // // unlock(); + + // // // // TODO might consider throwing out current buffer as it is + // // // // from before count started? then again, 0.2 ms or whatever is + // // // // set above is quite a small preceeding amount of time, so + // // // // maybe it doesn't matter + // // // } + + // // // prevStatus = currStatus; + + // // //if (currStatus == STATUS_COUNTING) { + // // startTimestamp = std::min(startTimestamp, ne.timestamp); + // // currTimestamp = ne.timestamp; + // // elapsedSeconds = + // // 0 ? currTimestamp <= startTimestamp + // // : ((double)(currTimestamp - startTimestamp)) / 1e9; + + // // // is our count finished? 
+ // // // if ((countPreset && counts[presetChannel] >= countPreset) || + // // // (timePreset && elapsedSeconds >= timePreset)) { + + // // // // filter out events that occured after the specified time + // // // if (ne.timestamp - startTimestamp <= countPreset) { + // // // counts[ne.source == 0 ? ne.pixelId + 1 : 0] += 1; + // // // this->queueForKafka(std::move(ne)); + + // // // // add any remaining events with the same timestamp + // // // // we could theoretically have a small overrun if the + // // // // timestamps are identical on the monitor channel + // // // while (!timeQueue.empty() && + // // // !timeQueue.top().timestamp == currTimestamp) { + // // // ne = timeQueue.top(); + // // // timeQueue.pop(); + // // // counts[ne.source == 0 ? ne.pixelId + 1 : 0] += 1; + // // // this->queueForKafka(std::move(ne)); + // // // } + // // // } + + // // // countPreset = 0; + // // // timePreset = 0; + + // // // // lock(); + // // // for (size_t i = 0; i < num_channels; ++i) { + // // // setIntegerParam(P_Counts[i], counts[i]); + // // // } + // // // setIntegerParam(P_ElapsedTime, elapsedSeconds); + // // // setIntegerParam(P_CountPreset, countPreset); + // // // setIntegerParam(P_TimePreset, timePreset); + // // // setIntegerParam(P_UdpQueueHighWaterMark, udpQueueHighWaterMark); + // // // // callParamCallbacks(); + // // // setIntegerParam(P_Status, STATUS_IDLE); + // // // // callParamCallbacks(); + // // // // unlock(); + + // // // epicsRingBytesResetHighWaterMark(udpQueue); + + // // // } else { + + // // counts[ne.source == 0 ? 
ne.pixelId + 1 : 0] += 1; + // // this->queueForKafka(std::move(ne)); + + // // // lock(); + // // for (size_t i = 0; i < num_channels; ++i) { + // // setIntegerParam(P_Counts[i], counts[i]); + // // } + // // setIntegerParam(P_ElapsedTime, elapsedSeconds); + // // setIntegerParam(P_UdpQueueHighWaterMark, udpQueueHighWaterMark); + // // // callParamCallbacks(); + // // // unlock(); + // // // } + // // //} + + // // // Careful changing any of these magic numbers until I clean this up + // // // as you might end up calculating the wrong rate + // // // epicsTimeStamp currentTime = epicsTime::getCurrent(); + // // // if (epicsTimeDiffInNS(&currentTime, &lastRateUpdate) > + // // // minRateSamplePeriod) { + // // // timeSpans[countDiffsPtr] = + // // // epicsTimeDiffInNS(&currentTime, &lastRateUpdate); + + // // // uint64_t totalTime = 0; + // // // for (size_t i = 0; i <= rateAverageWindow; ++i) { + // // // totalTime += timeSpans[i]; + // // // } + + // // // lastRateUpdate = currentTime; + + // // // for (size_t i = 0; i <= this->num_channels; ++i) { + // // // countDiffs[i * rateAverageWindow + countDiffsPtr] = + // // // countDiff[i]; + + // // // uint64_t cnt = 0; + // // // for (size_t j = 0; j <= rateAverageWindow; ++j) { + // // // cnt += countDiffs[i * rateAverageWindow + j]; + // // // } + // // // rates[i] = cnt / (totalTime * 1e-9); + + // // // countDiff[i] = 0; + // // // } + + // // // countDiffsPtr = (countDiffsPtr + 1) % rateAverageWindow; + + // // // if (countDiffsPtr % 5 == 0) { + // // // // lock(); + // // // for (size_t i = 0; i < num_channels; ++i) { + // // // setIntegerParam(P_Rates[i], rates[i]); + // // // } + // // // // callParamCallbacks(); + // // // // unlock(); + // // // } + // // // } + // } } void asynStreamGeneratorDriver::produce(epicsRingBytesId eventQueue, diff --git a/src/asynStreamGeneratorDriver.h b/src/asynStreamGeneratorDriver.h index e3dfa25..fa5358d 100644 --- a/src/asynStreamGeneratorDriver.h +++ b/src/asynStreamGeneratorDriver.h
@@ -59,8 +59,8 @@ struct __attribute__((__packed__)) MonitorEvent { struct __attribute__((__packed__)) NormalisedEvent { uint64_t timestamp; + uint32_t pixelId : 24; uint8_t source; - uint32_t pixelId; // inline NormalisedEvent(uint64_t timestamp, uint8_t source, uint32_t // pixelId) @@ -96,6 +96,9 @@ struct __attribute__((__packed__)) NormalisedEvent { #define P_RateString "RATE%d" #define P_ClearCountsString "C_%d" +#define P_UdpQueueHighWaterMarkString "UDP" +#define P_SortedQueueHighWaterMarkString "SORT" + /******************************************************************************* * Stream Generator Coordinating Class */ @@ -110,9 +113,11 @@ class asynStreamGeneratorDriver : public asynPortDriver { const int kafkaMaxPacketSize); virtual ~asynStreamGeneratorDriver(); + virtual asynStatus readInt32(asynUser *pasynUser, epicsInt32 *value); virtual asynStatus writeInt32(asynUser *pasynUser, epicsInt32 value); void receiveUDP(); + void partialSortEvents(); void processEvents(); void produceMonitor(); void produceDetector(); @@ -133,6 +138,10 @@ class asynStreamGeneratorDriver : public asynPortDriver { int *P_Rates; int *P_ClearCounts; + // System Status Parameter Identifying IDs + int P_UdpQueueHighWaterMark; + int P_SortedQueueHighWaterMark; + private: asynUser *pasynUDPUser; epicsEventId pausedEventId; @@ -142,6 +151,7 @@ class asynStreamGeneratorDriver : public asynPortDriver { const int kafkaMaxPacketSize; epicsRingBytesId udpQueue; + epicsRingBytesId sortedQueue; epicsRingBytesId monitorQueue; rd_kafka_t *monitorProducer;