#include "asynOctetSyncIO.h" #include "ev42_events_generated.h" #include #include #include #include // Just for printing #define __STDC_FORMAT_MACROS #include #include "asynStreamGeneratorDriver.h" #include /******************************************************************************* * Kafka Methods */ static void set_kafka_config_key(rd_kafka_conf_t *conf, char *key, char *value) { char errstr[512]; rd_kafka_conf_res_t res; res = rd_kafka_conf_set(conf, key, value, errstr, sizeof(errstr)); if (res != RD_KAFKA_CONF_OK) { epicsStdoutPrintf("Failed to set config value %s : %s\n", key, value); exit(1); } } static rd_kafka_t *create_kafka_producer(const char *kafkaBroker) { char errstr[512]; rd_kafka_t *producer; // Prepare configuration object rd_kafka_conf_t *conf = rd_kafka_conf_new(); // TODO feel not great about this set_kafka_config_key(conf, "bootstrap.servers", const_cast(kafkaBroker)); set_kafka_config_key(conf, "queue.buffering.max.messages", "10000000"); // Create the Producer producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); if (!producer) { epicsStdoutPrintf("Failed to create Kafka Producer: %s\n", errstr); exit(1); } return producer; } /******************************************************************************* * Static Methods Passed to Epics Threads that should run in the background */ static void udpPollerTask(void *drvPvt) { asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt; pSGD->receiveUDP(); } static void daqTask(void *drvPvt) { asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt; pSGD->processEvents(); } static void monitorProducerTask(void *drvPvt) { asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt; pSGD->produceMonitor(); } static void detectorProducerTask(void *drvPvt) { asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt; pSGD->produceDetector(); } /******************************************************************************* * Stream Generator Helper Methods */ asynStatus asynStreamGeneratorDriver::createInt32Param( // TODO should show error if there is one asynStatus status, char *name, int *variable, epicsInt32 initialValue) { return (asynStatus)(status | createParam(name, asynParamInt32, variable) | setIntegerParam(*variable, initialValue)); } /******************************************************************************* * Stream Generator Methods */ asynStreamGeneratorDriver::asynStreamGeneratorDriver( const char *portName, const char *ipPortName, const int numChannels, const int udpQueueSize, const bool enableKafkaStream, const char *kafkaBroker, const char *monitorTopic, const char *detectorTopic, const int kafkaQueueSize, const int kafkaMaxPacketSize) : asynPortDriver(portName, 1, /* maxAddr */ asynInt32Mask | asynInt64Mask | asynDrvUserMask, /* Interface mask */ asynInt32Mask | asynInt64Mask, /* Interrupt mask */ 0, /* asynFlags. This driver does not block and it is not multi-device, but has a destructor ASYN_DESTRUCTIBLE our version of the Asyn is too old to support this flag */ 1, /* Autoconnect */ 0, /* Default priority */ 0), /* Default stack size*/ num_channels(numChannels + 1), kafkaEnabled(enableKafkaStream), monitorTopic(monitorTopic), detectorTopic(detectorTopic), udpQueue(udpQueueSize, false), monitorQueue(kafkaQueueSize, false), detectorQueue(kafkaQueueSize, false), kafkaMaxPacketSize(kafkaMaxPacketSize) { const char *functionName = "asynStreamGeneratorDriver"; // Parameter Setup asynStatus status = asynSuccess; status = createInt32Param(status, P_StatusString, &P_Status, STATUS_IDLE); status = createInt32Param(status, P_ResetString, &P_Reset); status = createInt32Param(status, P_StopString, &P_Stop); status = createInt32Param(status, P_CountPresetString, &P_CountPreset); status = createInt32Param(status, P_TimePresetString, &P_TimePreset); status = createInt32Param(status, P_ElapsedTimeString, &P_ElapsedTime); status = createInt32Param(status, P_ClearElapsedTimeString, &P_ClearElapsedTime); status = createInt32Param(status, P_MonitorChannelString, &P_MonitorChannel); status = createInt32Param(status, P_ThresholdString, &P_Threshold, 1); status = createInt32Param(status, P_ThresholdChannelString, &P_ThresholdChannel, 1); // Create Parameters templated on Channel Number char pv_name_buffer[100]; P_Counts = new int[this->num_channels]; P_Rates = new int[this->num_channels]; P_ClearCounts = new int[this->num_channels]; for (std::size_t i = 0; i < this->num_channels; ++i) { memset(pv_name_buffer, 0, 100); epicsSnprintf(pv_name_buffer, 100, P_CountsString, i); status = createInt32Param(status, pv_name_buffer, P_Counts + i); memset(pv_name_buffer, 0, 100); epicsSnprintf(pv_name_buffer, 100, P_RateString, i); status = createInt32Param(status, pv_name_buffer, P_Rates + i); memset(pv_name_buffer, 0, 100); epicsSnprintf(pv_name_buffer, 100, P_ClearCountsString, i); status = createInt32Param(status, pv_name_buffer, P_ClearCounts + i); } if (status) { epicsStdoutPrintf( "%s:%s: failed to create or setup parameters, status=%d\n", driverName, functionName, status); exit(1); } // Create Events this->pausedEventId = epicsEventCreate(epicsEventEmpty); if (enableKafkaStream) { epicsStdoutPrintf( "Detector Kafka Config: broker=%s, topic=%s\n " " queue size:%d, max events per packet: %d\n", kafkaBroker, this->detectorTopic, kafkaQueueSize, this->kafkaMaxPacketSize); epicsStdoutPrintf( "Monitors Kafka Config: broker=%s, topic=%s\n " " queue size:%d, max events per packet: %d\n", kafkaBroker, this->monitorTopic, kafkaQueueSize, this->kafkaMaxPacketSize); this->monitorProducer = create_kafka_producer(kafkaBroker); this->detectorProducer = create_kafka_producer(kafkaBroker); // Setup for Thread Producing Monitor Kafka Events status = (asynStatus)(epicsThreadCreate( "monitor_produce", epicsThreadPriorityMedium, epicsThreadGetStackSize(epicsThreadStackMedium), (EPICSTHREADFUNC)::monitorProducerTask, this) == NULL); if (status) { epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName, functionName, status); exit(1); } // Setup for Thread Producing Detector Kafka Events status = (asynStatus)(epicsThreadCreate( "monitor_produce", epicsThreadPriorityMedium, epicsThreadGetStackSize(epicsThreadStackMedium), (EPICSTHREADFUNC)::detectorProducerTask, this) == NULL); if (status) { epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName, functionName, status); exit(1); } } else { epicsStdoutPrintf("Kafka Stream Disabled\n"); } /* Create the thread that orders the events and acts as our sinqDaq stand-in */ status = (asynStatus)(epicsThreadCreate( "sinqDAQ", epicsThreadPriorityMedium, epicsThreadGetStackSize(epicsThreadStackMedium), (EPICSTHREADFUNC)::daqTask, this) == NULL); if (status) { epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName, functionName, status); exit(1); } // UDP Receive Setup status = pasynOctetSyncIO->connect(ipPortName, 0, &pasynUDPUser, NULL); if (status) { epicsStdoutPrintf("%s:%s: Couldn't open connection %s, status=%d\n", driverName, functionName, ipPortName, status); exit(1); } /* Create the thread that receives UDP traffic in the background */ status = (asynStatus)(epicsThreadCreate( "udp_receive", epicsThreadPriorityMedium, epicsThreadGetStackSize(epicsThreadStackMedium), (EPICSTHREADFUNC)::udpPollerTask, this) == NULL); if (status) { epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName, functionName, status); exit(1); } } asynStreamGeneratorDriver::~asynStreamGeneratorDriver() { // should make sure queues are empty and freed // and that the kafka producers are flushed and freed delete[] P_Counts; delete[] P_Rates; // TODO add exit should perhaps ensure the queue is flushed // rd_kafka_poll(producer, 0); // epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer)); // rd_kafka_flush(producer, 10 * 1000); // epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer)); } asynStatus asynStreamGeneratorDriver::writeInt32(asynUser *pasynUser, epicsInt32 value) { int function = pasynUser->reason; asynStatus status = asynSuccess; const char *paramName; const char *functionName = "writeInt32"; getParamName(function, ¶mName); // if (status) { // epicsSnprintf(pasynUser->errorMessage, pasynUser->errorMessageSize, // "%s:%s: status=%d, function=%d, name=%s, value=%d", // driverName, functionName, status, function, paramName, // value); // return status; // } if (function == P_CountPreset) { // TODO should block setting a preset when already set setIntegerParam(function, value); setIntegerParam(P_Status, STATUS_COUNTING); status = (asynStatus)callParamCallbacks(); epicsEventSignal(this->pausedEventId); } else if (function == P_TimePreset) { // TODO should block setting a preset when already set setIntegerParam(function, value); setIntegerParam(P_Status, STATUS_COUNTING); status = (asynStatus)callParamCallbacks(); epicsEventSignal(this->pausedEventId); } else if (function == P_Reset) { // TODO should probably set back everything to defaults setIntegerParam(P_Status, STATUS_IDLE); status = (asynStatus)callParamCallbacks(); } else if (function == P_MonitorChannel) { epicsInt32 currentStatus; getIntegerParam(this->P_Status, ¤tStatus); if (!currentStatus) { setIntegerParam(function, value); status = (asynStatus)callParamCallbacks(); } } else { setIntegerParam(function, value); status = (asynStatus)callParamCallbacks(); } if (status) epicsSnprintf(pasynUser->errorMessage, pasynUser->errorMessageSize, "%s:%s: status=%d, function=%d, name=%s, value=%d", driverName, functionName, status, function, paramName, value); return status; } void asynStreamGeneratorDriver::receiveUDP() { // TODO fix time overflows // TODO check for lost packets const char *functionName = "receiveUDP"; asynStatus status = asynSuccess; int isConnected = 1; std::size_t received; int eomReason; // The correlation unit sents messages with a maximum size of 1500 bytes. // These messages don't have any obious start or end to synchronise // against... const std::size_t bufferSize = 1500; char buffer[bufferSize + 1]; // so that \0 can fit while (true) { status = pasynManager->isConnected(pasynUDPUser, &isConnected); if (!isConnected) asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, "%s:%s: isConnected = %d\n", driverName, functionName, isConnected); status = pasynOctetSyncIO->read(pasynUDPUser, buffer, bufferSize, 0, // timeout &received, &eomReason); if (received) { UDPHeader *header = (UDPHeader *)buffer; std::size_t total_events = (header->BufferLength - 21) / 3; if (received == total_events * 6 + 42) { for (std::size_t i = 0; i < total_events; ++i) { char *event = (buffer + 21 * 2 + i * 6); NormalisedEvent *ne; if (event[5] & 0x80) { // Monitor Event MonitorEvent *m_event = (MonitorEvent *)event; // needs to be freed!!! ne = new NormalisedEvent( header->nanosecs() + (uint64_t)m_event->nanosecs(), 0, m_event->DataID); } else { // Detector Event DetectorEvent *d_event = (DetectorEvent *)event; // needs to be freed!!! ne = new NormalisedEvent( header->nanosecs() + (uint64_t)d_event->nanosecs(), header->McpdID, d_event->pixelId(header->McpdID)); } this->udpQueue.push(ne); } } else { asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, "%s:%s: invalid UDP packet\n", driverName, functionName); } } } } inline void asynStreamGeneratorDriver::queueForKafka(NormalisedEvent *ne) { if (this->kafkaEnabled) { if (ne->source == 0) this->monitorQueue.push(ne); else this->detectorQueue.push(ne); } else { delete ne; } } void asynStreamGeneratorDriver::processEvents() { const char *functionName = "processEvents"; const size_t queueBufferSize = 10 * this->udpQueue.getSize(); struct { bool operator()(const NormalisedEvent *l, const NormalisedEvent *r) const { return l->timestamp > r->timestamp; } } smallestToLargest; // This should never be used. It is just instantiated to reserve a buffer // of specific size. std::vector queueBuffer; queueBuffer.reserve(queueBufferSize); std::priority_queue, decltype(smallestToLargest)> timeQueue(smallestToLargest, std::move(queueBuffer)); // TODO epics doesn't seem to support uint64, you would need an array of // uint32. It does support int64 though.. so we start with that epicsInt32 *counts = new epicsInt32[this->num_channels]; asynStatus status = asynSuccess; NormalisedEvent *ne; uint64_t newestTimestamp = 0; uint64_t startTimestamp = std::numeric_limits::max(); uint64_t currTimestamp; epicsInt32 elapsedSeconds = 0; epicsInt32 prevStatus = STATUS_IDLE; epicsInt32 currStatus = STATUS_IDLE; epicsInt32 countPreset = 0; epicsInt32 timePreset = 0; epicsInt32 presetChannel = 0; while (true) { if ((ne = this->udpQueue.pop()) != nullptr) { // we should reastart this ioc at least every few years, as at ns // resolution with a uint64_t we will have an overflow after around // 4 years newestTimestamp = std::max(newestTimestamp, ne->timestamp); timeQueue.push(ne); } // idea is to try and guarantee at least 1 packet per id or the min // frequency for each id without actually checking all ids if (timeQueue.size() >= 1500 * 10 || (timeQueue.size() > 0 && newestTimestamp - timeQueue.top()->timestamp >= 200'000'000ull)) { ne = timeQueue.top(); timeQueue.pop(); status = getIntegerParam(this->P_Status, &currStatus); if (currStatus == STATUS_COUNTING && prevStatus == STATUS_IDLE) { // Starting a new count // get current count configuration getIntegerParam(this->P_CountPreset, &countPreset); getIntegerParam(this->P_TimePreset, &timePreset); getIntegerParam(this->P_MonitorChannel, &presetChannel); // reset status variables startTimestamp = std::numeric_limits::max(); for (size_t i = 0; i < this->num_channels; ++i) { counts[i] = 0; } // reset pvs lock(); for (size_t i = 0; i < num_channels; ++i) { setIntegerParam(P_Counts[i], counts[i]); } setIntegerParam(P_ElapsedTime, 0); callParamCallbacks(); unlock(); // TODO might consider throwing out current buffer as it is // from before count started? then again, 0.2 ms or whatever is // set above is quite a small preceeding amount of time, so // maybe it doesn't matter } prevStatus = currStatus; if (currStatus == STATUS_COUNTING) { startTimestamp = std::min(startTimestamp, ne->timestamp); counts[ne->source == 0 ? ne->pixelId + 1 : 0] += 1; currTimestamp = ne->timestamp; elapsedSeconds = 0 ? currTimestamp <= startTimestamp : ((double)(currTimestamp - startTimestamp)) / 1e9; this->queueForKafka(ne); } else { delete ne; } // is our count finished? if ((countPreset && counts[presetChannel] >= countPreset) || (timePreset && elapsedSeconds >= timePreset)) { // add any remaining events with the same timestamp // we could theoretically have a small overrun if the // timestamps are identical on the monitor channel while (!timeQueue.empty() && !timeQueue.top()->timestamp == currTimestamp) { ne = timeQueue.top(); timeQueue.pop(); counts[ne->source == 0 ? ne->pixelId + 1 : 0] += 1; this->queueForKafka(ne); } countPreset = 0; timePreset = 0; lock(); for (size_t i = 0; i < num_channels; ++i) { setIntegerParam(P_Counts[i], counts[i]); } setIntegerParam(P_ElapsedTime, elapsedSeconds); setIntegerParam(P_CountPreset, countPreset); setIntegerParam(P_TimePreset, timePreset); callParamCallbacks(); setIntegerParam(P_Status, STATUS_IDLE); callParamCallbacks(); unlock(); } else if (currStatus == STATUS_COUNTING) { lock(); for (size_t i = 0; i < num_channels; ++i) { setIntegerParam(P_Counts[i], counts[i]); } setIntegerParam(P_ElapsedTime, elapsedSeconds); callParamCallbacks(); unlock(); } } } } void asynStreamGeneratorDriver::produceMonitor() { flatbuffers::FlatBufferBuilder builder(1024); std::vector tof; tof.reserve(this->kafkaMaxPacketSize + 16); std::vector did; did.reserve(this->kafkaMaxPacketSize + 16); int total = 0; epicsTimeStamp last_sent = epicsTime::getCurrent(); uint64_t message_id = 0; while (true) { if (!this->monitorQueue.isEmpty()) { ++total; auto nme = this->monitorQueue.pop(); tof.push_back(nme->timestamp); did.push_back(nme->pixelId); delete nme; } else { epicsThreadSleep(0.001); // seconds } // TODO can probably just replace the current // instead of always getting new object epicsTimeStamp now = epicsTime::getCurrent(); // At least every 0.2 seconds if (total >= this->kafkaMaxPacketSize || epicsTimeDiffInNS(&now, &last_sent) > 200'000'000ll) { last_sent = epicsTime::getCurrent(); if (total) { total = 0; builder.Clear(); auto message = CreateEventMessageDirect( builder, "monitor", message_id++, ((uint64_t)now.secPastEpoch) * 1'000'000'000ull + ((uint64_t)now.nsec), &tof, &did); builder.Finish(message, "ev42"); rd_kafka_resp_err_t err = rd_kafka_producev( monitorProducer, RD_KAFKA_V_TOPIC(this->monitorTopic), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), // RD_KAFKA_V_KEY((void *)key, key_len), RD_KAFKA_V_VALUE((void *)builder.GetBufferPointer(), builder.GetSize()), // RD_KAFKA_V_OPAQUE(NULL), RD_KAFKA_V_END); if (err) { epicsStdoutPrintf("Failed to produce to topic %s: %s\n", this->monitorTopic, rd_kafka_err2str(err)); } rd_kafka_poll(monitorProducer, 0); tof.clear(); did.clear(); } } } } void asynStreamGeneratorDriver::produceDetector() { static const std::size_t bufferSize = this->kafkaMaxPacketSize + 16; flatbuffers::FlatBufferBuilder builder(1024); std::vector tof; tof.reserve(bufferSize); std::vector did; did.reserve(bufferSize); int total = 0; epicsTimeStamp last_sent = epicsTime::getCurrent(); uint64_t message_id = 0; while (true) { if (!this->detectorQueue.isEmpty()) { ++total; auto nde = this->detectorQueue.pop(); tof.push_back(nde->timestamp); did.push_back(nde->pixelId); delete nde; } else { // TODO // rd_kafka_flush(detectorProducer, 10 * 1000); epicsThreadSleep(0.001); // seconds } epicsTimeStamp now = epicsTime::getCurrent(); // At least every 0.2 seconds if (total >= this->kafkaMaxPacketSize || epicsTimeDiffInNS(&now, &last_sent) > 200'000'000ll) { last_sent = epicsTime::getCurrent(); if (total) { total = 0; builder.Clear(); auto message = CreateEventMessageDirect( builder, "detector", message_id++, ((uint64_t)now.secPastEpoch) * 1'000'000'000ull + ((uint64_t)now.nsec), &tof, &did); builder.Finish(message, "ev42"); rd_kafka_resp_err_t err = rd_kafka_producev( detectorProducer, RD_KAFKA_V_TOPIC(this->detectorTopic), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), // RD_KAFKA_V_KEY((void *)key, key_len), RD_KAFKA_V_VALUE((void *)builder.GetBufferPointer(), builder.GetSize()), // RD_KAFKA_V_OPAQUE(NULL), RD_KAFKA_V_END); if (err) { epicsStdoutPrintf("Failed to produce to topic %s: %s\n", this->detectorTopic, rd_kafka_err2str(err)); } rd_kafka_poll(detectorProducer, 0); tof.clear(); did.clear(); } } } } /******************************************************************************* * Methods exposed to IOC Shell */ extern "C" { asynStatus asynStreamGeneratorDriverConfigure( const char *portName, const char *ipPortName, const int numChannels, const int udpQueueSize, const char *kafkaBroker, const char *monitorTopic, const char *detectorTopic, const int kafkaQueueSize, const int kafkaMaxPacketSize) { new asynStreamGeneratorDriver(portName, ipPortName, numChannels, udpQueueSize, kafkaBroker[0], kafkaBroker, monitorTopic, detectorTopic, kafkaQueueSize, kafkaMaxPacketSize); return asynSuccess; } static const iocshArg initArg0 = {"portName", iocshArgString}; static const iocshArg initArg1 = {"ipPortName", iocshArgString}; static const iocshArg initArg2 = {"numChannels", iocshArgInt}; static const iocshArg initArg3 = {"udpQueueSize", iocshArgInt}; static const iocshArg initArg4 = {"kafkaBroker", iocshArgString}; static const iocshArg initArg5 = {"monitorTopic", iocshArgString}; static const iocshArg initArg6 = {"detectorTopic", iocshArgString}; static const iocshArg initArg7 = {"kafkaQueueSize", iocshArgInt}; static const iocshArg initArg8 = {"kafkaMaxPacketSize", iocshArgInt}; static const iocshArg *const initArgs[] = {&initArg0, &initArg1, &initArg2, &initArg3, &initArg4, &initArg5, &initArg6, &initArg7, &initArg8}; static const iocshFuncDef initFuncDef = {"asynStreamGenerator", 9, initArgs}; static void initCallFunc(const iocshArgBuf *args) { asynStreamGeneratorDriverConfigure( args[0].sval, args[1].sval, args[2].ival, args[3].ival, args[4].sval, args[5].sval, args[6].sval, args[7].ival, args[8].ival); } void asynStreamGeneratorDriverRegister(void) { iocshRegister(&initFuncDef, initCallFunc); } epicsExportRegistrar(asynStreamGeneratorDriverRegister); }