diff --git a/scripts/st.cmd b/scripts/st.cmd index 3282bb4..b5e6cc0 100755 --- a/scripts/st.cmd +++ b/scripts/st.cmd @@ -9,7 +9,7 @@ epicsEnvSet("INSTR", "SQ:TEST:") epicsEnvSet("NAME", "SG") drvAsynIPPortConfigure("ASYN_IP_PORT", "127.0.0.1:9071:54321 UDP", 0, 0, 1) -asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, 1000, 8192) +asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, 0, 1000, 8192) dbLoadRecords("$(StreamGenerator_DB)daq_common.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNELS=5") diff --git a/scripts/udp_gen.py b/scripts/udp_gen.py index 297e31b..bb62832 100644 --- a/scripts/udp_gen.py +++ b/scripts/udp_gen.py @@ -106,4 +106,4 @@ while True: sock.sendto(bytes(tosend), ('127.0.0.1', 54321)) mv = memoryview(bytes(header)).cast('H') print(f'Sent packet {mv[3]} with {num_events} events {base_timestamp}') - # time.sleep(0.5) + # time.sleep(1) diff --git a/src/asynStreamGeneratorDriver.cpp b/src/asynStreamGeneratorDriver.cpp index 2864e57..508540a 100644 --- a/src/asynStreamGeneratorDriver.cpp +++ b/src/asynStreamGeneratorDriver.cpp @@ -88,8 +88,8 @@ asynStatus asynStreamGeneratorDriver::createInt32Param( */ asynStreamGeneratorDriver::asynStreamGeneratorDriver( const char *portName, const char *ipPortName, const int numChannels, - const int udpQueueSize, const int kafkaQueueSize, - const int kafkaMaxPacketSize) + const int udpQueueSize, const bool enableKafkaStream, + const int kafkaQueueSize, const int kafkaMaxPacketSize) : asynPortDriver(portName, 1, /* maxAddr */ asynInt32Mask | asynInt64Mask | asynDrvUserMask, /* Interface mask */ @@ -151,35 +151,36 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver( // Create Events this->pausedEventId = epicsEventCreate(epicsEventEmpty); - // TODO re-enable the kafka stuff - // this->monitorProducer = create_kafka_producer(); - // this->detectorProducer = create_kafka_producer(); + if (enableKafkaStream) { + this->monitorProducer = create_kafka_producer(); + this->detectorProducer = create_kafka_producer(); - // // Setup for Thread Producing Monitor Kafka Events - // status = - // (asynStatus)(epicsThreadCreate( - // "monitor_produce", epicsThreadPriorityMedium, - // epicsThreadGetStackSize(epicsThreadStackMedium), - // (EPICSTHREADFUNC)::monitorProducerTask, this) == - // NULL); - // if (status) { - // printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName, - // functionName, status); - // exit(1); - // } + // Setup for Thread Producing Monitor Kafka Events + status = + (asynStatus)(epicsThreadCreate( + "monitor_produce", epicsThreadPriorityMedium, + epicsThreadGetStackSize(epicsThreadStackMedium), + (EPICSTHREADFUNC)::monitorProducerTask, + this) == NULL); + if (status) { + printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName, + functionName, status); + exit(1); + } - // // Setup for Thread Producing Detector Kafka Events - // status = (asynStatus)(epicsThreadCreate( - // "monitor_produce", epicsThreadPriorityMedium, - // epicsThreadGetStackSize(epicsThreadStackMedium), - // (EPICSTHREADFUNC)::detectorProducerTask, - // this) == NULL); - // if (status) { - // printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName, - // functionName, status); - // exit(1); - // } - // TODO re-enable the kafka stuff + // Setup for Thread Producing Detector Kafka Events + status = + (asynStatus)(epicsThreadCreate( + "monitor_produce", epicsThreadPriorityMedium, + epicsThreadGetStackSize(epicsThreadStackMedium), + (EPICSTHREADFUNC)::detectorProducerTask, + this) == NULL); + if (status) { + printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName, + functionName, status); + exit(1); + } + } /* Create the thread that orders the events and acts as our sinqDaq stand-in */ @@ -281,6 +282,9 @@ asynStatus asynStreamGeneratorDriver::writeInt32(asynUser *pasynUser, void asynStreamGeneratorDriver::receiveUDP() { + // TODO fix time overflows + // TODO check for lost packets + const char *functionName = "receiveUDP"; asynStatus status = asynSuccess; int isConnected = 1; @@ -370,19 +374,29 @@ void asynStreamGeneratorDriver::processEvents() { decltype(smallestToLargest)> timeQueue(smallestToLargest, std::move(queueBuffer)); - NormalisedEvent *ne; - - uint64_t newest = 0; - // TODO epics doesn't seem to support uint64, you would need an array of // uint32. It does support int64 though.. so we start with that epicsInt32 *counts = new epicsInt32[this->num_channels]; + asynStatus status = asynSuccess; + NormalisedEvent *ne; + uint64_t newestTimestamp = 0; + uint64_t startTimestamp = std::numeric_limits::max(); + uint64_t currTimestamp; + epicsInt32 elapsedSeconds = 0; + epicsInt32 prevStatus = STATUS_IDLE; + epicsInt32 currStatus = STATUS_IDLE; + epicsInt32 countPreset = 0; + epicsInt32 timePreset = 0; + epicsInt32 presetChannel = 0; + while (true) { if ((ne = this->udpQueue.pop()) != nullptr) { - // TODO overflow in the correlation unit? - newest = std::max(newest, ne->timestamp); + // we should reastart this ioc at least every few years, as at ns + // resolution with a uint64_t we will have an overflow after around + // 4 years + newestTimestamp = std::max(newestTimestamp, ne->timestamp); timeQueue.push(ne); } @@ -390,22 +404,93 @@ void asynStreamGeneratorDriver::processEvents() { // frequency for each id without actually checking all ids if (timeQueue.size() >= 1500 * 10 || (timeQueue.size() > 0 && - newest - timeQueue.top()->timestamp >= 200'000'000ull)) { + newestTimestamp - timeQueue.top()->timestamp >= 200'000'000ull)) { ne = timeQueue.top(); timeQueue.pop(); - counts[ne->source == 0 ? ne->pixelId + 1 : 0] += 1; + status = getIntegerParam(this->P_Status, &currStatus); + + if (currStatus == STATUS_COUNTING && prevStatus == STATUS_IDLE) { + // Starting a new count + + // get current count configuration + getIntegerParam(this->P_CountPreset, &countPreset); + getIntegerParam(this->P_TimePreset, &timePreset); + getIntegerParam(this->P_MonitorChannel, &presetChannel); + + // reset status variables + startTimestamp = std::numeric_limits::max(); + for (size_t i = 0; i < this->num_channels; ++i) { + counts[i] = 0; + } + + // reset pvs + lock(); + for (size_t i = 0; i < num_channels; ++i) { + setIntegerParam(P_Counts[i], counts[i]); + } + setIntegerParam(P_ElapsedTime, 0); + callParamCallbacks(); + unlock(); + + // TODO might consider throwing out current buffer as it is + // from before count started? then again, 0.2 ms or whatever is + // set above is quite a small preceeding amount of time, so + // maybe it doesn't matter + } + + prevStatus = currStatus; + + if (currStatus == STATUS_COUNTING) { + startTimestamp = std::min(startTimestamp, ne->timestamp); + counts[ne->source == 0 ? ne->pixelId + 1 : 0] += 1; + currTimestamp = ne->timestamp; + elapsedSeconds = + 0 ? currTimestamp <= startTimestamp + : ((double)(currTimestamp - startTimestamp)) / 1e9; + } delete ne; - lock(); - for (size_t i = 0; i < num_channels; ++i) { - setIntegerParam(P_Counts[i], counts[i]); + // is our count finished? + if ((countPreset && counts[presetChannel] >= countPreset) || + (timePreset && elapsedSeconds >= timePreset)) { + + // add any remaining events with the same timestamp + // we could theoretically have a small overrun if the + // timestamps are identical on the monitor channel + while (!timeQueue.empty() && + !timeQueue.top()->timestamp == currTimestamp) { + ne = timeQueue.top(); + timeQueue.pop(); + counts[ne->source == 0 ? ne->pixelId + 1 : 0] += 1; + delete ne; + } + + countPreset = 0; + timePreset = 0; + + lock(); + for (size_t i = 0; i < num_channels; ++i) { + setIntegerParam(P_Counts[i], counts[i]); + } + setIntegerParam(P_ElapsedTime, elapsedSeconds); + setIntegerParam(P_CountPreset, countPreset); + setIntegerParam(P_TimePreset, timePreset); + callParamCallbacks(); + setIntegerParam(P_Status, STATUS_IDLE); + callParamCallbacks(); + unlock(); + + } else if (currStatus == STATUS_COUNTING) { + lock(); + for (size_t i = 0; i < num_channels; ++i) { + setIntegerParam(P_Counts[i], counts[i]); + } + setIntegerParam(P_ElapsedTime, elapsedSeconds); + callParamCallbacks(); + unlock(); } - // elapsedTime = current_time - start_time; - // setIntegerParam(P_ElapsedTime, elapsedTime); - callParamCallbacks(); - unlock(); } } } @@ -599,15 +684,13 @@ void asynStreamGeneratorDriver::produceDetector() { */ extern "C" { -asynStatus asynStreamGeneratorDriverConfigure(const char *portName, - const char *ipPortName, - const int numChannels, - const int udpQueueSize, - const int kafkaQueueSize, - const int kafkaMaxPacketSize) { +asynStatus asynStreamGeneratorDriverConfigure( + const char *portName, const char *ipPortName, const int numChannels, + const int udpQueueSize, const bool enableKafkaStream, + const int kafkaQueueSize, const int kafkaMaxPacketSize) { new asynStreamGeneratorDriver(portName, ipPortName, numChannels, - udpQueueSize, kafkaQueueSize, - kafkaMaxPacketSize); + udpQueueSize, enableKafkaStream, + kafkaQueueSize, kafkaMaxPacketSize); return asynSuccess; } @@ -615,15 +698,17 @@ static const iocshArg initArg0 = {"portName", iocshArgString}; static const iocshArg initArg1 = {"ipPortName", iocshArgString}; static const iocshArg initArg2 = {"numChannels", iocshArgInt}; static const iocshArg initArg3 = {"udpQueueSize", iocshArgInt}; -static const iocshArg initArg4 = {"kafkaQueueSize", iocshArgInt}; -static const iocshArg initArg5 = {"kafkaMaxPacketSize", iocshArgInt}; +static const iocshArg initArg4 = {"enableKafkaStream", iocshArgInt}; +static const iocshArg initArg5 = {"kafkaQueueSize", iocshArgInt}; +static const iocshArg initArg6 = {"kafkaMaxPacketSize", iocshArgInt}; static const iocshArg *const initArgs[] = {&initArg0, &initArg1, &initArg2, - &initArg3, &initArg4, &initArg5}; + &initArg3, &initArg4, &initArg5, + &initArg6}; static const iocshFuncDef initFuncDef = {"asynStreamGenerator", 6, initArgs}; static void initCallFunc(const iocshArgBuf *args) { asynStreamGeneratorDriverConfigure(args[0].sval, args[1].sval, args[2].ival, - args[3].ival, args[4].ival, - args[5].ival); + args[3].ival, args[4].ival, args[5].ival, + args[6].ival); } void asynStreamGeneratorDriverRegister(void) { diff --git a/src/asynStreamGeneratorDriver.h b/src/asynStreamGeneratorDriver.h index 6f8da61..cc965ff 100644 --- a/src/asynStreamGeneratorDriver.h +++ b/src/asynStreamGeneratorDriver.h @@ -111,6 +111,7 @@ class asynStreamGeneratorDriver : public asynPortDriver { public: asynStreamGeneratorDriver(const char *portName, const char *ipPortName, const int numChannels, const int udpQueueSize, + const bool enableKafkaStream, const int kafkaQueueSize, const int kafkaMaxPacketSize); virtual ~asynStreamGeneratorDriver();