From d7bf3977fc8ed34ca23444c5df36b0d076b34f75 Mon Sep 17 00:00:00 2001 From: Edward Wall Date: Fri, 31 Oct 2025 10:16:50 +0100 Subject: [PATCH] reorganises and cleans up some parts of the code --- Makefile | 2 +- scripts/udp_gen.py | 5 +- src/asynStreamGeneratorDriver.cpp | 275 ++++++++++++------------------ src/asynStreamGeneratorDriver.h | 64 ++++++- 4 files changed, 170 insertions(+), 176 deletions(-) diff --git a/Makefile b/Makefile index 6cd9a42..f17d207 100644 --- a/Makefile +++ b/Makefile @@ -15,7 +15,7 @@ DBDS += src/asynStreamGeneratorDriver.dbd # DB files to include in the release TEMPLATES += db/channels.db -HEADERS += src/asynStreamGeneratorDriver.h +# HEADERS += src/asynStreamGeneratorDriver.h # Source files to build SOURCES += src/asynStreamGeneratorDriver.cpp diff --git a/scripts/udp_gen.py b/scripts/udp_gen.py index fc51e17..fcd19fd 100644 --- a/scripts/udp_gen.py +++ b/scripts/udp_gen.py @@ -45,7 +45,8 @@ while True: header[16] = t_high & 0xff header[17] = t_high >> 8 - num_events = random.randint(0, 243) + # num_events = random.randint(0, 243) + num_events = 243 # update buffer length buffer_length = 21 + num_events * 3 @@ -102,4 +103,4 @@ while True: sock.sendto(bytes(tosend), ('127.0.0.1', 54321)) mv = memoryview(bytes(header)).cast('H') print(f'Sent packet {mv[3]} with {num_events} events {base_timestamp}') - time.sleep(1) + # time.sleep(0.0005) diff --git a/src/asynStreamGeneratorDriver.cpp b/src/asynStreamGeneratorDriver.cpp index e8838cf..c933a75 100644 --- a/src/asynStreamGeneratorDriver.cpp +++ b/src/asynStreamGeneratorDriver.cpp @@ -11,20 +11,46 @@ #include "asynStreamGeneratorDriver.h" #include -/* Wrapper to set config values and error out if needed. +/******************************************************************************* + * Kafka Methods */ -static void set_config(rd_kafka_conf_t *conf, char *key, char *value) { + +static void set_kafka_config_key(rd_kafka_conf_t *conf, char *key, + char *value) { char errstr[512]; rd_kafka_conf_res_t res; res = rd_kafka_conf_set(conf, key, value, errstr, sizeof(errstr)); if (res != RD_KAFKA_CONF_OK) { - // TODO - // g_error("Unable to set config: %s", errstr); + epicsStdoutPrintf("Failed to set config value %s : %s\n", key, value); exit(1); } } +static rd_kafka_t *create_kafka_producer() { + + char errstr[512]; + rd_kafka_t *producer; + + // Prepare configuration object + rd_kafka_conf_t *conf = rd_kafka_conf_new(); + set_kafka_config_key(conf, "bootstrap.servers", "linkafka01:9092"); + set_kafka_config_key(conf, "queue.buffering.max.messages", "1e7"); + + // Create the Producer + producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); + + if (!producer) { + epicsStdoutPrintf("Failed to create Kafka Producer: %s\n", errstr); + exit(1); + } + + return producer; +} + +/******************************************************************************* + * Static Methods Passed to Epics Threads that should run in the background + */ static void udpPollerTask(void *drvPvt) { asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt; pSGD->receiveUDP(); @@ -40,54 +66,13 @@ static void detectorProducerTask(void *drvPvt) { pSGD->produceDetector(); } -// UDP Packet Definitions -struct __attribute__((__packed__)) UDPHeader { - uint16_t BufferLength; - uint16_t BufferType; - uint16_t HeaderLength; - uint16_t BufferNumber; - uint16_t RunCmdID; - uint16_t Status : 8; - uint16_t McpdID : 8; - uint16_t TimeStamp[3]; - uint16_t Parameter0[3]; - uint16_t Parameter1[3]; - uint16_t Parameter2[3]; - uint16_t Parameter3[3]; - - inline uint64_t nanosecs() { - uint64_t nsec{((uint64_t)TimeStamp[2]) << 32 | - ((uint64_t)TimeStamp[1]) << 16 | (uint64_t)TimeStamp[0]}; - return nsec * 100; - } -}; - -struct __attribute__((__packed__)) DetectorEvent { - uint64_t TimeStamp : 19; - uint16_t XPosition : 10; - uint16_t YPosition : 10; - uint16_t Amplitude : 8; - uint16_t Id : 1; - inline uint32_t nanosecs() { return TimeStamp * 100; } -}; - -struct __attribute__((__packed__)) MonitorEvent { - uint64_t TimeStamp : 19; - uint64_t Data : 21; - uint64_t DataID : 4; - uint64_t TriggerID : 3; - uint64_t Id : 1; - inline uint32_t nanosecs() { return TimeStamp * 100; } -}; - -/** Constructor for the asynStreamGeneratorDriver class. - * Calls constructor for the asynPortDriver base class. - * \param[in] portName The name of the asyn port driver to be created. */ +/******************************************************************************* + * Stream Generator Methods + */ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName, const char *ipPortName, const int numChannels) : asynPortDriver(portName, 1, /* maxAddr */ - // 5, asynInt32Mask | asynInt64Mask | asynDrvUserMask, /* Interface mask */ asynInt32Mask | asynInt64Mask, /* Interrupt mask */ @@ -100,48 +85,23 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName, 0), /* Default stack size*/ num_channels(numChannels + 1), monitorQueue(1000, false), detectorQueue(1000, false) { + const char *functionName = "asynStreamGeneratorDriver"; // Parameter Setup char pv_name_buffer[100]; P_Counts = new int[this->num_channels]; asynStatus status; + // Create PVs templated on Channel Number for (size_t i = 0; i < this->num_channels; ++i) { memset(pv_name_buffer, 0, 100); epicsSnprintf(pv_name_buffer, 100, P_CountsString, i); status = createParam(pv_name_buffer, asynParamInt32, P_Counts + i); setIntegerParam(P_Counts[i], 0); - printf("%s %d %d %d\n", pv_name_buffer, P_Counts[i], i, status); } - char errstr[512]; - - // Create client configuration - rd_kafka_conf_t *conf = rd_kafka_conf_new(); - set_config(conf, "bootstrap.servers", "linkafka01:9092"); - set_config(conf, "queue.buffering.max.messages", "1e7"); - - // Create the Monitor Producer instance. - this->monitorProducer = - rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); - if (!this->monitorProducer) { - // TODO - // g_error("Failed to create new producer: %s", errstr); - exit(1); - } - - conf = rd_kafka_conf_new(); - set_config(conf, "bootstrap.servers", "linkafka01:9092"); - set_config(conf, "queue.buffering.max.messages", "1e7"); - - // Create the Detector Producer instance. - this->detectorProducer = - rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); - if (!this->detectorProducer) { - // TODO - // g_error("Failed to create new producer: %s", errstr); - exit(1); - } + this->monitorProducer = create_kafka_producer(); + this->detectorProducer = create_kafka_producer(); // Setup for Thread Producing Monitor Kafka Events status = @@ -150,11 +110,9 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName, epicsThreadGetStackSize(epicsThreadStackMedium), (EPICSTHREADFUNC)::monitorProducerTask, this) == NULL); if (status) { - // printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName, - // functionName, status); - printf("%s:%s: epicsThreadCreate failure, status=%d\n", - "StreamGenerator", "init", status); - return; + printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName, + functionName, status); + exit(1); } // Setup for Thread Producing Detector Kafka Events @@ -164,15 +122,19 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName, (EPICSTHREADFUNC)::detectorProducerTask, this) == NULL); if (status) { - // printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName, - // functionName, status); - printf("%s:%s: epicsThreadCreate failure, status=%d\n", - "StreamGenerator", "init", status); - return; + printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName, + functionName, status); + exit(1); } // UDP Receive Setup - pasynOctetSyncIO->connect(ipPortName, 0, &pasynUDPUser, NULL); + status = pasynOctetSyncIO->connect(ipPortName, 0, &pasynUDPUser, NULL); + + if (status) { + printf("%s:%s: Couldn't open connection %s, status=%d\n", driverName, + functionName, ipPortName, status); + exit(1); + } /* Create the thread that receives UDP traffic in the background */ status = (asynStatus)(epicsThreadCreate( @@ -180,12 +142,16 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName, epicsThreadGetStackSize(epicsThreadStackMedium), (EPICSTHREADFUNC)::udpPollerTask, this) == NULL); if (status) { - // printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName, - // functionName, status); - printf("%s:%s: epicsThreadCreate failure, status=%d\n", - "StreamGenerator", "init", status); - return; + printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName, + functionName, status); + exit(1); } +} + +asynStreamGeneratorDriver::~asynStreamGeneratorDriver() { + // should make sure queues are empty and freed + // and that the kafka producers are flushed and freed + delete[] P_Counts; // TODO add exit should perhaps ensure the queue is flushed // rd_kafka_poll(producer, 0); @@ -194,50 +160,12 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName, // epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer)); } -asynStreamGeneratorDriver::~asynStreamGeneratorDriver() { - // should make sure queues are empty and freed - // and that the kafka producers are flushed and freed - delete[] P_Counts; -} - -// // TODO pretty sure I don't actually need to overwrite this -// asynStatus asynStreamGeneratorDriver::readInt32(asynUser *pasynUser, -// epicsInt32 *value) { -// // asynStatus asynStreamGeneratorDriver::readInt64(asynUser *pasynUser, -// // epicsInt64 *value) { -// -// const char *paramName; -// int function = pasynUser->reason; -// asynStatus status; -// -// // TODO not freed -// getParamName(function, ¶mName); -// -// bool is_p_counts = false; -// for (size_t i = 0; i < num_channels; ++i) { -// is_p_counts = is_p_counts | function == P_Counts[i]; -// } -// -// if (is_p_counts) { -// status = getIntegerParam(function, value); -// -// asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, "%s:%s: function %d %s -// %d\n", -// "StreamGenerator", "readInt64", function, paramName, -// status); -// // return status; -// return asynSuccess; -// } else { -// return asynError; -// } -// return asynSuccess; -// } - void asynStreamGeneratorDriver::receiveUDP() { asynStatus status; int isConnected; - char buffer[1500]; + const size_t buffer_size = 1500; + char buffer[buffer_size]; size_t received; int eomReason; @@ -256,22 +184,21 @@ void asynStreamGeneratorDriver::receiveUDP() { counts[i] = 0; } - // epicsStdoutPrintf("polling!!"); status = pasynManager->isConnected(pasynUDPUser, &isConnected); if (status) { asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, "%s:%s: error calling pasynManager->isConnected, " "status=%d, error=%s\n", - "StreamGenerator", "receiveUDP", status, + driverName, "receiveUDP", status, pasynUDPUser->errorMessage); // driverName, functionName, status, // pasynUserIPPort_->errorMessage); } asynPrint(pasynUserSelf, ASYN_TRACEIO_DRIVER, "%s:%s: isConnected = %d\n", // - "StreamGenerator", "receiveUDP", isConnected); + driverName, "receiveUDP", isConnected); - status = pasynOctetSyncIO->read(pasynUDPUser, buffer, 1500, + status = pasynOctetSyncIO->read(pasynUDPUser, buffer, buffer_size, 0, // timeout &received, &eomReason); @@ -279,13 +206,19 @@ void asynStreamGeneratorDriver::receiveUDP() { // asynPrint( // pasynUserSelf, ASYN_TRACE_ERROR, // "%s:%s: error calling pasynOctetSyncIO->read, status=%d\n", - // "StreamGenerator", "receiveUDP", status); + // driverName, "receiveUDP", status); // buffer[received] = 0; if (received) { - asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, "%s:%s: received %d\n", - "StreamGenerator", "receiveUDP", received); + // asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, "%s:%s: received %f %d + // received\n", + // driverName, "receiveUDP", (double) received / + // 1500., received); + + // asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, "%s:%s: received + // %d\n", + // driverName, "receiveUDP", received); UDPHeader *header = (UDPHeader *)buffer; @@ -293,11 +226,13 @@ void asynStreamGeneratorDriver::receiveUDP() { // TODO lots of checks and validation missing everywhere here if (received == total_events * 6 + 42) { - asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, - "%s:%s: received packet %d with %d events (%" PRIu64 - ")\n", - "StreamGenerator", "receiveUDP", header->BufferNumber, - total_events, header->nanosecs()); + // asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, + // "%s:%s: received packet %d with %d events (%" + // PRIu64 + // ")\n", + // driverName, "receiveUDP", + // header->BufferNumber, total_events, + // header->nanosecs()); for (size_t i = 0; i < total_events; ++i) { char *event = (buffer + 21 * 2 + i * 6); @@ -308,7 +243,7 @@ void asynStreamGeneratorDriver::receiveUDP() { // asynPrint( // pasynUserSelf, ASYN_TRACE_ERROR, // "%s:%s: event (%03d) on monitor %d (%" PRIu64 - // ")\n", "StreamGenerator", "receiveUDP", i, + // ")\n", driverName, "receiveUDP", i, // m_event->DataID, header->nanosecs() + // (uint64_t)m_event->nanosecs()); @@ -342,11 +277,11 @@ void asynStreamGeneratorDriver::receiveUDP() { counts[i] += val; } - asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, - "%s:%s: det: (%d), mon0: (%d), mon1: (%d), mon2: " - "(%d), mon3: (%d)\n", - "StreamGenerator", "receiveUDP", counts[0], counts[1], - counts[2], counts[3], counts[4]); + // asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, + // "%s:%s: det: (%d), mon0: (%d), mon1: (%d), mon2: " + // "(%d), mon3: (%d)\n", + // driverName, "receiveUDP", counts[0], + // counts[1], counts[2], counts[3], counts[4]); lock(); for (size_t i = 0; i < num_channels; ++i) { @@ -356,12 +291,12 @@ void asynStreamGeneratorDriver::receiveUDP() { unlock(); } else { asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, - "%s:%s: invalid UDP packet\n", "StreamGenerator", + "%s:%s: invalid UDP packet\n", driverName, "receiveUDP"); } } - epicsThreadSleep(1); // seconds + // epicsThreadSleep(1); // seconds } } @@ -394,6 +329,8 @@ void asynStreamGeneratorDriver::produceMonitor() { epicsThreadSleep(0.001); // seconds } + // TODO can probably just replace the current + // instead of always getting new object epicsTimeStamp now = epicsTime::getCurrent(); // At least every 0.2 seconds @@ -407,7 +344,10 @@ void asynStreamGeneratorDriver::produceMonitor() { builder.Clear(); auto message = CreateEventMessageDirect( - builder, "monitor", message_id++, 0, &tof, &did); + builder, "monitor", message_id++, + ((uint64_t)now.secPastEpoch) * 1'000'000'000ull + + ((uint64_t)now.nsec), + &tof, &did); builder.Finish(message, "ev42"); // printf("buffer size: %d\n", builder.GetSize()); @@ -432,9 +372,9 @@ void asynStreamGeneratorDriver::produceMonitor() { rd_kafka_poll(monitorProducer, 0); - printf("Monitor Events Queued before sending %d\n", - this->monitorQueue.getHighWaterMark()); - this->monitorQueue.resetHighWaterMark(); + // printf("Monitor Events Queued before sending %d\n", + // this->monitorQueue.getHighWaterMark()); + // this->monitorQueue.resetHighWaterMark(); tof.clear(); did.clear(); @@ -484,7 +424,10 @@ void asynStreamGeneratorDriver::produceDetector() { builder.Clear(); auto message = CreateEventMessageDirect( - builder, "detector", message_id++, 0, &tof, &did); + builder, "detector", message_id++, + ((uint64_t)now.secPastEpoch) * 1'000'000'000ull + + ((uint64_t)now.nsec), + &tof, &did); builder.Finish(message, "ev42"); // printf("buffer size: %d\n", builder.GetSize()); @@ -509,9 +452,9 @@ void asynStreamGeneratorDriver::produceDetector() { rd_kafka_poll(detectorProducer, 0); - printf("Detector Events Queued before sending %d\n", - this->detectorQueue.getHighWaterMark()); - this->detectorQueue.resetHighWaterMark(); + // printf("Detector Events Queued before sending %d\n", + // this->detectorQueue.getHighWaterMark()); + // this->detectorQueue.resetHighWaterMark(); tof.clear(); did.clear(); @@ -520,13 +463,11 @@ void asynStreamGeneratorDriver::produceDetector() { } } -/* Configuration routine. Called directly, or from the iocsh function below */ - +/******************************************************************************* + * Methods exposed to IOC Shell + */ extern "C" { -/** EPICS iocsh callable function to call constructor for the - * asynStreamGeneratorDriver class. \param[in] portName The name of the asyn - * port driver to be created. */ asynStatus asynStreamGeneratorDriverConfigure(const char *portName, const char *ipPortName, const int numChannels) { @@ -534,8 +475,6 @@ asynStatus asynStreamGeneratorDriverConfigure(const char *portName, return asynSuccess; } -/* EPICS iocsh shell commands */ - static const iocshArg initArg0 = {"portName", iocshArgString}; static const iocshArg initArg1 = {"ipPortName", iocshArgString}; static const iocshArg initArg2 = {"numChannels", iocshArgInt}; diff --git a/src/asynStreamGeneratorDriver.h b/src/asynStreamGeneratorDriver.h index 6ee1c6b..5aaf664 100644 --- a/src/asynStreamGeneratorDriver.h +++ b/src/asynStreamGeneratorDriver.h @@ -5,6 +5,51 @@ #include #include +/******************************************************************************* + * UDP Packet Definitions + */ +struct __attribute__((__packed__)) UDPHeader { + uint16_t BufferLength; + uint16_t BufferType; + uint16_t HeaderLength; + uint16_t BufferNumber; + uint16_t RunCmdID; + uint16_t Status : 8; + uint16_t McpdID : 8; + uint16_t TimeStamp[3]; + uint16_t Parameter0[3]; + uint16_t Parameter1[3]; + uint16_t Parameter2[3]; + uint16_t Parameter3[3]; + + inline uint64_t nanosecs() { + uint64_t nsec{((uint64_t)TimeStamp[2]) << 32 | + ((uint64_t)TimeStamp[1]) << 16 | (uint64_t)TimeStamp[0]}; + return nsec * 100; + } +}; + +struct __attribute__((__packed__)) DetectorEvent { + uint64_t TimeStamp : 19; + uint16_t XPosition : 10; + uint16_t YPosition : 10; + uint16_t Amplitude : 8; + uint16_t Id : 1; + inline uint32_t nanosecs() { return TimeStamp * 100; } +}; + +struct __attribute__((__packed__)) MonitorEvent { + uint64_t TimeStamp : 19; + uint64_t Data : 21; + uint64_t DataID : 4; + uint64_t TriggerID : 3; + uint64_t Id : 1; + inline uint32_t nanosecs() { return TimeStamp * 100; } +}; + +/******************************************************************************* + * Simplified Event Struct Definition + */ struct __attribute__((__packed__)) NormalisedMonitorEvent { uint64_t TimeStamp; uint8_t DataID : 4; @@ -15,27 +60,34 @@ struct __attribute__((__packed__)) NormalisedDetectorEvent { uint32_t PixID; }; -/* These are the drvInfo strings that are used to identify the parameters. */ -#define P_CountsString "COUNTS%d" /* asynInt32, r/w */ +/******************************************************************************* + * Parameters for use in DB records + * + * i.e.e drvInfo strings that are used to identify the parameters + */ +#define P_CountsString "COUNTS%d" + +/******************************************************************************* + * Stream Generator Coordinating Class + */ class asynStreamGeneratorDriver : public asynPortDriver { public: asynStreamGeneratorDriver(const char *portName, const char *ipPortName, const int numChannels); virtual ~asynStreamGeneratorDriver(); - // virtual asynStatus readInt64(asynUser *pasynUser, epicsInt64 *value); - // virtual asynStatus readInt32(asynUser *pasynUser, epicsInt32 *value); - void receiveUDP(); void produceMonitor(); void produceDetector(); protected: + // Parameter Identifying IDs int *P_Counts; private: asynUser *pasynUDPUser; + int num_channels; epicsRingPointer monitorQueue; @@ -43,6 +95,8 @@ class asynStreamGeneratorDriver : public asynPortDriver { epicsRingPointer detectorQueue; rd_kafka_t *detectorProducer; + + constexpr static char *driverName = "StreamGenerator"; }; #endif