diff --git a/src/asynStreamGeneratorDriver.cpp b/src/asynStreamGeneratorDriver.cpp index 4ecaaba..638ea69 100644 --- a/src/asynStreamGeneratorDriver.cpp +++ b/src/asynStreamGeneratorDriver.cpp @@ -499,7 +499,7 @@ void asynStreamGeneratorDriver::receiveUDP() { const std::size_t minCmdBufferHeaderLength = 20; if (received >= minDataBufferHeaderLength && - received == bufferLength * 2 && !isCmdBuffer) { + received == bufferLength * 2) { epicsRingBytesPut(this->udpQueue, (char *)buffer, bufferSize); @@ -681,7 +681,7 @@ inline void asynStreamGeneratorDriver::queueForKafka(NormalisedEvent &ne) { void asynStreamGeneratorDriver::processEvents() { - // const char functionName[]{"processEvents"}; + const char functionName[]{"processEvents"}; // x * number of ids * max events in packet * event size int bufferedEvents = 5 * 10 * 243; @@ -776,19 +776,38 @@ void asynStreamGeneratorDriver::processEvents() { for (std::size_t i = 0; i < this->num_channels; ++i) { counts[i] = 0; } + + asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, + "%s:%s: starting count: (%d, %d, %d, %" PRIu64 ")\n", + driverName, functionName, countPreset, timePreset, + presetChannel, startTimestamp); } if (currStatus == STATUS_COUNTING) { for (std::size_t i = 0; i < toProcess; ++i) { + counts[eventsA[i].source == 0 ? eventsA[i].pixelId : this->num_channels - 1] += 1; elapsedSeconds = (eventsA[i].timestamp - startTimestamp) / 1e9; - // TODO should really check there an no more events with the - // same final timestamp - if ((countPreset && counts[presetChannel] >= countPreset) || - (timePreset && elapsedSeconds > (double)timePreset)) { + const bool reachedCount = + countPreset && counts[presetChannel] >= countPreset; + const bool reachedTime = + timePreset && elapsedSeconds > (double)timePreset; + + if (reachedCount || reachedTime) { + + asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, + "%s:%s: reached preset: (%lld, %f)\n", driverName, + functionName, counts[presetChannel], + elapsedSeconds); + + // TODO should really check there an no more events with the + // same final timestamp + if (counts[presetChannel] == countPreset) + this->queueForKafka(eventsA[i]); + elapsedSeconds = timePreset == 0 ? elapsedSeconds : timePreset; @@ -805,7 +824,9 @@ void asynStreamGeneratorDriver::processEvents() { setDoubleParam(P_ElapsedTime, elapsedSeconds); if ((countPreset && counts[presetChannel] >= countPreset) || - (timePreset && elapsedSeconds > (double)timePreset)) { + (timePreset && elapsedSeconds >= (double)timePreset)) { + asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, + "%s:%s: finished count\n", driverName, functionName); setIntegerParam(this->P_Status, STATUS_IDLE); setIntegerParam(this->P_CountPreset, 0); setIntegerParam(this->P_TimePreset, 0);