#include "asynOctetSyncIO.h"
#include "ev42_events_generated.h"

// NOTE(review): this include list is reconstructed from the identifiers used
// in this file -- confirm it against the build before merging.
#include <algorithm>
#include <cstdlib>
#include <cstring>
#include <limits>
#include <vector>

#include <epicsStdio.h>
#include <iocsh.h>

// Just for printing
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

#include "asynStreamGeneratorDriver.h"

#include <epicsExport.h>

/*******************************************************************************
 * Kafka Methods
 */

/**
 * Sets a single key/value pair on a librdkafka configuration object.
 *
 * Terminates the process if librdkafka rejects the setting, printing the key,
 * the value and the reason reported by librdkafka.
 */
static void set_kafka_config_key(rd_kafka_conf_t *conf, const char *key,
                                 const char *value) {
    char errstr[512];

    const rd_kafka_conf_res_t res =
        rd_kafka_conf_set(conf, key, value, errstr, sizeof(errstr));

    if (res != RD_KAFKA_CONF_OK) {
        epicsStdoutPrintf("Failed to set config value %s : %s (%s)\n", key,
                          value, errstr);
        exit(1);
    }
}

/**
 * Creates a Kafka producer connected to the given broker.
 *
 * Terminates the process if the producer cannot be created.
 *
 * @param kafkaBroker value used for the "bootstrap.servers" setting
 * @return the newly created producer handle
 */
static rd_kafka_t *create_kafka_producer(const char *kafkaBroker) {
    char errstr[512];

    // Prepare configuration object
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    set_kafka_config_key(conf, "bootstrap.servers", kafkaBroker);
    set_kafka_config_key(conf, "queue.buffering.max.messages", "10000000");
    // With 2e6 counts / s
    // and a packet size of 20480 events (163920 bytes)
    // this implies we need to send around 100 messages a second
    // and need about .2 gigabit upload
    // set_kafka_config_key(conf, "queue.buffering.max.kbytes", "10000000");

    // Create the Producer
    rd_kafka_t *producer =
        rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));

    if (!producer) {
        epicsStdoutPrintf("Failed to create Kafka Producer: %s\n", errstr);
        exit(1);
    }

    return producer;
}

/*******************************************************************************
 * Static Methods Passed to Epics Threads that should run in the background
 *
 * Each one receives the driver instance as its opaque argument and forwards
 * to the corresponding (never returning) member function.
 */

static void udpPollerTask(void *drvPvt) {
    static_cast<asynStreamGeneratorDriver *>(drvPvt)->receiveUDP();
}

static void udpNormaliserTask(void *drvPvt) {
    static_cast<asynStreamGeneratorDriver *>(drvPvt)->normaliseUDP();
}

static void sortTask(void *drvPvt) {
    static_cast<asynStreamGeneratorDriver *>(drvPvt)->partialSortEvents();
}

static void daqTask(void *drvPvt) {
    static_cast<asynStreamGeneratorDriver *>(drvPvt)->processEvents();
}

static void monitorProducerTask(void *drvPvt) {
    static_cast<asynStreamGeneratorDriver *>(drvPvt)->produceMonitor();
}

static void detectorProducerTask(void *drvPvt) {
    static_cast<asynStreamGeneratorDriver *>(drvPvt)->produceDetector();
}

/*******************************************************************************
 * Stream Generator Helper Methods
 *
 * Each helper creates a parameter, stores its index in *variable, sets its
 * initial value, and returns the incoming status OR-ed with the status of
 * both steps, so that a chain of calls ends up non-zero if any step failed.
 *
 * The parameter is created before its initial value is set as two separate
 * statements: the index in *variable is only valid once createParam has run,
 * and the operands of `|` have no guaranteed evaluation order.
 */

asynStatus asynStreamGeneratorDriver::createInt32Param(
    asynStatus status, char *name, int *variable, epicsInt32 initialValue) {
    // TODO should show error if there is one
    const asynStatus created = createParam(name, asynParamInt32, variable);
    const asynStatus initialised =
        created == asynSuccess ? setIntegerParam(*variable, initialValue)
                               : asynSuccess;
    return (asynStatus)(status | created | initialised);
}

asynStatus asynStreamGeneratorDriver::createInt64Param(
    asynStatus status, char *name, int *variable, epicsInt64 initialValue) {
    // TODO should show error if there is one
    const asynStatus created = createParam(name, asynParamInt64, variable);
    const asynStatus initialised =
        created == asynSuccess ? setInteger64Param(*variable, initialValue)
                               : asynSuccess;
    return (asynStatus)(status | created | initialised);
}

asynStatus asynStreamGeneratorDriver::createFloat64Param(asynStatus status,
                                                         char *name,
                                                         int *variable,
                                                         double initialValue) {
    // TODO should show error if there is one
    const asynStatus created = createParam(name, asynParamFloat64, variable);
    const asynStatus initialised =
        created == asynSuccess ? setDoubleParam(*variable, initialValue)
                               : asynSuccess;
    return (asynStatus)(status | created | initialised);
}

/*******************************************************************************
 * Stream Generator Methods
 */

/**
 * Creates the driver's parameters and queues, optionally sets up the two
 * Kafka producers, connects to the UDP port and starts all background
 * threads. Any failure during setup terminates the process.
 *
 * @param portName           name of this asyn port
 * @param ipPortName         name of the asyn port the UDP data is read from
 * @param numChannels        number of channels; one more set of per-channel
 *                           parameters than this is created
 * @param udpQueueSize       size of the UDP-side queues, in units of 243
 *                           events
 * @param enableKafkaStream  whether events are forwarded to Kafka
 * @param kafkaBroker        Kafka bootstrap server(s)
 * @param monitorTopic       Kafka topic for events with source 0
 * @param detectorTopic      Kafka topic for all other events
 * @param kafkaQueueSize     size of the Kafka-side queues, in units of 243
 *                           events
 * @param kafkaMaxPacketSize maximum number of events per Kafka message
 */
asynStreamGeneratorDriver::asynStreamGeneratorDriver(
    const char *portName, const char *ipPortName, const int numChannels,
    const int udpQueueSize, const bool enableKafkaStream,
    const char *kafkaBroker, const char *monitorTopic,
    const char *detectorTopic, const int kafkaQueueSize,
    const int kafkaMaxPacketSize)
    : asynPortDriver(portName, 1, /* maxAddr */
                     asynInt32Mask | asynInt64Mask | asynFloat64Mask |
                         asynDrvUserMask, /* Interface mask */
                     asynInt32Mask,       // | asynFloat64Mask,
                                          /* Interrupt mask */
                     0, /* asynFlags.  This driver does not block and it is
                           not multi-device, but has a
                           destructor ASYN_DESTRUCTIBLE our version of the Asyn
                           is too old to support this flag */
                     1,  /* Autoconnect */
                     0,  /* Default priority */
                     0), /* Default stack size*/
      num_channels(numChannels + 1), kafkaEnabled(enableKafkaStream),
      monitorTopic(monitorTopic), detectorTopic(detectorTopic),
      udpQueueSize(udpQueueSize), kafkaQueueSize(kafkaQueueSize),
      // measured in max packet sizes
      udpQueue(
          epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))),
      normalisedQueue(
          epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))),
      // TODO configurable sizes
      sortedQueue(
          epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))),
      monitorQueue(
          epicsRingBytesCreate(243 * kafkaQueueSize * sizeof(NormalisedEvent))),
      detectorQueue(
          epicsRingBytesCreate(243 * kafkaQueueSize * sizeof(NormalisedEvent))),
      kafkaMaxPacketSize(kafkaMaxPacketSize) {
    const char *functionName = "asynStreamGeneratorDriver";

    // Parameter Setup
    asynStatus status = asynSuccess;

    status = createInt32Param(status, P_StatusString, &P_Status, STATUS_IDLE);
    status = createInt32Param(status, P_ResetString, &P_Reset);
    status = createInt32Param(status, P_StopString, &P_Stop);
    status = createInt32Param(status, P_CountPresetString, &P_CountPreset);
    status = createInt32Param(status, P_TimePresetString, &P_TimePreset);
    status = createFloat64Param(status, P_ElapsedTimeString, &P_ElapsedTime);
    status =
        createInt32Param(status, P_ClearElapsedTimeString, &P_ClearElapsedTime);
    status =
        createInt32Param(status, P_MonitorChannelString, &P_MonitorChannel);
    status = createInt32Param(status, P_ThresholdString, &P_Threshold, 1);
    status = createInt32Param(status, P_ThresholdChannelString,
                              &P_ThresholdChannel, 1);

    // Create Parameters templated on Channel Number
    // NOTE(review): `i` is passed to epicsSnprintf as a std::size_t; confirm
    // that the conversion in the P_*String format strings matches that width.
    char pv_name_buffer[100];
    P_Counts = new int[this->num_channels];
    P_Rates = new int[this->num_channels];
    P_ClearCounts = new int[this->num_channels];
    for (std::size_t i = 0; i < this->num_channels; ++i) {
        memset(pv_name_buffer, 0, 100);
        epicsSnprintf(pv_name_buffer, 100, P_CountsString, i);
        status = createInt64Param(status, pv_name_buffer, P_Counts + i);

        memset(pv_name_buffer, 0, 100);
        epicsSnprintf(pv_name_buffer, 100, P_RateString, i);
        status = createInt32Param(status, pv_name_buffer, P_Rates + i);

        memset(pv_name_buffer, 0, 100);
        epicsSnprintf(pv_name_buffer, 100, P_ClearCountsString, i);
        status = createInt32Param(status, pv_name_buffer, P_ClearCounts + i);
    }

    status = createInt32Param(status, P_UdpDroppedString, &P_UdpDropped);
    status = createInt32Param(status, P_UdpQueueHighWaterMarkString,
                              &P_UdpQueueHighWaterMark);
    status = createInt32Param(status, P_SortedQueueHighWaterMarkString,
                              &P_SortedQueueHighWaterMark);

    if (status) {
        epicsStdoutPrintf(
            "%s:%s: failed to create or setup parameters, status=%d\n",
            driverName, functionName, status);
        exit(1);
    }

    // Starts one background thread running `task` on this driver instance
    // (medium stack size) and terminates the process if it cannot be created.
    const auto startThreadOrExit = [this, functionName](
                                       const char *threadName,
                                       unsigned int priority,
                                       EPICSTHREADFUNC task) {
        const epicsThreadId id = epicsThreadCreate(
            threadName, priority,
            epicsThreadGetStackSize(epicsThreadStackMedium), task, this);
        if (id == nullptr) {
            epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
                              driverName, functionName,
                              static_cast<int>(asynError));
            exit(1);
        }
    };

    // Create Events
    // this->pausedEventId = epicsEventCreate(epicsEventEmpty);

    if (enableKafkaStream) {
        epicsStdoutPrintf(
            "Detector Kafka Config: broker=%s, topic=%s\n "
            " queue size:%d, max events per packet: %d\n",
            kafkaBroker, this->detectorTopic, kafkaQueueSize,
            this->kafkaMaxPacketSize);

        epicsStdoutPrintf(
            "Monitors Kafka Config: broker=%s, topic=%s\n "
            " queue size:%d, max events per packet: %d\n",
            kafkaBroker, this->monitorTopic, kafkaQueueSize,
            this->kafkaMaxPacketSize);

        this->monitorProducer = create_kafka_producer(kafkaBroker);
        this->detectorProducer = create_kafka_producer(kafkaBroker);

        // Setup for Thread Producing Monitor Kafka Events
        startThreadOrExit("monitor_produce", epicsThreadPriorityMedium,
                          ::monitorProducerTask);

        // Setup for Thread Producing Detector Kafka Events
        startThreadOrExit("detector_produce", epicsThreadPriorityMedium,
                          ::detectorProducerTask);
    } else {
        epicsStdoutPrintf("Kafka Stream Disabled\n");
    }

    /* Create the thread that orders the events and acts as our sinqDaq
     * stand-in */
    startThreadOrExit("sinqDAQ",
                      epicsThreadPriorityMedium, // epicsThreadPriorityMax,
                      ::daqTask);

    /* Create the thread that orders packets in preparation for our sinqDAQ
     * stand-in */
    startThreadOrExit("partialSort", epicsThreadPriorityMedium, ::sortTask);

    /* Create the thread that normalises the events */
    startThreadOrExit("eventNormaliser",
                      epicsThreadPriorityMedium, // epicsThreadPriorityMax,
                      ::udpNormaliserTask);

    // UDP Receive Setup
    status = pasynOctetSyncIO->connect(ipPortName, 0, &pasynUDPUser, NULL);

    if (status) {
        epicsStdoutPrintf("%s:%s: Couldn't open connection %s, status=%d\n",
                          driverName, functionName, ipPortName, status);
        exit(1);
    }

    /* Create the thread that receives UDP traffic in the background */
    startThreadOrExit("udp_receive", epicsThreadPriorityMax, ::udpPollerTask);
}

/**
 * Releases the per-channel parameter index arrays allocated by the
 * constructor.
 */
asynStreamGeneratorDriver::~asynStreamGeneratorDriver() {
    // should make sure queues are empty and freed
    // and that the kafka producers are flushed and freed
    delete[] P_Counts;
    delete[] P_Rates;
    delete[] P_ClearCounts;

    // TODO add exit should perhaps ensure the queue is flushed
    // rd_kafka_poll(producer, 0);
    // epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer));
    // rd_kafka_flush(producer, 10 * 1000);
    // epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer));
}

/**
 * Handles Int32 reads.
 *
 * The two queue high-water-mark parameters are computed on demand from the
 * ring buffers and reported as a percentage of 243 * udpQueueSize events;
 * every other parameter is delegated to asynPortDriver::readInt32.
 */
asynStatus asynStreamGeneratorDriver::readInt32(asynUser *pasynUser,
                                                epicsInt32 *value) {
    const int function = pasynUser->reason;

    // Both queues are created with room for 243 * udpQueueSize events.
    const double toPercent = 100. / (243. * udpQueueSize);

    if (function == P_UdpQueueHighWaterMark) {
        *value = (epicsInt32)(epicsRingBytesHighWaterMark(this->udpQueue) /
                              sizeof(NormalisedEvent) * toPercent);
        // Apparently resetting the watermark causes problems...
        // at least concurrently :D
        // epicsRingBytesResetHighWaterMark(this->udpQueue);
        return asynSuccess;
    }

    if (function == P_SortedQueueHighWaterMark) {
        *value = (epicsInt32)(epicsRingBytesHighWaterMark(this->sortedQueue) /
                              sizeof(NormalisedEvent) * toPercent);
        // epicsRingBytesResetHighWaterMark(this->sortedQueue);
        return asynSuccess;
    }

    return asynPortDriver::readInt32(pasynUser, value);
}

/**
 * Handles Int32 writes.
 *
 * Writes to the presets, the clear commands and the monitor channel are only
 * accepted while the status parameter reads zero (otherwise asynError is
 * returned); reset and stop set the status to STATUS_IDLE; any other
 * parameter is simply stored.
 */
asynStatus asynStreamGeneratorDriver::writeInt32(asynUser *pasynUser,
                                                 epicsInt32 value) {
    const int function = pasynUser->reason;
    asynStatus status = asynSuccess;
    const char *paramName = "";
    const char *functionName = "writeInt32";
    getParamName(function, &paramName);

    // TODO should maybe lock mutex for this
    epicsInt32 currentStatus;
    status = getIntegerParam(this->P_Status, &currentStatus);

    if (status) {
        epicsSnprintf(pasynUser->errorMessage, pasynUser->errorMessageSize,
                      "%s:%s: status=%d, function=%d, name=%s, value=%d",
                      driverName, functionName, status, function, paramName,
                      value);
        return status;
    }

    // A zero status is what allows the state-changing writes below.
    const bool idle = !currentStatus;

    // TODO clean up
    // Is this a write to one of the per-channel "clear counts" parameters?
    bool isClearCount = false;
    std::size_t channelToClear = 0;
    for (std::size_t i = 0; i < this->num_channels; ++i) {
        if (function == P_ClearCounts[i]) {
            isClearCount = true;
            channelToClear = i;
            break;
        }
    }

    // TODO should check everything...
    if (function == P_CountPreset || function == P_TimePreset) {
        if (!idle)
            return asynError;
        // Storing a preset also starts counting.
        setIntegerParam(function, value);
        setIntegerParam(P_Status, STATUS_COUNTING);
        status = (asynStatus)callParamCallbacks();
    } else if (function == P_ClearElapsedTime) {
        if (!idle)
            return asynError;
        // P_ElapsedTime is created as a Float64 parameter, so it has to be
        // cleared through the double setter.
        setDoubleParam(P_ElapsedTime, 0.0);
        status = (asynStatus)callParamCallbacks();
    } else if (isClearCount) {
        if (!idle)
            return asynError;
        setInteger64Param(P_Counts[channelToClear], 0);
        status = (asynStatus)callParamCallbacks();
    } else if (function == P_Reset) {
        lock();
        // TODO should probably set back everything to defaults
        setIntegerParam(P_Status, STATUS_IDLE);
        status = (asynStatus)callParamCallbacks();
        unlock();
    } else if (function == P_Stop) {
        lock();
        setIntegerParam(P_Status, STATUS_IDLE);
        status = (asynStatus)callParamCallbacks();
        unlock();
    } else if (function == P_MonitorChannel) {
        if (!idle)
            return asynError;
        setIntegerParam(function, value);
        status = (asynStatus)callParamCallbacks();
    } else {
        setIntegerParam(function, value);
        status = (asynStatus)callParamCallbacks();
    }

    if (status)
        epicsSnprintf(pasynUser->errorMessage, pasynUser->errorMessageSize,
                      "%s:%s: status=%d, function=%d, name=%s, value=%d",
                      driverName, functionName, status, function, paramName,
                      value);
    return status;
}

/**
 * Background loop: reads packets from the UDP port and copies every packet
 * that passes a basic length check into udpQueue as a fixed 1500-byte slot.
 * Never returns.
 */
void asynStreamGeneratorDriver::receiveUDP() {
    const char *functionName = "receiveUDP";

    // int isConnected = 1;
    std::size_t received = 0;
    int eomReason = 0;

    const std::size_t bufferSize = 1500;
    const std::size_t headerLength = 42;
    char buffer[bufferSize];

    while (true) {
        // status = pasynManager->isConnected(pasynUDPUser, &isConnected);
        // if (!isConnected)
        //     asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
        //               "%s:%s: isConnected = %d\n", driverName, functionName,
        //               isConnected);

        // Reset the byte count so that a read which returns without writing
        // it cannot make us act on a stale or uninitialised value. The
        // returned status is intentionally not inspected: the byte count
        // alone decides whether there is something to handle.
        received = 0;
        pasynOctetSyncIO->read(pasynUDPUser, buffer, bufferSize,
                               0, // timeout
                               &received, &eomReason);

        if (!received)
            continue;

        // The first 16-bit word of the packet holds its length in 16-bit
        // words.
        uint16_t bufferLength = 0;
        std::memcpy(&bufferLength, buffer, sizeof(bufferLength));

        if (received >= headerLength &&
            received == static_cast<std::size_t>(bufferLength) * 2) {
            epicsRingBytesPut(this->udpQueue, (char *)buffer, bufferSize);
        } else {
            asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
                      "%s:%s: invalid UDP packet\n", driverName, functionName);
        }
    }
}

/**
 * Background loop: takes raw packets from udpQueue, converts every event in
 * them into a NormalisedEvent and appends the result to normalisedQueue.
 * Gaps in the per-McpdID buffer numbers are reported and counted in
 * P_UdpDropped. Never returns.
 */
void asynStreamGeneratorDriver::normaliseUDP() {
    // TODO fix time overflows
    // Regarding time overflow.
    // * the header time stamp is 3 words, i.e. 48 bits.
    // * it has a resolution of 100ns
    // * so we can cover a maximum of (2^(3*16) - 1) * 1e-7 = 28147497 seconds
    // * or about 325 days
    // * so maybe this isn't necessary to solve, as long as we restart the
    //   electronics at least once a year...

    const char *functionName = "normaliseUDP";

    // The correlation unit sends messages with a maximum size of 1500 bytes.
    // These messages don't have any obvious start or end to synchronise
    // against...
    const std::size_t bufferSize = 1500;
    char buffer[bufferSize];

    // Header size in 16-bit words, and event size in 16-bit words.
    const std::size_t headerWords = 21;
    const std::size_t eventWords = 3;

    const std::size_t resultBufferSize = 243;
    NormalisedEvent resultBuffer[resultBufferSize];

    // We have 10 mcpdids
    const std::size_t numMcpdIds = 10;
    uint64_t lastBufferNumber[numMcpdIds] = {0};

    epicsInt32 droppedMessages = 0;

    while (true) {
        // Packets are queued as fixed 1500-byte slots, so one complete slot
        // is enough to proceed.
        const int usedBytes = epicsRingBytesUsedBytes(this->udpQueue);
        if (usedBytes < 0 ||
            static_cast<std::size_t>(usedBytes) < bufferSize) {
            epicsThreadSleep(0.0001); // seconds
            continue;
        }

        epicsRingBytesGet(this->udpQueue, (char *)buffer, bufferSize);

        const UDPHeader *header = (UDPHeader *)buffer;

        // The header fields come straight off the network: refuse anything
        // that would index outside of our local arrays.
        if (header->BufferLength < headerWords ||
            header->McpdID >= numMcpdIds) {
            asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
                      "%s:%s: invalid UDP packet\n", driverName, functionName);
            continue;
        }

        const std::size_t total_events = std::min<std::size_t>(
            (header->BufferLength - headerWords) / eventWords,
            resultBufferSize);

        if (header->BufferNumber - lastBufferNumber[header->McpdID] > 1 &&
            lastBufferNumber[header->McpdID] !=
                std::numeric_limits<
                    decltype(header->BufferNumber)>::max()) {
            asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
                      "%s:%s: missed packet on id: %d. Received: %" PRIu64
                      ", last: %" PRIu64 "\n",
                      driverName, functionName, (int)header->McpdID,
                      (uint64_t)header->BufferNumber,
                      lastBufferNumber[header->McpdID]);
            setIntegerParam(P_UdpDropped, ++droppedMessages);
        }

        lastBufferNumber[header->McpdID] = header->BufferNumber;

        for (std::size_t i = 0; i < total_events; ++i) {
            // Events follow the 21-word header and are 6 bytes each.
            char *event = (buffer + 21 * 2 + i * 6);
            const bool isMonitorEvent = event[5] & 0x80;

            NormalisedEvent &ne = resultBuffer[i];

            if (isMonitorEvent) {
                const MonitorEvent *m_event = (MonitorEvent *)event;
                ne.timestamp =
                    header->nanosecs() + (uint64_t)m_event->nanosecs();
                ne.source = 0;
                ne.pixelId = m_event->DataID;
            } else {
                const DetectorEvent *d_event = (DetectorEvent *)event;
                ne.timestamp =
                    header->nanosecs() + (uint64_t)d_event->nanosecs();
                ne.source = header->McpdID;
                ne.pixelId = d_event->pixelId(header->McpdID);
            }
        }

        epicsRingBytesPut(this->normalisedQueue, (char *)resultBuffer,
                          total_events * sizeof(NormalisedEvent));
    }
}

// Orders events by ascending timestamp.
struct {
    bool operator()(const NormalisedEvent &l, const NormalisedEvent &r) const {
        return l.timestamp < r.timestamp;
    }
} oldestEventsFirst;

// Number of whole NormalisedEvents currently stored in the given ring buffer.
inline int eventsInQueue(epicsRingBytesId id) {
    return epicsRingBytesUsedBytes(id) / sizeof(NormalisedEvent);
}

/**
 * Background loop: collects a batch of events from normalisedQueue (a full
 * batch, or whatever has arrived within 250 ms), sorts the batch by timestamp
 * and appends it to sortedQueue. Never returns.
 */
void asynStreamGeneratorDriver::partialSortEvents() {
    // x * number of ids * max events in packet
    const int bufferedEvents = 5 * 10 * 243;
    std::vector<NormalisedEvent> events(bufferedEvents);

    int queuedEvents = 0;
    epicsTimeStamp lastSort = epicsTime::getCurrent();
    epicsTimeStamp currentTime = lastSort;

    while (true) {
        queuedEvents =
            eventsInQueue(this->normalisedQueue); // in case we can't wait
        lastSort = epicsTime::getCurrent();
        currentTime = lastSort;

        // wait for minimum packet frequency or enough packets to ensure we
        // could potentially have at least 1 packet per mcpdid
        while (queuedEvents < bufferedEvents &&
               epicsTimeDiffInNS(&currentTime, &lastSort) < 250'000'000ull) {
            epicsThreadSleep(0.0001); // seconds
            currentTime = epicsTime::getCurrent();
            queuedEvents = eventsInQueue(this->normalisedQueue);
        }

        queuedEvents = std::min(queuedEvents, bufferedEvents);

        if (queuedEvents) {
            epicsRingBytesGet(this->normalisedQueue, (char *)events.data(),
                              queuedEvents * sizeof(NormalisedEvent));

            std::sort(events.begin(), events.begin() + queuedEvents,
                      oldestEventsFirst);

            epicsRingBytesPut(this->sortedQueue, (char *)events.data(),
                              queuedEvents * sizeof(NormalisedEvent));
        }
    }
}

/**
 * Queues an event for publishing, if Kafka is enabled: events with source 0
 * go to the monitor queue, all others to the detector queue.
 */
inline void asynStreamGeneratorDriver::queueForKafka(NormalisedEvent &ne) {
    if (!this->kafkaEnabled)
        return;

    epicsRingBytesId target =
        ne.source == 0 ? this->monitorQueue : this->detectorQueue;
    epicsRingBytesPut(target, (char *)&ne, sizeof(NormalisedEvent));
}

/**
 * Background loop acting as the DAQ: merges the partially sorted batches from
 * sortedQueue into a time-ordered stream and, while the status is
 * STATUS_COUNTING, accumulates per-channel counts and elapsed time, forwards
 * events to Kafka and stops once a count or time preset is reached.
 * Never returns.
 */
void asynStreamGeneratorDriver::processEvents() {
    // x * number of ids * max events in packet * event size
    const int bufferedEvents = 5 * 10 * 243;
    // we need a little extra space for merge sorting in
    const int extraBufferedEvents = 1 * 10 * 243;

    // we have two buffers. We alternate between reading data into one of them,
    // and then merge sorting into the other
    std::vector<NormalisedEvent> eventsABuffer(bufferedEvents +
                                               extraBufferedEvents);
    std::vector<NormalisedEvent> eventsBBuffer(bufferedEvents +
                                               extraBufferedEvents);

    NormalisedEvent *eventsA = eventsABuffer.data();
    NormalisedEvent *eventsB = eventsBBuffer.data();
    // [eventsBLastStart, eventsBLastEnd) is the not yet processed tail of the
    // previous iteration; it starts out empty.
    NormalisedEvent *eventsBLastStart = eventsB + bufferedEvents;
    NormalisedEvent *eventsBLastEnd = eventsBLastStart;

    int queuedEvents = 0;

    epicsTimeStamp lastProcess = epicsTime::getCurrent();
    epicsTimeStamp currentTime = lastProcess;

    std::vector<epicsInt64> counts(this->num_channels, 0);
    double elapsedSeconds = 0;
    uint64_t startTimestamp = std::numeric_limits<uint64_t>::max();

    epicsInt32 currStatus = STATUS_IDLE;
    epicsInt32 prevStatus = STATUS_IDLE;
    epicsInt32 countPreset = 0;
    epicsInt32 timePreset = 0;
    epicsInt32 presetChannel = 0;

    // True once the active count or time preset has been reached. A preset
    // channel outside of the counts array can never satisfy a count preset.
    const auto presetReached = [&]() {
        const bool channelValid =
            presetChannel >= 0 &&
            static_cast<std::size_t>(presetChannel) < counts.size();
        return (countPreset && channelValid &&
                counts[presetChannel] >= countPreset) ||
               (timePreset && elapsedSeconds > (double)timePreset);
    };

    while (true) {
        queuedEvents =
            eventsInQueue(this->sortedQueue); // in case we can't wait
        lastProcess = epicsTime::getCurrent();
        currentTime = lastProcess;

        // wait for minimum packet frequency or enough packets to ensure we
        // could potentially have at least 1 packet per mcpdid
        while (queuedEvents < bufferedEvents &&
               epicsTimeDiffInNS(&currentTime, &lastProcess) <
                   250'000'000ull) {
            epicsThreadSleep(0.0001); // seconds
            currentTime = epicsTime::getCurrent();
            queuedEvents = eventsInQueue(this->sortedQueue);
        }

        getIntegerParam(this->P_Status, &currStatus);

        queuedEvents = std::min(queuedEvents, bufferedEvents);

        NormalisedEvent *newStartPtr = eventsA + extraBufferedEvents;

        // We read into the array, such that we have enough space, that the
        // entirety of the leftover from the previous read can fit before this
        // new read, in the case that all new events are newer timewise, and
        // therefore, all events from eventsB have to be placed in a preceding
        // position.
        epicsRingBytesGet(this->sortedQueue, (char *)newStartPtr,
                          queuedEvents * sizeof(NormalisedEvent));

        // Only the leftover plus the oldest 4/5 of the new events are
        // processed now; the rest is carried over to the next iteration.
        const int toProcess = static_cast<int>(eventsBLastEnd -
                                               eventsBLastStart) +
                              queuedEvents * 4 / 5;

        // TODO could also consider an in-place merge
        // NOTE(review): the output range starts extraBufferedEvents before
        // the first input range within the same buffer, so this depends on
        // the leftover never being larger than extraBufferedEvents.
        eventsBLastEnd = std::merge(newStartPtr, newStartPtr + queuedEvents,
                                    eventsBLastStart, eventsBLastEnd, eventsA,
                                    oldestEventsFirst);

        eventsBLastStart = eventsA + toProcess;

        // TODO I haven't really taken care of the case that there are no events

        if (prevStatus == STATUS_IDLE && currStatus == STATUS_COUNTING) {

            getIntegerParam(this->P_CountPreset, &countPreset);
            getIntegerParam(this->P_TimePreset, &timePreset);
            getIntegerParam(this->P_MonitorChannel, &presetChannel);

            // reset status variables
            startTimestamp = eventsA[0].timestamp;
            elapsedSeconds = 0;
            std::fill(counts.begin(), counts.end(), 0);
        }

        if (currStatus == STATUS_COUNTING) {

            // The elapsedSeconds are round differently depending on whether we
            // are using them for comparison, or for showing to the user, to
            // try and make sure the data we send to kafka is correct, while
            // the measurement time also appears intuitive.
            for (int i = 0; i < toProcess; ++i) {
                // Events with source 0 are counted on channel pixelId + 1,
                // everything else on channel 0.
                const std::size_t channel =
                    eventsA[i].source == 0 ? eventsA[i].pixelId + 1 : 0;
                // The pixel id originates from the network; never let it
                // index outside of the counts array.
                if (channel < counts.size())
                    counts[channel] += 1;

                elapsedSeconds =
                    (eventsA[i].timestamp - startTimestamp) / 1e9;

                // TODO should really check there an no more events with the
                // same final timestamp
                if (presetReached())
                    break;

                // TODO also batchwise?
                this->queueForKafka(eventsA[i]);
            }

            for (std::size_t i = 0; i < counts.size(); ++i) {
                setInteger64Param(P_Counts[i], counts[i]);
            }
            setDoubleParam(P_ElapsedTime, elapsedSeconds);

            if (presetReached()) {
                setIntegerParam(this->P_Status, STATUS_IDLE);
                setIntegerParam(this->P_CountPreset, 0);
                setIntegerParam(this->P_TimePreset, 0);
            }
        }

        prevStatus = currStatus;

        std::swap(eventsA, eventsB);
    }
}

/**
 * Background loop: drains eventQueue and publishes the events to Kafka as
 * "ev42" flatbuffer messages. A message is sent once kafkaMaxPacketSize
 * events have been collected, or at least every 0.25 seconds if any events
 * are pending. Never returns.
 *
 * @param eventQueue    ring buffer of NormalisedEvents to publish
 * @param kafkaProducer producer used for publishing
 * @param topic         Kafka topic to publish to
 * @param source        source name written into every message
 */
void asynStreamGeneratorDriver::produce(epicsRingBytesId eventQueue,
                                        rd_kafka_t *kafkaProducer,
                                        const char *topic,
                                        const char *source) {

    flatbuffers::FlatBufferBuilder builder(1024);

    const std::size_t bufferSize = this->kafkaMaxPacketSize + 16;

    // NOTE(review): the 64-bit event timestamp is narrowed when stored as a
    // 32-bit time of flight -- confirm this is the intended encoding.
    std::vector<uint32_t> tof;
    tof.reserve(bufferSize);

    std::vector<uint32_t> did;
    did.reserve(bufferSize);

    epicsTimeStamp last_sent = epicsTime::getCurrent();
    epicsTimeStamp now = last_sent;

    int total = 0;
    uint64_t message_id = 0;

    NormalisedEvent ne;

    while (true) {

        if (!epicsRingBytesIsEmpty(eventQueue)) {
            ++total;
            epicsRingBytesGet(eventQueue, (char *)&ne,
                              sizeof(NormalisedEvent));
            tof.push_back(ne.timestamp);
            did.push_back(ne.pixelId);
        } else {
            epicsThreadSleep(0.001); // seconds
        }

        now = epicsTime::getCurrent();

        // At least every 0.25 seconds
        if (total >= this->kafkaMaxPacketSize ||
            epicsTimeDiffInNS(&now, &last_sent) > 250'000'000ll) {
            last_sent = epicsTime::getCurrent();

            if (total) {
                total = 0;

                builder.Clear();

                // NOTE(review): secPastEpoch counts from the EPICS epoch
                // (1990-01-01), not the Unix epoch -- confirm which one the
                // consumers of this message expect.
                auto message = CreateEventMessageDirect(
                    builder, source, message_id++,
                    ((uint64_t)now.secPastEpoch) * 1'000'000'000ull +
                        ((uint64_t)now.nsec),
                    &tof, &did);

                builder.Finish(message, "ev42");

                rd_kafka_resp_err_t err = rd_kafka_producev(
                    kafkaProducer, RD_KAFKA_V_TOPIC(topic),
                    RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                    // RD_KAFKA_V_KEY((void *)key, key_len),
                    RD_KAFKA_V_VALUE((void *)builder.GetBufferPointer(),
                                     builder.GetSize()),
                    // RD_KAFKA_V_OPAQUE(NULL),
                    RD_KAFKA_V_END);

                if (err) {
                    epicsStdoutPrintf("Failed to produce to topic %s: %s\n",
                                      topic, rd_kafka_err2str(err));
                }

                rd_kafka_poll(kafkaProducer, 0);

                tof.clear();
                did.clear();
            }
        }
    }
}

// Publishes the monitor queue to the monitor topic. Never returns.
void asynStreamGeneratorDriver::produceMonitor() {
    this->produce(monitorQueue, monitorProducer, monitorTopic, "monitor");
}

// Publishes the detector queue to the detector topic. Never returns.
void asynStreamGeneratorDriver::produceDetector() {
    this->produce(detectorQueue, detectorProducer, detectorTopic, "detector");
}

/*******************************************************************************
 * Methods exposed to IOC Shell
 */
extern "C" {

/**
 * Creates a driver instance; this is the implementation of the
 * "asynStreamGenerator" IOC shell command.
 *
 * Kafka streaming is enabled if and only if a non-empty kafkaBroker is given.
 *
 * @return asynSuccess once the driver has been created, asynError if a
 *         required argument is missing
 */
asynStatus asynStreamGeneratorDriverConfigure(
    const char *portName, const char *ipPortName, const int numChannels,
    const int udpQueueSize, const char *kafkaBroker, const char *monitorTopic,
    const char *detectorTopic, const int kafkaQueueSize,
    const int kafkaMaxPacketSize) {

    // String arguments may arrive as null pointers when they are left out
    // on the command line.
    if (portName == nullptr || ipPortName == nullptr) {
        epicsStdoutPrintf(
            "asynStreamGenerator: portName and ipPortName are required\n");
        return asynError;
    }

    const bool enableKafkaStream =
        kafkaBroker != nullptr && kafkaBroker[0] != '\0';

    if (enableKafkaStream &&
        (monitorTopic == nullptr || detectorTopic == nullptr)) {
        epicsStdoutPrintf("asynStreamGenerator: monitorTopic and "
                          "detectorTopic are required with a kafkaBroker\n");
        return asynError;
    }

    new asynStreamGeneratorDriver(portName, ipPortName, numChannels,
                                  udpQueueSize, enableKafkaStream, kafkaBroker,
                                  monitorTopic, detectorTopic, kafkaQueueSize,
                                  kafkaMaxPacketSize);
    return asynSuccess;
}

static const iocshArg initArg0 = {"portName", iocshArgString};
static const iocshArg initArg1 = {"ipPortName", iocshArgString};
static const iocshArg initArg2 = {"numChannels", iocshArgInt};
static const iocshArg initArg3 = {"udpQueueSize", iocshArgInt};
static const iocshArg initArg4 = {"kafkaBroker", iocshArgString};
static const iocshArg initArg5 = {"monitorTopic", iocshArgString};
static const iocshArg initArg6 = {"detectorTopic", iocshArgString};
static const iocshArg initArg7 = {"kafkaQueueSize", iocshArgInt};
static const iocshArg initArg8 = {"kafkaMaxPacketSize", iocshArgInt};
static const iocshArg *const initArgs[] = {&initArg0, &initArg1, &initArg2,
                                           &initArg3, &initArg4, &initArg5,
                                           &initArg6, &initArg7, &initArg8};
static const iocshFuncDef initFuncDef = {"asynStreamGenerator", 9, initArgs};

// Unpacks the IOC shell arguments and forwards them to the configure call.
static void initCallFunc(const iocshArgBuf *args) {
    asynStreamGeneratorDriverConfigure(
        args[0].sval, args[1].sval, args[2].ival, args[3].ival, args[4].sval,
        args[5].sval, args[6].sval, args[7].ival, args[8].ival);
}

// Registers the "asynStreamGenerator" command with the IOC shell.
void asynStreamGeneratorDriverRegister(void) {
    iocshRegister(&initFuncDef, initCallFunc);
}

epicsExportRegistrar(asynStreamGeneratorDriverRegister);
}