From e53a2a4f409eeb049c9627227d87d302ca3e91b4 Mon Sep 17 00:00:00 2001 From: Edward Wall Date: Thu, 6 Nov 2025 15:30:25 +0100 Subject: [PATCH] finished converting the processing to a batch-wise variant --- Makefile | 6 +- scripts/st.cmd | 3 +- src/asynStreamGeneratorDriver.cpp | 421 ++++++++---------------------- src/asynStreamGeneratorDriver.h | 2 +- 4 files changed, 123 insertions(+), 309 deletions(-) diff --git a/Makefile b/Makefile index 5ae267d..400d0cb 100644 --- a/Makefile +++ b/Makefile @@ -20,9 +20,11 @@ TEMPLATES += db/channels.db db/daq_common.db # Source files to build SOURCES += src/asynStreamGeneratorDriver.cpp -USR_CFLAGS += -Wall -Wextra -Wunused-result -Werror -fvisibility=hidden # -Wpedantic // Does not work because EPICS macros trigger warnings +# I don't think specifying the optimisation level like this is correct... +# but I doesn't hurt :D +USR_CFLAGS += -O3 -Wall -Wextra -Wunused-result -Werror -fvisibility=hidden # -Wpedantic // Does not work because EPICS macros trigger warnings # Required to support EV42/44 -USR_CXXFLAGS += -I../dep/flatbuffers/include/ -I../schemas +USR_CXXFLAGS += -O3 -I../dep/flatbuffers/include/ -I../schemas LIB_SYS_LIBS += rdkafka diff --git a/scripts/st.cmd b/scripts/st.cmd index 3ba498b..9c3d3a6 100755 --- a/scripts/st.cmd +++ b/scripts/st.cmd @@ -14,7 +14,8 @@ drvAsynIPPortConfigure("ASYN_IP_PORT", "127.0.0.1:9071:54321 UDP", 0, 0, 1) # 10'000 * 243 = 2.43e6 events # asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "linkafka01:9092", "NEWEFU_TEST", "NEWEFU_TEST2", 1000, 8192) -asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "", "", "", 0, 0) +asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "ess01:9092", "NEWEFU_TEST", "NEWEFU_TEST2", 10000, 20480) +# asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "", "", "", 0, 0) dbLoadRecords("$(StreamGenerator_DB)daq_common.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNELS=5") diff --git a/src/asynStreamGeneratorDriver.cpp b/src/asynStreamGeneratorDriver.cpp index e7efa35..dec026f 100644 --- a/src/asynStreamGeneratorDriver.cpp +++ b/src/asynStreamGeneratorDriver.cpp @@ -1,5 +1,6 @@ #include "asynOctetSyncIO.h" #include "ev42_events_generated.h" +#include #include #include #include @@ -112,16 +113,16 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver( 0), /* Default stack size*/ num_channels(numChannels + 1), kafkaEnabled(enableKafkaStream), monitorTopic(monitorTopic), detectorTopic(detectorTopic), - // so these first to are measured in max packet sizes + // measured in max packet sizes udpQueue( epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))), // TODO configurable sizes - sortedQueue(epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))), - // and these two are currently measured in event sizes... + sortedQueue( + epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))), monitorQueue( - epicsRingBytesCreate(kafkaQueueSize * sizeof(NormalisedEvent))), + epicsRingBytesCreate(243 * kafkaQueueSize * sizeof(NormalisedEvent))), detectorQueue( - epicsRingBytesCreate(kafkaQueueSize * sizeof(NormalisedEvent))), + epicsRingBytesCreate(243 * kafkaQueueSize * sizeof(NormalisedEvent))), kafkaMaxPacketSize(kafkaMaxPacketSize) { const char *functionName = "asynStreamGeneratorDriver"; @@ -237,14 +238,13 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver( exit(1); } - /* Create the thread that orders packets of in preparation for our sinqDAQ stand-in + /* Create the thread that orders packets of in preparation for our sinqDAQ + * stand-in */ - status = - (asynStatus)(epicsThreadCreate( - "partialSort", - epicsThreadPriorityMedium, - epicsThreadGetStackSize(epicsThreadStackMedium), - (EPICSTHREADFUNC)::sortTask, this) == NULL); + status = (asynStatus)(epicsThreadCreate( + "partialSort", epicsThreadPriorityMedium, + epicsThreadGetStackSize(epicsThreadStackMedium), + (EPICSTHREADFUNC)::sortTask, this) == NULL); if (status) { epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName, functionName, status); @@ -285,7 +285,8 @@ asynStreamGeneratorDriver::~asynStreamGeneratorDriver() { // epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer)); } -asynStatus asynStreamGeneratorDriver::readInt32(asynUser *pasynUser, epicsInt32 *value) { +asynStatus asynStreamGeneratorDriver::readInt32(asynUser *pasynUser, + epicsInt32 *value) { int function = pasynUser->reason; asynStatus status = asynSuccess; @@ -294,21 +295,20 @@ asynStatus asynStreamGeneratorDriver::readInt32(asynUser *pasynUser, epicsInt32 getParamName(function, ¶mName); if (function == P_UdpQueueHighWaterMark) { - *value = - epicsRingBytesHighWaterMark(this->udpQueue) / sizeof(NormalisedEvent); - // Aparently resetting the watermark causes problems... - // at least concurrently :D - // epicsRingBytesResetHighWaterMark(this->udpQueue); - return asynSuccess; + *value = epicsRingBytesHighWaterMark(this->udpQueue) / + sizeof(NormalisedEvent); + // Aparently resetting the watermark causes problems... + // at least concurrently :D + // epicsRingBytesResetHighWaterMark(this->udpQueue); + return asynSuccess; } else if (function == P_SortedQueueHighWaterMark) { - *value = - epicsRingBytesHighWaterMark(this->sortedQueue) / sizeof(NormalisedEvent); - // epicsRingBytesResetHighWaterMark(this->sortedQueue); - return asynSuccess; + *value = epicsRingBytesHighWaterMark(this->sortedQueue) / + sizeof(NormalisedEvent); + // epicsRingBytesResetHighWaterMark(this->sortedQueue); + return asynSuccess; } return asynPortDriver::readInt32(pasynUser, value); - } asynStatus asynStreamGeneratorDriver::writeInt32(asynUser *pasynUser, @@ -503,13 +503,11 @@ void asynStreamGeneratorDriver::receiveUDP() { } } - struct { - bool operator()(const NormalisedEvent l, - const NormalisedEvent r) const { - return l.timestamp > r.timestamp; + bool operator()(const NormalisedEvent l, const NormalisedEvent r) const { + return l.timestamp < r.timestamp; } -} reverseSortEventsByTime; +} oldestEventsFirst; inline int eventsInQueue(epicsRingBytesId id) { return epicsRingBytesUsedBytes(id) / sizeof(NormalisedEvent); @@ -526,35 +524,37 @@ void asynStreamGeneratorDriver::partialSortEvents() { int queuedEvents = 0; epicsTimeStamp lastSort = epicsTime::getCurrent(); epicsTimeStamp currentTime = lastSort; - + while (true) { queuedEvents = eventsInQueue(this->udpQueue); // in case we can't wait lastSort = epicsTime::getCurrent(); currentTime = lastSort; - // wait for mininmum packet frequency or enough packets to ensure we could potentially - // have at least 1 packet per mcpdid - while (queuedEvents < bufferedEvents && epicsTimeDiffInNS(¤tTime, &lastSort) < 250'000'000ull) { + // wait for mininmum packet frequency or enough packets to ensure we + // could potentially have at least 1 packet per mcpdid + while (queuedEvents < bufferedEvents && + epicsTimeDiffInNS(¤tTime, &lastSort) < 250'000'000ull) { epicsThreadSleep(0.0001); // seconds - currentTime = epicsTime::getCurrent(); - queuedEvents = eventsInQueue(this->udpQueue); - } + currentTime = epicsTime::getCurrent(); + queuedEvents = eventsInQueue(this->udpQueue); + } queuedEvents = std::min(queuedEvents, bufferedEvents); - if (queuedEvents) { - epicsRingBytesGet(this->udpQueue, (char *)events, queuedEvents * sizeof(NormalisedEvent)); + if (queuedEvents) { + epicsRingBytesGet(this->udpQueue, (char *)events, + queuedEvents * sizeof(NormalisedEvent)); - std::sort(events, events + queuedEvents, reverseSortEventsByTime); + std::sort(events, events + queuedEvents, oldestEventsFirst); - epicsRingBytesPut(this->sortedQueue, (char *)events, queuedEvents * sizeof(NormalisedEvent)); - } + epicsRingBytesPut(this->sortedQueue, (char *)events, + queuedEvents * sizeof(NormalisedEvent)); + } } - } -inline void asynStreamGeneratorDriver::queueForKafka(NormalisedEvent &&ne) { +inline void asynStreamGeneratorDriver::queueForKafka(NormalisedEvent &ne) { if (this->kafkaEnabled) { if (ne.source == 0) epicsRingBytesPut(this->monitorQueue, (char *)&ne, @@ -576,8 +576,10 @@ void asynStreamGeneratorDriver::processEvents() { // we have two buffers. We alternate between reading data into one of them, // and then merge sorting into the other - NormalisedEvent *eventsA = new NormalisedEvent[(bufferedEvents + extraBufferedEvents)]; - NormalisedEvent *eventsB = new NormalisedEvent[(bufferedEvents + extraBufferedEvents)]; + NormalisedEvent *eventsA = + new NormalisedEvent[(bufferedEvents + extraBufferedEvents)]; + NormalisedEvent *eventsB = + new NormalisedEvent[(bufferedEvents + extraBufferedEvents)]; NormalisedEvent *eventsBLastStart = eventsB + bufferedEvents; NormalisedEvent *eventsBLastEnd = eventsBLastStart; @@ -587,24 +589,36 @@ void asynStreamGeneratorDriver::processEvents() { epicsTimeStamp currentTime = lastProcess; epicsInt32 *counts = new epicsInt32[this->num_channels]; + double elapsedSeconds = 0; + uint64_t startTimestamp = std::numeric_limits::max(); + uint64_t currTimestamp; + epicsInt32 currStatus = STATUS_IDLE; + epicsInt32 prevStatus = STATUS_IDLE; + epicsInt32 countPreset; + epicsInt32 timePreset; + epicsInt32 presetChannel; epicsInt32 udpQueueHighWaterMark = 0; epicsInt32 sortedQueueHighWaterMark = 0; while (true) { - queuedEvents = eventsInQueue(this->sortedQueue); // in case we can't wait + queuedEvents = + eventsInQueue(this->sortedQueue); // in case we can't wait lastProcess = epicsTime::getCurrent(); currentTime = lastProcess; - // wait for mininmum packet frequency or enough packets to ensure we could potentially - // have at least 1 packet per mcpdid - while (queuedEvents < bufferedEvents && epicsTimeDiffInNS(¤tTime, &lastProcess) < 250'000'000ull) { + // wait for mininmum packet frequency or enough packets to ensure we + // could potentially have at least 1 packet per mcpdid + while (queuedEvents < bufferedEvents && + epicsTimeDiffInNS(¤tTime, &lastProcess) < 250'000'000ull) { epicsThreadSleep(0.0001); // seconds currentTime = epicsTime::getCurrent(); queuedEvents = eventsInQueue(this->sortedQueue); } + getIntegerParam(this->P_Status, &currStatus); + queuedEvents = std::min(queuedEvents, bufferedEvents); NormalisedEvent *newStartPtr = eventsA + extraBufferedEvents; @@ -614,274 +628,71 @@ void asynStreamGeneratorDriver::processEvents() { // new read, in the case that all new events are newer timewise, and // therefore, all events from eventsB have to be placed in a preceeding // position. - epicsRingBytesGet(this->sortedQueue, (char *)newStartPtr, queuedEvents * sizeof(NormalisedEvent)); + epicsRingBytesGet(this->sortedQueue, (char *)newStartPtr, + queuedEvents * sizeof(NormalisedEvent)); - int toProcess = eventsBLastEnd - eventsBLastStart + queuedEvents * 4 / 5; + int toProcess = + eventsBLastEnd - eventsBLastStart + queuedEvents * 4 / 5; // TODO could also consider an in-place merge - eventsBLastEnd = std::merge( - newStartPtr, newStartPtr + queuedEvents, - eventsBLastStart, eventsBLastEnd, - eventsA, reverseSortEventsByTime - ); + eventsBLastEnd = std::merge(newStartPtr, newStartPtr + queuedEvents, + eventsBLastStart, eventsBLastEnd, eventsA, + oldestEventsFirst); eventsBLastStart = eventsA + toProcess; - for (size_t i = 0; i < toProcess; ++i) { - counts[eventsA[i].source == 0 ? eventsA[i].pixelId + 1 : 0] += 1; + // TODO I haven't really taken care of the case that there are no events + + if (prevStatus == STATUS_IDLE && currStatus == STATUS_COUNTING) { + + getIntegerParam(this->P_CountPreset, &countPreset); + getIntegerParam(this->P_TimePreset, &timePreset); + getIntegerParam(this->P_MonitorChannel, &presetChannel); + + // reset status variables + startTimestamp = eventsA[0].timestamp; + elapsedSeconds = 0; + for (size_t i = 0; i < this->num_channels; ++i) { + counts[i] = 0; + } } - for (size_t i = 0; i < num_channels; ++i) { - setIntegerParam(P_Counts[i], counts[i]); + if (currStatus == STATUS_COUNTING) { + + // The elapsedSeconds are round differently depending on whether we + // are using them for comparison, or for showing to the user, to + // try and make sure the data we send to kafka is correct, while + // the measurement time also appears intuitive. + for (size_t i = 0; i < toProcess; ++i) { + counts[eventsA[i].source == 0 ? eventsA[i].pixelId + 1 : 0] += + 1; + elapsedSeconds = (eventsA[i].timestamp - startTimestamp) / 1e9; + + if ((countPreset && counts[presetChannel] >= countPreset) || + (timePreset && elapsedSeconds > (double)timePreset)) + break; + + // TODO also batchwise? + this->queueForKafka(eventsA[i]); + } + + for (size_t i = 0; i < num_channels; ++i) { + setIntegerParam(P_Counts[i], counts[i]); + } + setIntegerParam(P_ElapsedTime, (epicsInt32)elapsedSeconds); + + if ((countPreset && counts[presetChannel] >= countPreset) || + (timePreset && elapsedSeconds > (double)timePreset)) { + setIntegerParam(this->P_Status, STATUS_IDLE); + setIntegerParam(this->P_CountPreset, 0); + setIntegerParam(this->P_TimePreset, 0); + } } - //setIntegerParam(P_ElapsedTime, elapsedSeconds); + prevStatus = currStatus; std::swap(eventsA, eventsB); - } - - // // TODO this is totally decoupled!!! - // const size_t queueBufferSize = - // 10 * epicsRingBytesSize(this->udpQueue) / sizeof(NormalisedEvent); - - // //struct { - // // bool operator()(const NormalisedEvent l, - // // const NormalisedEvent r) const { - // // return l.timestamp > r.timestamp; - // // } - // //} smallestToLargest; - - // //// This should never be used. It is just instantiated to reserve a buffer - // //// of specific size. - // //std::vector queueBuffer; - // //queueBuffer.reserve(queueBufferSize); - - // //std::priority_queue, - // // decltype(smallestToLargest)> - // // timeQueue(smallestToLargest, std::move(queueBuffer)); - - // NormalisedEvent* timeQueue = new NormalisedEvent[queueBufferSize]; - - // // TODO epics doesn't seem to support uint64, you would need an array of - // // uint32. It does support int64 though.. so we start with that - // epicsInt32 *counts = new epicsInt32[this->num_channels]; - - // const uint64_t minRateSamplePeriod = 100'000'000ll; - // const size_t rateAverageWindow = 20; - // size_t countDiffsPtr = 0; - // epicsInt32 *rates = new epicsInt32[this->num_channels]; - // epicsInt32 *countDiff = new epicsInt32[this->num_channels]; - // epicsInt32 *countDiffs = - // new epicsInt32[this->num_channels * rateAverageWindow]; - // uint64_t *timeSpans = new uint64_t[this->num_channels]; - // epicsTimeStamp lastRateUpdate = epicsTime::getCurrent(); - - // asynStatus status = asynSuccess; - // NormalisedEvent ne; - // uint64_t newestTimestamp = 0; - // uint64_t startTimestamp = std::numeric_limits::max(); - // uint64_t currTimestamp; - // epicsInt32 elapsedSeconds = 0; - // epicsInt32 prevStatus = STATUS_IDLE; - // epicsInt32 currStatus = STATUS_IDLE; - // epicsInt32 countPreset = 0; - // epicsInt32 timePreset = 0; - // epicsInt32 presetChannel = 0; - // epicsInt32 udpQueueHighWaterMark = 0; - - // while (true) { - - // // I think mostly everything should already by sorted - // // could probably in the other thread guarantee that each packet is sorted - // // but probably it already is... - // // - // // so really we just need to merge sort chunks - - // // idea is to try and guarantee at least 1 packet per id or the min - // // frequency for each id without actually checking all ids - // // size_t timeQueuePtr = 0; - // // while (timeQueuePtr < 1500 * 10) { - - // // // TODO depending on how this is implemented, I may also need to - // // // check that there is is enough bytes, in case it does partial - // // // writes... - // // if (epicsRingBytesGet(udpQueue, (char *)&ne, - // // sizeof(NormalisedEvent))) { - // // // we should restart this ioc at least every few years, as at ns - // // // resolution with a uint64_t we will have an overflow after - // // // around 4 years - // // newestTimestamp = std::max(newestTimestamp, ne.timestamp); - - // // ++countDiff[ne.source == 0 ? ne.pixelId + 1 : 0]; - - // // timeQueue.push(std::move(ne)); - // // } - - // // } - - - // // while (timeQueue.empty() || - // // (timeQueue.size() < 1500 * 10 && - // // newestTimestamp - timeQueue.top().timestamp < 200'000'000ull)) { - - // // // TODO depending on how this is implemented, I may also need to - // // // check that there is is enough bytes, in case it does partial - // // // writes... - // // if (epicsRingBytesGet(udpQueue, (char *)&ne, - // // sizeof(NormalisedEvent))) { - // // // we should restart this ioc at least every few years, as at ns - // // // resolution with a uint64_t we will have an overflow after - // // // around 4 years - // // newestTimestamp = std::max(newestTimestamp, ne.timestamp); - - // // ++countDiff[ne.source == 0 ? ne.pixelId + 1 : 0]; - - // // timeQueue.push(std::move(ne)); - // // } - // // } - - // // ne = timeQueue.top(); - // // timeQueue.pop(); - - // // status = getIntegerParam(this->P_Status, &currStatus); - - // // udpQueueHighWaterMark = - // // epicsRingBytesHighWaterMark(udpQueue) / sizeof(NormalisedEvent); - - // // // if (currStatus == STATUS_COUNTING && prevStatus == STATUS_IDLE) { - // // // // Starting a new count - - // // // // get current count configuration - // // // getIntegerParam(this->P_CountPreset, &countPreset); - // // // getIntegerParam(this->P_TimePreset, &timePreset); - // // // getIntegerParam(this->P_MonitorChannel, &presetChannel); - - // // // // reset status variables - // // // startTimestamp = std::numeric_limits::max(); - // // // for (size_t i = 0; i < this->num_channels; ++i) { - // // // counts[i] = 0; - // // // } - - // // // // reset pvs - // // // // lock(); - // // // // for (size_t i = 0; i < num_channels; ++i) { - // // // // setIntegerParam(P_Counts[i], counts[i]); - // // // // } - // // // // setIntegerParam(P_ElapsedTime, 0); - // // // // callParamCallbacks(); - // // // // unlock(); - - // // // // TODO might consider throwing out current buffer as it is - // // // // from before count started? then again, 0.2 ms or whatever is - // // // // set above is quite a small preceeding amount of time, so - // // // // maybe it doesn't matter - // // // } - - // // // prevStatus = currStatus; - - // // //if (currStatus == STATUS_COUNTING) { - // // startTimestamp = std::min(startTimestamp, ne.timestamp); - // // currTimestamp = ne.timestamp; - // // elapsedSeconds = - // // 0 ? currTimestamp <= startTimestamp - // // : ((double)(currTimestamp - startTimestamp)) / 1e9; - - // // // is our count finished? - // // // if ((countPreset && counts[presetChannel] >= countPreset) || - // // // (timePreset && elapsedSeconds >= timePreset)) { - - // // // // filter out events that occured after the specified time - // // // if (ne.timestamp - startTimestamp <= countPreset) { - // // // counts[ne.source == 0 ? ne.pixelId + 1 : 0] += 1; - // // // this->queueForKafka(std::move(ne)); - - // // // // add any remaining events with the same timestamp - // // // // we could theoretically have a small overrun if the - // // // // timestamps are identical on the monitor channel - // // // while (!timeQueue.empty() && - // // // !timeQueue.top().timestamp == currTimestamp) { - // // // ne = timeQueue.top(); - // // // timeQueue.pop(); - // // // counts[ne.source == 0 ? ne.pixelId + 1 : 0] += 1; - // // // this->queueForKafka(std::move(ne)); - // // // } - // // // } - - // // // countPreset = 0; - // // // timePreset = 0; - - // // // // lock(); - // // // for (size_t i = 0; i < num_channels; ++i) { - // // // setIntegerParam(P_Counts[i], counts[i]); - // // // } - // // // setIntegerParam(P_ElapsedTime, elapsedSeconds); - // // // setIntegerParam(P_CountPreset, countPreset); - // // // setIntegerParam(P_TimePreset, timePreset); - // // // setIntegerParam(P_UdpQueueHighWaterMark, udpQueueHighWaterMark); - // // // // callParamCallbacks(); - // // // setIntegerParam(P_Status, STATUS_IDLE); - // // // // callParamCallbacks(); - // // // // unlock(); - - // // // epicsRingBytesResetHighWaterMark(udpQueue); - - // // // } else { - - // // counts[ne.source == 0 ? ne.pixelId + 1 : 0] += 1; - // // this->queueForKafka(std::move(ne)); - - // // // lock(); - // // for (size_t i = 0; i < num_channels; ++i) { - // // setIntegerParam(P_Counts[i], counts[i]); - // // } - // // setIntegerParam(P_ElapsedTime, elapsedSeconds); - // // setIntegerParam(P_UdpQueueHighWaterMark, udpQueueHighWaterMark); - // // // callParamCallbacks(); - // // // unlock(); - // // // } - // // //} - - // // // Careful changing any of these magic numbers until I clean this up - // // // as you might end up calculating the wrong rate - // // // epicsTimeStamp currentTime = epicsTime::getCurrent(); - // // // if (epicsTimeDiffInNS(¤tTime, &lastRateUpdate) > - // // // minRateSamplePeriod) { - // // // timeSpans[countDiffsPtr] = - // // // epicsTimeDiffInNS(¤tTime, &lastRateUpdate); - - // // // uint64_t totalTime = 0; - // // // for (size_t i = 0; i <= rateAverageWindow; ++i) { - // // // totalTime += timeSpans[i]; - // // // } - - // // // lastRateUpdate = currentTime; - - // // // for (size_t i = 0; i <= this->num_channels; ++i) { - // // // countDiffs[i * rateAverageWindow + countDiffsPtr] = - // // // countDiff[i]; - - // // // uint64_t cnt = 0; - // // // for (size_t j = 0; j <= rateAverageWindow; ++j) { - // // // cnt += countDiffs[i * rateAverageWindow + j]; - // // // } - // // // rates[i] = cnt / (totalTime * 1e-9); - - // // // countDiff[i] = 0; - // // // } - - // // // countDiffsPtr = (countDiffsPtr + 1) % rateAverageWindow; - - // // // if (countDiffsPtr % 5 == 0) { - // // // // lock(); - // // // for (size_t i = 0; i < num_channels; ++i) { - // // // setIntegerParam(P_Rates[i], rates[i]); - // // // } - // // // // callParamCallbacks(); - // // // // unlock(); - // // // } - // // // } - // } } void asynStreamGeneratorDriver::produce(epicsRingBytesId eventQueue, @@ -922,7 +733,7 @@ void asynStreamGeneratorDriver::produce(epicsRingBytesId eventQueue, // At least every 0.2 seconds if (total >= this->kafkaMaxPacketSize || - epicsTimeDiffInNS(&now, &last_sent) > 200'000'000ll) { + epicsTimeDiffInNS(&now, &last_sent) > 250'000'000ll) { last_sent = epicsTime::getCurrent(); if (total) { diff --git a/src/asynStreamGeneratorDriver.h b/src/asynStreamGeneratorDriver.h index fa5358d..f4d61b8 100644 --- a/src/asynStreamGeneratorDriver.h +++ b/src/asynStreamGeneratorDriver.h @@ -166,7 +166,7 @@ class asynStreamGeneratorDriver : public asynPortDriver { asynStatus createInt32Param(asynStatus status, char *name, int *variable, epicsInt32 initialValue = 0); - inline void queueForKafka(NormalisedEvent &&ne); + inline void queueForKafka(NormalisedEvent &ne); void produce(epicsRingBytesId eventQueue, rd_kafka_t *kafkaProducer, const char *topic, const char *source);