From 4c1741bd4b979525a422f17d9c0217c094b02db3 Mon Sep 17 00:00:00 2001 From: Edward Wall Date: Thu, 30 Oct 2025 15:07:21 +0100 Subject: [PATCH] very inefficient, but can receive udp monitor events and count them and send them as kafka events --- src/asynStreamGeneratorDriver.cpp | 201 ++++++++++++++++++++++-------- src/asynStreamGeneratorDriver.h | 11 ++ 2 files changed, 158 insertions(+), 54 deletions(-) diff --git a/src/asynStreamGeneratorDriver.cpp b/src/asynStreamGeneratorDriver.cpp index a098ad0..ecf0553 100644 --- a/src/asynStreamGeneratorDriver.cpp +++ b/src/asynStreamGeneratorDriver.cpp @@ -3,7 +3,6 @@ #include #include #include -#include // Just for printing #define __STDC_FORMAT_MACROS @@ -31,6 +30,11 @@ static void udpPollerTask(void *drvPvt) { pSGD->receiveUDP(); } +static void monitorProducerTask(void *drvPvt) { + asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt; + pSGD->produceMonitor(); +} + // UDP Packet Definitions struct __attribute__((__packed__)) UDPHeader { uint16_t BufferLength; @@ -82,16 +86,14 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName, asynInt32Mask | asynInt64Mask | asynDrvUserMask, /* Interface mask */ asynInt32Mask | asynInt64Mask, /* Interrupt mask */ - 0, /* asynFlags. This driver does not block and it is - not multi-device, but has a - destructor ASYN_DESTRUCTIBLE our version of the Asyn - is too old to support this flag */ - 1, /* Autoconnect */ - 0, /* Default priority */ - 0) /* Default stack size*/ -{ - this->num_channels = numChannels; - + 0, /* asynFlags. This driver does not block and it is + not multi-device, but has a + destructor ASYN_DESTRUCTIBLE our version of the Asyn + is too old to support this flag */ + 1, /* Autoconnect */ + 0, /* Default priority */ + 0), /* Default stack size*/ + num_channels(numChannels), monitorQueue(1000, false) { // Parameter Setup char pv_name_buffer[100]; P_Counts = new int[numChannels]; @@ -106,6 +108,36 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName, printf("%s %d %d %d\n", pv_name_buffer, P_Counts[i], i, status); } + char errstr[512]; + + // Create client configuration + rd_kafka_conf_t *conf = rd_kafka_conf_new(); + set_config(conf, "bootstrap.servers", "linkafka01:9092"); + set_config(conf, "queue.buffering.max.messages", "1e7"); + + // Create the Producer instance. + this->monitorProducer = + rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); + if (!this->monitorProducer) { + // TODO + // g_error("Failed to create new producer: %s", errstr); + exit(1); + } + + // Setup for Thread Producing Monitor Kafka Events + status = + (asynStatus)(epicsThreadCreate( + "monitor_produce", epicsThreadPriorityMedium, + epicsThreadGetStackSize(epicsThreadStackMedium), + (EPICSTHREADFUNC)::monitorProducerTask, this) == NULL); + if (status) { + // printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName, + // functionName, status); + printf("%s:%s: epicsThreadCreate failure, status=%d\n", + "StreamGenerator", "init", status); + return; + } + // UDP Receive Setup pasynOctetSyncIO->connect(ipPortName, 0, &pasynUDPUser, NULL); @@ -122,61 +154,64 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName, return; } - // Kafka Produce Setup - rd_kafka_conf_t *conf; - char errstr[512]; + // // Kafka Produce Setup + // rd_kafka_conf_t *conf; + // char errstr[512]; - // Create client configuration - conf = rd_kafka_conf_new(); - set_config(conf, "bootstrap.servers", "linkafka01:9092"); - set_config(conf, "queue.buffering.max.messages", "1e7"); + // // Create client configuration + // conf = rd_kafka_conf_new(); + // set_config(conf, "bootstrap.servers", "linkafka01:9092"); + // set_config(conf, "queue.buffering.max.messages", "1e7"); - // Create the Producer instance. - rd_kafka_t *producer = - rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); - if (!producer) { - // TODO - // g_error("Failed to create new producer: %s", errstr); - exit(1); - } + // // Create the Producer instance. + // rd_kafka_t *producer = + // rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); + // if (!producer) { + // // TODO + // // g_error("Failed to create new producer: %s", errstr); + // exit(1); + // } - char *msg = "asdf\n"; - // EventMessageBuilder b; - // We could I believe reuse a buffer which might be more performant. - flatbuffers::FlatBufferBuilder builder(1024); - std::vector tof = {1, 2, 3}; - std::vector did = {0, 0, 0}; - auto message = - CreateEventMessageDirect(builder, "monitor1", 0, 0, &tof, &did); + // char *msg = "asdf\n"; + // // EventMessageBuilder b; + // // We could I believe reuse a buffer which might be more performant. + // flatbuffers::FlatBufferBuilder builder(1024); + // // clear with build.Clear(); + // std::vector tof = {1, 2, 3}; + // std::vector did = {0, 0, 0}; + // auto message = + // CreateEventMessageDirect(builder, "monitor1", 0, 0, &tof, &did); - builder.Finish(message, "ev42"); - printf("buffer size: %d\n", builder.GetSize()); + // builder.Finish(message, "ev42"); + // printf("buffer size: %d\n", builder.GetSize()); - rd_kafka_resp_err_t err = rd_kafka_producev( - producer, RD_KAFKA_V_TOPIC("NEWEFU_TEST"), - RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), - // RD_KAFKA_V_KEY((void *)key, key_len), - RD_KAFKA_V_VALUE((void *)builder.GetBufferPointer(), builder.GetSize()), - // RD_KAFKA_V_OPAQUE(NULL), - RD_KAFKA_V_END); + // rd_kafka_resp_err_t err = rd_kafka_producev( + // producer, RD_KAFKA_V_TOPIC("NEWEFU_TEST"), + // RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), + // // RD_KAFKA_V_KEY((void *)key, key_len), + // RD_KAFKA_V_VALUE((void *)builder.GetBufferPointer(), + // builder.GetSize()), + // // RD_KAFKA_V_OPAQUE(NULL), + // RD_KAFKA_V_END); - if (err) { - // TODO - // g_error("Failed to produce to topic %s: %s", topic, - // rd_kafka_err2str(err)); - exit(1); - } + // if (err) { + // // TODO + // // g_error("Failed to produce to topic %s: %s", topic, + // // rd_kafka_err2str(err)); + // exit(1); + // } - epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer)); + // epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer)); - rd_kafka_poll(producer, 0); - epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer)); - rd_kafka_flush(producer, 10 * 1000); - epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer)); + // rd_kafka_poll(producer, 0); + // epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer)); + // rd_kafka_flush(producer, 10 * 1000); + // epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer)); } asynStreamGeneratorDriver::~asynStreamGeneratorDriver() { delete[] P_Counts; } +// TODO pretty sure I don't actually need to overwrite this asynStatus asynStreamGeneratorDriver::readInt32(asynUser *pasynUser, epicsInt32 *value) { // asynStatus asynStreamGeneratorDriver::readInt64(asynUser *pasynUser, @@ -207,6 +242,56 @@ asynStatus asynStreamGeneratorDriver::readInt32(asynUser *pasynUser, return asynSuccess; } +void asynStreamGeneratorDriver::produceMonitor() { + + flatbuffers::FlatBufferBuilder builder(1024); + + while (true) { + + if (!this->monitorQueue.isEmpty()) { + + builder.Clear(); + auto nme = this->monitorQueue.pop(); + + std::vector tof = {nme->TimeStamp}; + std::vector did = {nme->DataID}; + + auto message = + CreateEventMessageDirect(builder, "monitor", 0, 0, &tof, &did); + + builder.Finish(message, "ev42"); + // printf("buffer size: %d\n", builder.GetSize()); + + rd_kafka_resp_err_t err = rd_kafka_producev( + monitorProducer, RD_KAFKA_V_TOPIC("NEWEFU_TEST"), + RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), + // RD_KAFKA_V_KEY((void *)key, key_len), + RD_KAFKA_V_VALUE((void *)builder.GetBufferPointer(), + builder.GetSize()), + // RD_KAFKA_V_OPAQUE(NULL), + RD_KAFKA_V_END); + + if (err) { + // TODO + // g_error("Failed to produce to topic %s: %s", topic, + // rd_kafka_err2str(err)); + } + + // epicsStdoutPrintf("Kafka Queue Size %d\n", + // rd_kafka_outq_len(monitorProducer)); + + rd_kafka_poll(monitorProducer, 0); + + printf("Monitor Events Queued %d\n", this->monitorQueue.getHighWaterMark()); + this->monitorQueue.resetHighWaterMark(); + + delete nme; + } + + epicsThreadSleep(0.001); // seconds + } +} + void asynStreamGeneratorDriver::receiveUDP() { asynStatus status; int isConnected; @@ -284,6 +369,14 @@ void asynStreamGeneratorDriver::receiveUDP() { // (uint64_t)m_event->nanosecs()); monitor_counts[m_event->DataID] += 1; + + // TODO needs to be freed + auto nme = new NormalisedMonitorEvent(); + nme->TimeStamp = + header->nanosecs() + (uint64_t)m_event->nanosecs(); + nme->DataID = m_event->DataID; + this->monitorQueue.push(nme); + } else { // Detector Event DetectorEvent *d_event = (DetectorEvent *)event; } diff --git a/src/asynStreamGeneratorDriver.h b/src/asynStreamGeneratorDriver.h index 780cde7..c1f00cd 100644 --- a/src/asynStreamGeneratorDriver.h +++ b/src/asynStreamGeneratorDriver.h @@ -2,6 +2,13 @@ #define asynStreamGeneratorDriver_H #include "asynPortDriver.h" +#include +#include + +struct __attribute__((__packed__)) NormalisedMonitorEvent { + uint64_t TimeStamp; + uint32_t DataID : 4; +}; /* These are the drvInfo strings that are used to identify the parameters. */ #define P_CountsString "COUNTS%d" /* asynInt32, r/w */ @@ -16,6 +23,7 @@ class asynStreamGeneratorDriver : public asynPortDriver { virtual asynStatus readInt32(asynUser *pasynUser, epicsInt32 *value); void receiveUDP(); + void produceMonitor(); protected: int *P_Counts; @@ -23,6 +31,9 @@ class asynStreamGeneratorDriver : public asynPortDriver { private: asynUser *pasynUDPUser; int num_channels; + + epicsRingPointer monitorQueue; + rd_kafka_t *monitorProducer; }; #endif