diff --git a/db/daq_common.db b/db/daq_common.db
index 96a1cd3..631fd8a 100644
--- a/db/daq_common.db
+++ b/db/daq_common.db
@@ -129,7 +129,7 @@ record(longout, "$(INSTR)$(NAME):MONITOR-CHANNEL")
     field(DESC, "PRESET-COUNT Monitors this channel")
     field(DTYP, "asynInt32")
     field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) MONITOR")
-    field(DRVL, "1") # Smallest Monitor Channel
+    field(DRVL, "0") # Smallest Monitor Channel
     field(DRVH, "$(CHANNELS)") # Largest Monitor Channel
 }
 
diff --git a/scripts/st.cmd b/scripts/st.cmd
index b5e6cc0..5802ad2 100755
--- a/scripts/st.cmd
+++ b/scripts/st.cmd
@@ -9,7 +9,8 @@
 epicsEnvSet("INSTR", "SQ:TEST:")
 epicsEnvSet("NAME", "SG")
 
 drvAsynIPPortConfigure("ASYN_IP_PORT", "127.0.0.1:9071:54321 UDP", 0, 0, 1)
-asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, 0, 1000, 8192)
+asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "linkafka01:9092", "NEWEFU_TEST", "NEWEFU_TEST2", 1000, 8192)
+# asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "", "", "", 0, 0)
 
 dbLoadRecords("$(StreamGenerator_DB)daq_common.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNELS=5")
diff --git a/src/asynStreamGeneratorDriver.cpp b/src/asynStreamGeneratorDriver.cpp
index 508540a..dfe1033 100644
--- a/src/asynStreamGeneratorDriver.cpp
+++ b/src/asynStreamGeneratorDriver.cpp
@@ -28,15 +28,17 @@ static void set_kafka_config_key(rd_kafka_conf_t *conf, char *key,
     }
 }
 
-static rd_kafka_t *create_kafka_producer() {
+static rd_kafka_t *create_kafka_producer(const char *kafkaBroker) {
     char errstr[512];
     rd_kafka_t *producer;
 
     // Prepare configuration object
     rd_kafka_conf_t *conf = rd_kafka_conf_new();
-    set_kafka_config_key(conf, "bootstrap.servers", "linkafka01:9092");
-    set_kafka_config_key(conf, "queue.buffering.max.messages", "1e7");
+    // TODO feel not great about this
+    set_kafka_config_key(conf, "bootstrap.servers",
+                         const_cast<char *>(kafkaBroker));
+    set_kafka_config_key(conf, "queue.buffering.max.messages", "10000000");
 
     // Create the Producer
     producer =
         rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
@@ -89,7 +91,9 @@ asynStatus asynStreamGeneratorDriver::createInt32Param(
 
 asynStreamGeneratorDriver::asynStreamGeneratorDriver(
     const char *portName, const char *ipPortName, const int numChannels,
     const int udpQueueSize, const bool enableKafkaStream,
-    const int kafkaQueueSize, const int kafkaMaxPacketSize)
+    const char *kafkaBroker, const char *monitorTopic,
+    const char *detectorTopic, const int kafkaQueueSize,
+    const int kafkaMaxPacketSize)
     : asynPortDriver(portName, 1, /* maxAddr */
                      asynInt32Mask | asynInt64Mask | asynDrvUserMask, /* Interface mask */
@@ -101,8 +105,10 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
                      1,  /* Autoconnect */
                      0,  /* Default priority */
                      0), /* Default stack size*/
-      num_channels(numChannels + 1), udpQueue(udpQueueSize, false),
-      monitorQueue(kafkaQueueSize, false), detectorQueue(kafkaQueueSize, false),
+      num_channels(numChannels + 1), kafkaEnabled(enableKafkaStream),
+      monitorTopic(monitorTopic), detectorTopic(detectorTopic),
+      udpQueue(udpQueueSize, false), monitorQueue(kafkaQueueSize, false),
+      detectorQueue(kafkaQueueSize, false),
       kafkaMaxPacketSize(kafkaMaxPacketSize) {
 
     const char *functionName = "asynStreamGeneratorDriver";
@@ -143,8 +149,9 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
     }
 
     if (status) {
-        printf("%s:%s: failed to create or setup parameters, status=%d\n",
-               driverName, functionName, status);
+        epicsStdoutPrintf(
+            "%s:%s: failed to create or setup parameters, status=%d\n",
+            driverName, functionName, status);
         exit(1);
     }
 
@@ -152,8 +159,21 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
     this->pausedEventId = epicsEventCreate(epicsEventEmpty);
 
     if (enableKafkaStream) {
-        this->monitorProducer = create_kafka_producer();
-        this->detectorProducer = create_kafka_producer();
+
+        epicsStdoutPrintf(
+            "Detector Kafka Config: broker=%s, topic=%s\n "
+            "       queue size:%d, max events per packet: %d\n",
+            kafkaBroker, this->detectorTopic, kafkaQueueSize,
+            this->kafkaMaxPacketSize);
+
+        epicsStdoutPrintf(
+            "Monitors Kafka Config: broker=%s, topic=%s\n "
+            "       queue size:%d, max events per packet: %d\n",
+            kafkaBroker, this->monitorTopic, kafkaQueueSize,
+            this->kafkaMaxPacketSize);
+
+        this->monitorProducer = create_kafka_producer(kafkaBroker);
+        this->detectorProducer = create_kafka_producer(kafkaBroker);
 
         // Setup for Thread Producing Monitor Kafka Events
         status =
@@ -163,8 +183,8 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
              (EPICSTHREADFUNC)::monitorProducerTask, this) == NULL);
 
         if (status) {
-            printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName,
-                   functionName, status);
+            epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
+                              driverName, functionName, status);
             exit(1);
         }
 
@@ -176,10 +196,13 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
              (EPICSTHREADFUNC)::detectorProducerTask, this) == NULL);
 
         if (status) {
-            printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName,
-                   functionName, status);
+            epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
+                              driverName, functionName, status);
             exit(1);
         }
+    } else {
+
+        epicsStdoutPrintf("Kafka Stream Disabled\n");
     }
 
     /* Create the thread that orders the events and acts as our sinqDaq stand-in
@@ -189,8 +212,8 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
          epicsThreadGetStackSize(epicsThreadStackMedium),
          (EPICSTHREADFUNC)::daqTask, this) == NULL);
     if (status) {
-        printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName,
-               functionName, status);
+        epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
+                          driverName, functionName, status);
         exit(1);
     }
 
@@ -198,8 +221,8 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
     status = pasynOctetSyncIO->connect(ipPortName, 0, &pasynUDPUser, NULL);
 
     if (status) {
-        printf("%s:%s: Couldn't open connection %s, status=%d\n", driverName,
-               functionName, ipPortName, status);
+        epicsStdoutPrintf("%s:%s: Couldn't open connection %s, status=%d\n",
+                          driverName, functionName, ipPortName, status);
         exit(1);
     }
 
@@ -209,8 +232,8 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
          epicsThreadGetStackSize(epicsThreadStackMedium),
          (EPICSTHREADFUNC)::udpPollerTask, this) == NULL);
     if (status) {
-        printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName,
-               functionName, status);
+        epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
+                          driverName, functionName, status);
         exit(1);
     }
 }
@@ -352,6 +375,18 @@ void asynStreamGeneratorDriver::receiveUDP() {
     }
 }
 
+inline void asynStreamGeneratorDriver::queueForKafka(NormalisedEvent *ne) {
+
+    if (this->kafkaEnabled) {
+        if (ne->source == 0)
+            this->monitorQueue.push(ne);
+        else
+            this->detectorQueue.push(ne);
+    } else {
+        delete ne;
+    }
+}
+
 void asynStreamGeneratorDriver::processEvents() {
     const char *functionName = "processEvents";
 
@@ -448,9 +483,11 @@ void asynStreamGeneratorDriver::processEvents() {
 
                 elapsedSeconds = 0 ? currTimestamp <= startTimestamp
                                    : ((double)(currTimestamp - startTimestamp)) / 1e9;
-            }
 
-            delete ne;
+                this->queueForKafka(ne);
+            } else {
+                delete ne;
+            }
 
             // is our count finished?
             if ((countPreset && counts[presetChannel] >= countPreset) ||
@@ -464,7 +501,7 @@ void asynStreamGeneratorDriver::processEvents() {
                 ne = timeQueue.top();
                 timeQueue.pop();
                 counts[ne->source == 0 ? ne->pixelId + 1 : 0] += 1;
-                delete ne;
+                this->queueForKafka(ne);
             }
 
             countPreset = 0;
@@ -516,8 +553,8 @@ void asynStreamGeneratorDriver::produceMonitor() {
             ++total;
             auto nme = this->monitorQueue.pop();
 
-            tof.push_back(nme->TimeStamp);
-            did.push_back(nme->DataID);
+            tof.push_back(nme->timestamp);
+            did.push_back(nme->pixelId);
 
             delete nme;
         } else {
@@ -547,7 +584,7 @@ void asynStreamGeneratorDriver::produceMonitor() {
         builder.Finish(message, "ev42");
 
         rd_kafka_resp_err_t err = rd_kafka_producev(
-            monitorProducer, RD_KAFKA_V_TOPIC("NEWEFU_TEST"),
+            monitorProducer, RD_KAFKA_V_TOPIC(this->monitorTopic),
             RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
             // RD_KAFKA_V_KEY((void *)key, key_len),
             RD_KAFKA_V_VALUE((void *)builder.GetBufferPointer(),
@@ -556,9 +593,9 @@ void asynStreamGeneratorDriver::produceMonitor() {
             RD_KAFKA_V_END);
 
         if (err) {
-            // TODO
-            // g_error("Failed to produce to topic %s: %s", topic,
-            // rd_kafka_err2str(err));
+            epicsStdoutPrintf("Failed to produce to topic %s: %s\n",
+                              this->monitorTopic,
+                              rd_kafka_err2str(err));
         }
 
         rd_kafka_poll(monitorProducer, 0);
@@ -586,55 +623,22 @@ void asynStreamGeneratorDriver::produceDetector() {
 
     uint64_t message_id = 0;
 
-    struct {
-        bool operator()(const uint64_t l, const uint64_t r) const {
-            return l > r;
-        }
-    } smallestToLargest;
-
-    // This should never be used. It is just instantiated to reserve a buffer
-    // of specific size.
-    std::vector<uint64_t> queueBuffer;
-    queueBuffer.reserve(bufferSize);
-
-    std::priority_queue<uint64_t, std::vector<uint64_t>,
-                        decltype(smallestToLargest)>
-        timeQueue(smallestToLargest, std::move(queueBuffer));
-
-    uint64_t newest = 0;
-
     while (true) {
         if (!this->detectorQueue.isEmpty()) {
             ++total;
             auto nde = this->detectorQueue.pop();
 
-            tof.push_back(nde->TimeStamp);
-            did.push_back(nde->PixID);
-
-            newest = std::max(newest, nde->TimeStamp);
-            timeQueue.push(nde->TimeStamp);
-
+            tof.push_back(nde->timestamp);
+            did.push_back(nde->pixelId);
 
             delete nde;
+        } else {
+            // TODO
+            // rd_kafka_flush(detectorProducer, 10 * 1000);
             epicsThreadSleep(0.001); // seconds
         }
 
-        while (!timeQueue.empty() &&
-               (timeQueue.size() >= this->kafkaMaxPacketSize ||
-                (newest - timeQueue.top()) > 5'000'000'000ull))
-            timeQueue.pop();
-
-        epicsInt32 rate = 0;
-        if (timeQueue.size() > 1) {
-            rate = ((double)timeQueue.size() /
-                    ((double)(newest - timeQueue.top()) * 1e-9));
-        }
-
-        lock();
-        setIntegerParam(P_Rates[0], rate);
-        callParamCallbacks();
-        unlock();
-
         epicsTimeStamp now = epicsTime::getCurrent();
 
         // At least every 0.2 seconds
@@ -656,7 +660,7 @@ void asynStreamGeneratorDriver::produceDetector() {
         builder.Finish(message, "ev42");
 
         rd_kafka_resp_err_t err = rd_kafka_producev(
-            detectorProducer, RD_KAFKA_V_TOPIC("NEWEFU_TEST2"),
+            detectorProducer, RD_KAFKA_V_TOPIC(this->detectorTopic),
             RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
             // RD_KAFKA_V_KEY((void *)key, key_len),
             RD_KAFKA_V_VALUE((void *)builder.GetBufferPointer(),
@@ -665,9 +669,9 @@ void asynStreamGeneratorDriver::produceDetector() {
             RD_KAFKA_V_END);
 
         if (err) {
-            // TODO
-            // g_error("Failed to produce to topic %s: %s", topic,
-            // rd_kafka_err2str(err));
+            epicsStdoutPrintf("Failed to produce to topic %s: %s\n",
+                              this->detectorTopic,
+                              rd_kafka_err2str(err));
         }
 
         rd_kafka_poll(detectorProducer, 0);
@@ -686,11 +690,13 @@ extern "C" {
 
 asynStatus asynStreamGeneratorDriverConfigure(
     const char *portName, const char *ipPortName, const int numChannels,
-    const int udpQueueSize, const bool enableKafkaStream,
-    const int kafkaQueueSize, const int kafkaMaxPacketSize) {
+    const int udpQueueSize, const char *kafkaBroker, const char *monitorTopic,
+    const char *detectorTopic, const int kafkaQueueSize,
+    const int kafkaMaxPacketSize) {
 
     new asynStreamGeneratorDriver(portName, ipPortName, numChannels,
-                                  udpQueueSize, enableKafkaStream,
-                                  kafkaQueueSize, kafkaMaxPacketSize);
+                                  udpQueueSize, kafkaBroker[0], kafkaBroker,
+                                  monitorTopic, detectorTopic, kafkaQueueSize,
+                                  kafkaMaxPacketSize);
     return asynSuccess;
 }
 
@@ -698,17 +704,19 @@ static const iocshArg initArg0 = {"portName", iocshArgString};
 static const iocshArg initArg1 = {"ipPortName", iocshArgString};
 static const iocshArg initArg2 = {"numChannels", iocshArgInt};
 static const iocshArg initArg3 = {"udpQueueSize", iocshArgInt};
-static const iocshArg initArg4 = {"enableKafkaStream", iocshArgInt};
-static const iocshArg initArg5 = {"kafkaQueueSize", iocshArgInt};
-static const iocshArg initArg6 = {"kafkaMaxPacketSize", iocshArgInt};
+static const iocshArg initArg4 = {"kafkaBroker", iocshArgString};
+static const iocshArg initArg5 = {"monitorTopic", iocshArgString};
+static const iocshArg initArg6 = {"detectorTopic", iocshArgString};
+static const iocshArg initArg7 = {"kafkaQueueSize", iocshArgInt};
+static const iocshArg initArg8 = {"kafkaMaxPacketSize", iocshArgInt};
 static const iocshArg *const initArgs[] = {&initArg0, &initArg1, &initArg2,
                                            &initArg3, &initArg4, &initArg5,
-                                           &initArg6};
-static const iocshFuncDef initFuncDef = {"asynStreamGenerator", 6, initArgs};
+                                           &initArg6, &initArg7, &initArg8};
+static const iocshFuncDef initFuncDef = {"asynStreamGenerator", 9, initArgs};
 
 static void initCallFunc(const iocshArgBuf *args) {
-    asynStreamGeneratorDriverConfigure(args[0].sval, args[1].sval, args[2].ival,
-                                       args[3].ival, args[4].ival, args[5].ival,
-                                       args[6].ival);
+    asynStreamGeneratorDriverConfigure(
+        args[0].sval, args[1].sval, args[2].ival, args[3].ival, args[4].sval,
+        args[5].sval, args[6].sval, args[7].ival, args[8].ival);
 }
 
 void asynStreamGeneratorDriverRegister(void) {
diff --git a/src/asynStreamGeneratorDriver.h b/src/asynStreamGeneratorDriver.h
index cc965ff..72075e2 100644
--- a/src/asynStreamGeneratorDriver.h
+++ b/src/asynStreamGeneratorDriver.h
@@ -112,6 +112,8 @@ class asynStreamGeneratorDriver : public asynPortDriver {
     asynStreamGeneratorDriver(const char *portName, const char *ipPortName,
                               const int numChannels, const int udpQueueSize,
                               const bool enableKafkaStream,
+                              const char *kafkaBroker, const char *monitorTopic,
+                              const char *detectorTopic,
                               const int kafkaQueueSize,
                               const int kafkaMaxPacketSize);
     virtual ~asynStreamGeneratorDriver();
@@ -144,20 +146,25 @@ class asynStreamGeneratorDriver : public asynPortDriver {
     epicsEventId pausedEventId;
 
     const int num_channels;
+    const bool kafkaEnabled;
     const int kafkaMaxPacketSize;
 
     epicsRingPointer<UDPMessage> udpQueue;
 
-    epicsRingPointer<NormalisedMonitorEvent> monitorQueue;
+    epicsRingPointer<NormalisedEvent> monitorQueue;
     rd_kafka_t *monitorProducer;
+    const char *monitorTopic;
 
-    epicsRingPointer<NormalisedDetectorEvent> detectorQueue;
+    epicsRingPointer<NormalisedEvent> detectorQueue;
     rd_kafka_t *detectorProducer;
+    const char *detectorTopic;
 
     constexpr static char *driverName = "StreamGenerator";
 
     asynStatus createInt32Param(asynStatus status, char *name, int *variable,
                                 epicsInt32 initialValue = 0);
+
+    inline void queueForKafka(NormalisedEvent *ne);
 };
 
 #endif