diff --git a/scripts/st.cmd b/scripts/st.cmd index 463962b..3282bb4 100755 --- a/scripts/st.cmd +++ b/scripts/st.cmd @@ -8,8 +8,8 @@ require StreamGenerator, test epicsEnvSet("INSTR", "SQ:TEST:") epicsEnvSet("NAME", "SG") -drvAsynIPPortConfigure("ASYN_IP_PORT", "127.0.0.1:9071:54321 UDP", 0, 0, 0) -asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 1000, 8192) +drvAsynIPPortConfigure("ASYN_IP_PORT", "127.0.0.1:9071:54321 UDP", 0, 0, 1) +asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, 1000, 8192) dbLoadRecords("$(StreamGenerator_DB)daq_common.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNELS=5") diff --git a/scripts/udp_gen.py b/scripts/udp_gen.py index fcd19fd..297e31b 100644 --- a/scripts/udp_gen.py +++ b/scripts/udp_gen.py @@ -45,16 +45,15 @@ while True: header[16] = t_high & 0xff header[17] = t_high >> 8 - # num_events = random.randint(0, 243) - num_events = 243 + num_events = random.randint(0, 243) + # num_events = 243 + # num_events = 1 # update buffer length buffer_length = 21 + num_events * 3 header[0] = buffer_length & 0xff header[1] = (buffer_length >> 8) & 0xff - tosend = list(header) - # I believe, that in our case we never mix monitor and detector events as # the monitors should have id 0 and the detector events 1-9 so I have # excluded that posibility here. That would, however, if true mean we could @@ -62,6 +61,10 @@ while True: is_monitor = random.randint(0, 9) + header[11] = 0 if is_monitor > 3 else random.randint(1,9) + + tosend = list(header) + if is_monitor > 3: for i in range(num_events): @@ -103,4 +106,4 @@ while True: sock.sendto(bytes(tosend), ('127.0.0.1', 54321)) mv = memoryview(bytes(header)).cast('H') print(f'Sent packet {mv[3]} with {num_events} events {base_timestamp}') - # time.sleep(0.0005) + # time.sleep(0.5) diff --git a/src/asynStreamGeneratorDriver.cpp b/src/asynStreamGeneratorDriver.cpp index c2309c1..2864e57 100644 --- a/src/asynStreamGeneratorDriver.cpp +++ b/src/asynStreamGeneratorDriver.cpp @@ -57,6 +57,11 @@ static void udpPollerTask(void *drvPvt) { pSGD->receiveUDP(); } +static void daqTask(void *drvPvt) { + asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt; + pSGD->processEvents(); +} + static void monitorProducerTask(void *drvPvt) { asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt; pSGD->produceMonitor(); @@ -83,7 +88,8 @@ asynStatus asynStreamGeneratorDriver::createInt32Param( */ asynStreamGeneratorDriver::asynStreamGeneratorDriver( const char *portName, const char *ipPortName, const int numChannels, - const int kafkaQueueSize, const int kafkaMaxPacketSize) + const int udpQueueSize, const int kafkaQueueSize, + const int kafkaMaxPacketSize) : asynPortDriver(portName, 1, /* maxAddr */ asynInt32Mask | asynInt64Mask | asynDrvUserMask, /* Interface mask */ @@ -95,8 +101,8 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver( 1, /* Autoconnect */ 0, /* Default priority */ 0), /* Default stack size*/ - num_channels(numChannels + 1), monitorQueue(kafkaQueueSize, false), - detectorQueue(kafkaQueueSize, false), + num_channels(numChannels + 1), udpQueue(udpQueueSize, false), + monitorQueue(kafkaQueueSize, false), detectorQueue(kafkaQueueSize, false), kafkaMaxPacketSize(kafkaMaxPacketSize) { const char *functionName = "asynStreamGeneratorDriver"; @@ -122,7 +128,7 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver( P_Counts = new int[this->num_channels]; P_Rates = new int[this->num_channels]; P_ClearCounts = new int[this->num_channels]; - for (size_t i = 0; i < this->num_channels; ++i) { + for (std::size_t i = 0; i < this->num_channels; ++i) { memset(pv_name_buffer, 0, 100); epicsSnprintf(pv_name_buffer, 100, P_CountsString, i); status = createInt32Param(status, pv_name_buffer, P_Counts + i); @@ -145,27 +151,42 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver( // Create Events this->pausedEventId = epicsEventCreate(epicsEventEmpty); - this->monitorProducer = create_kafka_producer(); - this->detectorProducer = create_kafka_producer(); + // TODO re-enable the kafka stuff + // this->monitorProducer = create_kafka_producer(); + // this->detectorProducer = create_kafka_producer(); - // Setup for Thread Producing Monitor Kafka Events - status = - (asynStatus)(epicsThreadCreate( - "monitor_produce", epicsThreadPriorityMedium, - epicsThreadGetStackSize(epicsThreadStackMedium), - (EPICSTHREADFUNC)::monitorProducerTask, this) == NULL); - if (status) { - printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName, - functionName, status); - exit(1); - } + // // Setup for Thread Producing Monitor Kafka Events + // status = + // (asynStatus)(epicsThreadCreate( + // "monitor_produce", epicsThreadPriorityMedium, + // epicsThreadGetStackSize(epicsThreadStackMedium), + // (EPICSTHREADFUNC)::monitorProducerTask, this) == + // NULL); + // if (status) { + // printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName, + // functionName, status); + // exit(1); + // } - // Setup for Thread Producing Detector Kafka Events + // // Setup for Thread Producing Detector Kafka Events + // status = (asynStatus)(epicsThreadCreate( + // "monitor_produce", epicsThreadPriorityMedium, + // epicsThreadGetStackSize(epicsThreadStackMedium), + // (EPICSTHREADFUNC)::detectorProducerTask, + // this) == NULL); + // if (status) { + // printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName, + // functionName, status); + // exit(1); + // } + // TODO re-enable the kafka stuff + + /* Create the thread that orders the events and acts as our sinqDaq stand-in + */ status = (asynStatus)(epicsThreadCreate( - "monitor_produce", epicsThreadPriorityMedium, + "sinqDAQ", epicsThreadPriorityMedium, epicsThreadGetStackSize(epicsThreadStackMedium), - (EPICSTHREADFUNC)::detectorProducerTask, - this) == NULL); + (EPICSTHREADFUNC)::daqTask, this) == NULL); if (status) { printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName, functionName, status); @@ -258,165 +279,134 @@ asynStatus asynStreamGeneratorDriver::writeInt32(asynUser *pasynUser, return status; } -// TODO probably I will have to split this function up, so that the system -// can process the UDP messages in parallel void asynStreamGeneratorDriver::receiveUDP() { - asynStatus status; - int isConnected; - - const size_t buffer_size = 1500; - char buffer[buffer_size]; - size_t received; - int eomReason; - - epicsInt32 val; - epicsInt32 currentStatus; - epicsInt32 countPreset = 0; - epicsInt32 timePreset = 0; - epicsInt32 presetChannel = 0; const char *functionName = "receiveUDP"; + asynStatus status = asynSuccess; + int isConnected = 1; + std::size_t received; + int eomReason; - // TODO epics doesn't seem to support uint64, you would need an array of - // uint32. It does support int64 though.. so we start with that - epicsInt32 *counts = new epicsInt32[this->num_channels]; - - uint64_t start_time = std::numeric_limits::max(); - uint64_t current_time = 0; - epicsInt32 elapsedTime = 0; + // The correlation unit sents messages with a maximum size of 1500 bytes. + // These messages don't have any obious start or end to synchronise + // against... + const std::size_t bufferSize = 1500; + char buffer[bufferSize + 1]; // so that \0 can fit while (true) { - status = getIntegerParam(this->P_Status, ¤tStatus); - if (!currentStatus || status) { - - epicsEventWait(this->pausedEventId); - - getIntegerParam(this->P_CountPreset, &countPreset); - getIntegerParam(this->P_TimePreset, &timePreset); - getIntegerParam(this->P_MonitorChannel, &presetChannel); - - // memset doesn't work with epicsInt32 - for (size_t i = 0; i < this->num_channels; ++i) { - counts[i] = 0; - } - - start_time = std::numeric_limits::max(); - current_time = 0; - elapsedTime = 0; - - lock(); - for (size_t i = 0; i < num_channels; ++i) { - setIntegerParam(P_Counts[i], counts[i]); - } - setIntegerParam(P_ElapsedTime, 0); - callParamCallbacks(); - unlock(); - - // Clear the input buffer, in case of stray messages - pasynOctetSyncIO->flush(pasynUDPUser); - } - status = pasynManager->isConnected(pasynUDPUser, &isConnected); + if (!isConnected) asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, "%s:%s: isConnected = %d\n", driverName, functionName, isConnected); - status = pasynOctetSyncIO->read(pasynUDPUser, buffer, buffer_size, + status = pasynOctetSyncIO->read(pasynUDPUser, buffer, bufferSize, 0, // timeout &received, &eomReason); if (received) { + UDPHeader *header = (UDPHeader *)buffer; - size_t total_events = (header->BufferLength - 21) / 3; + std::size_t total_events = (header->BufferLength - 21) / 3; - start_time = - std::min(start_time, (uint64_t)(header->nanosecs() / 1e9)); - // This is maybe safer, in case the time wraps back around? - // if (start_time == std::numeric_limits::max()) - // start_time = header->nanosecs() /1e9; - - // TODO lots of checks and validation missing everywhere here if (received == total_events * 6 + 42) { - // asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, - // "%s:%s: received packet %d with %d events (%" - // PRIu64 - // ")\n", - // driverName, functionName, - // header->BufferNumber, total_events, - // header->nanosecs()); - for (size_t i = 0; i < total_events; ++i) { + for (std::size_t i = 0; i < total_events; ++i) { char *event = (buffer + 21 * 2 + i * 6); - if (countPreset && counts[presetChannel] >= countPreset) - break; + NormalisedEvent *ne; if (event[5] & 0x80) { // Monitor Event MonitorEvent *m_event = (MonitorEvent *)event; - counts[m_event->DataID + 1] += 1; - // needs to be freed!!! - auto nme = new NormalisedMonitorEvent(); - nme->TimeStamp = - header->nanosecs() + (uint64_t)m_event->nanosecs(); - nme->DataID = m_event->DataID; - this->monitorQueue.push(nme); - - current_time = std::max( - current_time, - (uint64_t)((header->nanosecs() + - (uint64_t)m_event->nanosecs()) / - 1e9)); + ne = new NormalisedEvent( + header->nanosecs() + (uint64_t)m_event->nanosecs(), + 0, m_event->DataID); } else { // Detector Event DetectorEvent *d_event = (DetectorEvent *)event; - counts[0] += 1; // needs to be freed!!! - auto nde = new NormalisedDetectorEvent(); - nde->TimeStamp = - header->nanosecs() + (uint64_t)d_event->nanosecs(); - nde->PixID = d_event->pixelId(header->McpdID); - this->detectorQueue.push(nde); - - current_time = std::max( - current_time, - (uint64_t)((header->nanosecs() + - (uint64_t)d_event->nanosecs()) / - 1e9)); + ne = new NormalisedEvent( + header->nanosecs() + (uint64_t)d_event->nanosecs(), + header->McpdID, d_event->pixelId(header->McpdID)); } + + this->udpQueue.push(ne); } - lock(); - for (size_t i = 0; i < num_channels; ++i) { - setIntegerParam(P_Counts[i], counts[i]); - } - elapsedTime = current_time - start_time; - setIntegerParam(P_ElapsedTime, elapsedTime); - callParamCallbacks(); - unlock(); } else { asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, "%s:%s: invalid UDP packet\n", driverName, functionName); } + } + } +} - if ((countPreset && counts[presetChannel] >= countPreset) || - (timePreset && elapsedTime >= timePreset)) { - lock(); - setIntegerParam(P_Status, STATUS_IDLE); - setIntegerParam(P_CountPreset, 0); - setIntegerParam(P_TimePreset, 0); - callParamCallbacks(); - unlock(); - } +void asynStreamGeneratorDriver::processEvents() { + + const char *functionName = "processEvents"; + + const size_t queueBufferSize = 10 * this->udpQueue.getSize(); + + struct { + bool operator()(const NormalisedEvent *l, + const NormalisedEvent *r) const { + return l->timestamp > r->timestamp; + } + } smallestToLargest; + + // This should never be used. It is just instantiated to reserve a buffer + // of specific size. + std::vector queueBuffer; + queueBuffer.reserve(queueBufferSize); + + std::priority_queue, + decltype(smallestToLargest)> + timeQueue(smallestToLargest, std::move(queueBuffer)); + + NormalisedEvent *ne; + + uint64_t newest = 0; + + // TODO epics doesn't seem to support uint64, you would need an array of + // uint32. It does support int64 though.. so we start with that + epicsInt32 *counts = new epicsInt32[this->num_channels]; + + while (true) { + + if ((ne = this->udpQueue.pop()) != nullptr) { + // TODO overflow in the correlation unit? + newest = std::max(newest, ne->timestamp); + timeQueue.push(ne); } - // epicsThreadSleep(1); // seconds + // idea is to try and guarantee at least 1 packet per id or the min + // frequency for each id without actually checking all ids + if (timeQueue.size() >= 1500 * 10 || + (timeQueue.size() > 0 && + newest - timeQueue.top()->timestamp >= 200'000'000ull)) { + ne = timeQueue.top(); + timeQueue.pop(); + + counts[ne->source == 0 ? ne->pixelId + 1 : 0] += 1; + + delete ne; + + lock(); + for (size_t i = 0; i < num_channels; ++i) { + setIntegerParam(P_Counts[i], counts[i]); + } + // elapsedTime = current_time - start_time; + // setIntegerParam(P_ElapsedTime, elapsedTime); + callParamCallbacks(); + unlock(); + } } } @@ -497,7 +487,7 @@ void asynStreamGeneratorDriver::produceMonitor() { void asynStreamGeneratorDriver::produceDetector() { - static const size_t bufferSize = this->kafkaMaxPacketSize + 16; + static const std::size_t bufferSize = this->kafkaMaxPacketSize + 16; flatbuffers::FlatBufferBuilder builder(1024); std::vector tof; @@ -612,24 +602,28 @@ extern "C" { asynStatus asynStreamGeneratorDriverConfigure(const char *portName, const char *ipPortName, const int numChannels, + const int udpQueueSize, const int kafkaQueueSize, const int kafkaMaxPacketSize) { new asynStreamGeneratorDriver(portName, ipPortName, numChannels, - kafkaQueueSize, kafkaMaxPacketSize); + udpQueueSize, kafkaQueueSize, + kafkaMaxPacketSize); return asynSuccess; } static const iocshArg initArg0 = {"portName", iocshArgString}; static const iocshArg initArg1 = {"ipPortName", iocshArgString}; static const iocshArg initArg2 = {"numChannels", iocshArgInt}; -static const iocshArg initArg3 = {"kafkaQueueSize", iocshArgInt}; -static const iocshArg initArg4 = {"kafkaMaxPacketSize", iocshArgInt}; +static const iocshArg initArg3 = {"udpQueueSize", iocshArgInt}; +static const iocshArg initArg4 = {"kafkaQueueSize", iocshArgInt}; +static const iocshArg initArg5 = {"kafkaMaxPacketSize", iocshArgInt}; static const iocshArg *const initArgs[] = {&initArg0, &initArg1, &initArg2, - &initArg3, &initArg4}; -static const iocshFuncDef initFuncDef = {"asynStreamGenerator", 5, initArgs}; + &initArg3, &initArg4, &initArg5}; +static const iocshFuncDef initFuncDef = {"asynStreamGenerator", 6, initArgs}; static void initCallFunc(const iocshArgBuf *args) { asynStreamGeneratorDriverConfigure(args[0].sval, args[1].sval, args[2].ival, - args[3].ival, args[4].ival); + args[3].ival, args[4].ival, + args[5].ival); } void asynStreamGeneratorDriverRegister(void) { diff --git a/src/asynStreamGeneratorDriver.h b/src/asynStreamGeneratorDriver.h index d3b6002..6f8da61 100644 --- a/src/asynStreamGeneratorDriver.h +++ b/src/asynStreamGeneratorDriver.h @@ -66,6 +66,15 @@ struct __attribute__((__packed__)) NormalisedDetectorEvent { uint32_t PixID; }; +struct __attribute__((__packed__)) NormalisedEvent { + uint64_t timestamp; + uint8_t source; + uint32_t pixelId; + + inline NormalisedEvent(uint64_t timestamp, uint8_t source, uint32_t pixelId) + : timestamp(timestamp), source(source), pixelId(pixelId){}; +}; + /******************************************************************************* * Status values that should match the definition in db/daq_common.db */ @@ -101,13 +110,15 @@ struct __attribute__((__packed__)) NormalisedDetectorEvent { class asynStreamGeneratorDriver : public asynPortDriver { public: asynStreamGeneratorDriver(const char *portName, const char *ipPortName, - const int numChannels, const int kafkaQueueSize, + const int numChannels, const int udpQueueSize, + const int kafkaQueueSize, const int kafkaMaxPacketSize); virtual ~asynStreamGeneratorDriver(); virtual asynStatus writeInt32(asynUser *pasynUser, epicsInt32 value); void receiveUDP(); + void processEvents(); void produceMonitor(); void produceDetector(); @@ -134,6 +145,8 @@ class asynStreamGeneratorDriver : public asynPortDriver { const int num_channels; const int kafkaMaxPacketSize; + epicsRingPointer udpQueue; + epicsRingPointer monitorQueue; rd_kafka_t *monitorProducer;