#include "asynOctetSyncIO.h"
#include "ev42_events_generated.h"

// NOTE(review): the names of the angle-bracket includes were lost from this
// file (only bare `#include` tokens survived). The list below is
// reconstructed from the names this translation unit actually uses —
// please confirm against version control.
#include <algorithm>
#include <cinttypes>
#include <cstdlib>
#include <cstring>
#include <limits>
#include <vector>

#include <epicsStdio.h>
#include <iocsh.h>

#include "asynStreamGeneratorDriver.h"

#include <epicsExport.h>

/*******************************************************************************
 * Kafka Methods
 */

/**
 * Set a single librdkafka configuration property, terminating the process if
 * the property is rejected.
 *
 * @param conf  configuration object to modify
 * @param key   property name
 * @param value property value
 */
static void set_kafka_config_key(rd_kafka_conf_t *conf, const char *key,
                                 const char *value) {
    char errstr[512];

    const rd_kafka_conf_res_t res =
        rd_kafka_conf_set(conf, key, value, errstr, sizeof(errstr));

    if (res != RD_KAFKA_CONF_OK) {
        // Report the reason librdkafka gave, not just the key/value pair
        epicsStdoutPrintf("Failed to set config value %s : %s (%s)\n", key,
                          value, errstr);
        exit(1);
    }
}

/**
 * Create a Kafka producer connected to the given broker, terminating the
 * process if the producer cannot be created.
 *
 * @param kafkaBroker value used for the `bootstrap.servers` property
 * @return the newly created producer handle
 */
static rd_kafka_t *create_kafka_producer(const char *kafkaBroker) {
    char errstr[512];

    // Prepare configuration object
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    set_kafka_config_key(conf, "bootstrap.servers", kafkaBroker);
    set_kafka_config_key(conf, "queue.buffering.max.messages", "10000000");

    // With 2e6 counts / s
    // and a packet size of 20480 events (163920 bytes)
    // this implies we need to send around 100 messages a second
    // and need about .2 gigabit upload
    // set_kafka_config_key(conf, "queue.buffering.max.kbytes", "10000000");

    // Create the Producer
    rd_kafka_t *producer =
        rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));

    if (!producer) {
        epicsStdoutPrintf("Failed to create Kafka Producer: %s\n", errstr);
        exit(1);
    }

    return producer;
}

/*******************************************************************************
 * Static Methods Passed to Epics Threads that should run in the background
 */

/** Thread entry point: runs asynStreamGeneratorDriver::receiveUDP(). */
static void udpPollerTask(void *drvPvt) {
    static_cast<asynStreamGeneratorDriver *>(drvPvt)->receiveUDP();
}

/** Thread entry point: runs asynStreamGeneratorDriver::normaliseUDP(). */
static void udpNormaliserTask(void *drvPvt) {
    static_cast<asynStreamGeneratorDriver *>(drvPvt)->normaliseUDP();
}

/** Thread entry point: runs asynStreamGeneratorDriver::partialSortEvents(). */
static void sortTask(void *drvPvt) {
    static_cast<asynStreamGeneratorDriver *>(drvPvt)->partialSortEvents();
}
static void daqTask(void *drvPvt) { asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt; pSGD->processEvents(); } static void monitorProducerTask(void *drvPvt) { asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt; pSGD->produceMonitor(); } static void detectorProducerTask(void *drvPvt) { asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt; pSGD->produceDetector(); } /******************************************************************************* * Stream Generator Helper Methods */ asynStatus asynStreamGeneratorDriver::createInt32Param(asynStatus status, const char *name, int *variable, epicsInt32 initialValue) { // TODO should show error if there is one return (asynStatus)(status | createParam(name, asynParamInt32, variable) | setIntegerParam(*variable, initialValue)); } asynStatus asynStreamGeneratorDriver::createInt64Param(asynStatus status, const char *name, int *variable, epicsInt64 initialValue) { // TODO should show error if there is one return (asynStatus)(status | createParam(name, asynParamInt64, variable) | setInteger64Param(*variable, initialValue)); } asynStatus asynStreamGeneratorDriver::createFloat64Param(asynStatus status, const char *name, int *variable, double initialValue) { // TODO should show error if there is one return (asynStatus)(status | createParam(name, asynParamFloat64, variable) | setDoubleParam(*variable, initialValue)); } /******************************************************************************* * Stream Generator Methods */ asynStreamGeneratorDriver::asynStreamGeneratorDriver( const char *portName, const char *ipPortName, const int numChannels, const int udpQueueSize, const bool enableKafkaStream, const char *kafkaBroker, const char *monitorTopic, const char *detectorTopic, const int kafkaQueueSize, const int kafkaMaxPacketSize) : asynPortDriver(portName, 1, /* maxAddr */ asynInt32Mask | asynInt64Mask | asynFloat64Mask | asynDrvUserMask, /* Interface mask */ asynInt32Mask, 
// | asynFloat64Mask, /* Interrupt mask */ 0, /* asynFlags. This driver does not block and it is not multi-device, but has a destructor ASYN_DESTRUCTIBLE our version of the Asyn is too old to support this flag */ 1, /* Autoconnect */ 0, /* Default priority */ 0), /* Default stack size*/ num_channels(numChannels + 1), kafkaEnabled(enableKafkaStream), kafkaQueueSize(kafkaQueueSize), kafkaMaxPacketSize(kafkaMaxPacketSize), udpQueueSize(udpQueueSize), // measured in max packet sizes udpQueue( epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))), normalisedQueue( epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))), // TODO configurable sizes sortedQueue( epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))), monitorQueue( epicsRingBytesCreate(243 * kafkaQueueSize * sizeof(NormalisedEvent))), monitorTopic(monitorTopic), detectorQueue( epicsRingBytesCreate(243 * kafkaQueueSize * sizeof(NormalisedEvent))), detectorTopic(detectorTopic) { const char functionName[]{"asynStreamGeneratorDriver"}; // Parameter Setup asynStatus status = asynSuccess; status = createInt32Param(status, P_EnableElectronicsString, &P_EnableElectronics, 1); status = createInt32Param(status, P_EnableElectronicsRBVString, &P_EnableElectronicsRBV); status = createInt32Param(status, P_StatusString, &P_Status, STATUS_IDLE); status = createInt32Param(status, P_ResetString, &P_Reset); status = createInt32Param(status, P_StopString, &P_Stop); status = createInt32Param(status, P_CountPresetString, &P_CountPreset); status = createInt32Param(status, P_TimePresetString, &P_TimePreset); status = createFloat64Param(status, P_ElapsedTimeString, &P_ElapsedTime); status = createInt32Param(status, P_ClearElapsedTimeString, &P_ClearElapsedTime); status = createInt32Param(status, P_MonitorChannelString, &P_MonitorChannel, this->num_channels); status = createInt32Param(status, P_ThresholdString, &P_Threshold, 1); status = createInt32Param(status, P_ThresholdChannelString, 
&P_ThresholdChannel, 1); // Create Parameters templated on Channel Number char pv_name_buffer[100]; P_Counts = new int[this->num_channels]; P_Rates = new int[this->num_channels]; P_ClearCounts = new int[this->num_channels]; for (std::size_t i = 0; i < this->num_channels; ++i) { memset(pv_name_buffer, 0, 100); epicsSnprintf(pv_name_buffer, 100, P_CountsString, i + 1); status = createInt64Param(status, pv_name_buffer, P_Counts + i); memset(pv_name_buffer, 0, 100); epicsSnprintf(pv_name_buffer, 100, P_RateString, i + 1); status = createInt32Param(status, pv_name_buffer, P_Rates + i); memset(pv_name_buffer, 0, 100); epicsSnprintf(pv_name_buffer, 100, P_ClearCountsString, i + 1); status = createInt32Param(status, pv_name_buffer, P_ClearCounts + i); } status = createInt32Param(status, P_UdpDroppedString, &P_UdpDropped); status = createInt32Param(status, P_UdpQueueHighWaterMarkString, &P_UdpQueueHighWaterMark); status = createInt32Param(status, P_SortedQueueHighWaterMarkString, &P_SortedQueueHighWaterMark); if (status) { epicsStdoutPrintf( "%s:%s: failed to create or setup parameters, status=%d\n", driverName, functionName, status); exit(1); } // Create Events // this->pausedEventId = epicsEventCreate(epicsEventEmpty); if (enableKafkaStream) { epicsStdoutPrintf( "Detector Kafka Config: broker=%s, topic=%s\n " " queue size:%d, max events per packet: %d\n", kafkaBroker, this->detectorTopic.c_str(), kafkaQueueSize, this->kafkaMaxPacketSize); epicsStdoutPrintf( "Monitors Kafka Config: broker=%s, topic=%s\n " " queue size:%d, max events per packet: %d\n", kafkaBroker, this->monitorTopic.c_str(), kafkaQueueSize, this->kafkaMaxPacketSize); this->monitorProducer = create_kafka_producer(kafkaBroker); this->detectorProducer = create_kafka_producer(kafkaBroker); // Setup for Thread Producing Monitor Kafka Events status = (asynStatus)(epicsThreadCreate( "monitor_produce", epicsThreadPriorityMedium, epicsThreadGetStackSize(epicsThreadStackMedium), 
(EPICSTHREADFUNC)::monitorProducerTask, this) == NULL); if (status) { epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName, functionName, status); exit(1); } // Setup for Thread Producing Detector Kafka Events status = (asynStatus)(epicsThreadCreate( "monitor_produce", epicsThreadPriorityMedium, epicsThreadGetStackSize(epicsThreadStackMedium), (EPICSTHREADFUNC)::detectorProducerTask, this) == NULL); if (status) { epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName, functionName, status); exit(1); } } else { epicsStdoutPrintf("Kafka Stream Disabled\n"); } /* Create the thread that orders the events and acts as our sinqDaq stand-in */ status = (asynStatus)(epicsThreadCreate( "sinqDAQ", epicsThreadPriorityMedium, // epicsThreadPriorityMax, epicsThreadGetStackSize(epicsThreadStackMedium), (EPICSTHREADFUNC)::daqTask, this) == NULL); if (status) { epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName, functionName, status); exit(1); } /* Create the thread that orders packets of in preparation for our sinqDAQ * stand-in */ status = (asynStatus)(epicsThreadCreate( "partialSort", epicsThreadPriorityMedium, epicsThreadGetStackSize(epicsThreadStackMedium), (EPICSTHREADFUNC)::sortTask, this) == NULL); if (status) { epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName, functionName, status); exit(1); } /* Create the thread normalises the events */ status = (asynStatus)(epicsThreadCreate( "eventNormaliser", epicsThreadPriorityMedium, // epicsThreadPriorityMax, epicsThreadGetStackSize(epicsThreadStackMedium), (EPICSTHREADFUNC)::udpNormaliserTask, this) == NULL); if (status) { epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName, functionName, status); exit(1); } // UDP Receive Setup status = pasynOctetSyncIO->connect(ipPortName, 0, &pasynUDPUser, NULL); if (status) { epicsStdoutPrintf("%s:%s: Couldn't open connection %s, status=%d\n", driverName, 
functionName, ipPortName, status); exit(1); } /* Create the thread that receives UDP traffic in the background */ status = (asynStatus)(epicsThreadCreate( "udp_receive", epicsThreadPriorityMax, epicsThreadGetStackSize(epicsThreadStackMedium), (EPICSTHREADFUNC)::udpPollerTask, this) == NULL); if (status) { epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName, functionName, status); exit(1); } } asynStreamGeneratorDriver::~asynStreamGeneratorDriver() { // should make sure queues are empty and freed // and that the kafka producers are flushed and freed delete[] P_Counts; delete[] P_Rates; // TODO add exit should perhaps ensure the queue is flushed // rd_kafka_poll(producer, 0); // epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer)); // rd_kafka_flush(producer, 10 * 1000); // epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer)); } asynStatus asynStreamGeneratorDriver::readInt32(asynUser *pasynUser, epicsInt32 *value) { int function = pasynUser->reason; const char *paramName; // const char functionName[]{"readInt32"}; getParamName(function, ¶mName); if (function == P_UdpQueueHighWaterMark) { const double toPercent = 100. / (243. * udpQueueSize); *value = (epicsInt32)(epicsRingBytesHighWaterMark(this->udpQueue) / sizeof(NormalisedEvent) * toPercent); // Aparently resetting the watermark causes problems... // at least concurrently :D // epicsRingBytesResetHighWaterMark(this->udpQueue); return asynSuccess; } else if (function == P_SortedQueueHighWaterMark) { const double toPercent = 100. / (243. 
* udpQueueSize); *value = (epicsInt32)(epicsRingBytesHighWaterMark(this->sortedQueue) / sizeof(NormalisedEvent) * toPercent); // epicsRingBytesResetHighWaterMark(this->sortedQueue); return asynSuccess; } return asynPortDriver::readInt32(pasynUser, value); } asynStatus asynStreamGeneratorDriver::writeInt32(asynUser *pasynUser, epicsInt32 value) { int function = pasynUser->reason; asynStatus status = asynSuccess; const char *paramName; const char functionName[]{"writeInt32"}; getParamName(function, ¶mName); // TODO should maybe lock mutex for this epicsInt32 currentStatus; status = getIntegerParam(this->P_Status, ¤tStatus); if (status) { epicsSnprintf(pasynUser->errorMessage, pasynUser->errorMessageSize, "%s:%s: status=%d, function=%d, name=%s, value=%d", driverName, functionName, status, function, paramName, value); return status; } // TODO clean up bool isClearCount = false; size_t channelToClear; for (std::size_t i = 0; i < this->num_channels; ++i) { isClearCount |= function == P_ClearCounts[i]; if (isClearCount) { channelToClear = i; break; } } // TODO should check everything... if (function == P_CountPreset) { if (!currentStatus) { // slightly longer than the status update frequency // i.e. "$(INSTR)$(NAME):RAW-STATUS" SCAN seconds // to ensure that the counts have all had a chance // to update their values before starting a count epicsThreadSleep(0.12); // seconds setIntegerParam(function, value); setIntegerParam(P_Status, STATUS_COUNTING); } else { return asynError; } } else if (function == P_TimePreset) { if (!currentStatus) { // slightly longer than the status update frequency // i.e. 
"$(INSTR)$(NAME):RAW-STATUS" SCAN seconds // to ensure that the counts have all had a chance // to update their values before starting a count epicsThreadSleep(0.12); // seconds setIntegerParam(function, value); setIntegerParam(P_Status, STATUS_COUNTING); } else { return asynError; } } else if (function == P_ClearElapsedTime) { if (!currentStatus) { setDoubleParam(P_ElapsedTime, 0); } else { return asynError; } } else if (isClearCount) { if (!currentStatus) { setInteger64Param(P_Counts[channelToClear], 0); } else { return asynError; } } else if (function == P_Reset) { // TODO should probably set back everything to defaults setIntegerParam(P_Status, STATUS_IDLE); } else if (function == P_Stop) { setIntegerParam(P_Status, STATUS_IDLE); } else if (function == P_MonitorChannel) { if (!currentStatus) { setIntegerParam(function, value); } else { return asynError; } } else if (function == P_EnableElectronics) { if (value) { setIntegerParam(function, 1); CommandHeader ch(CommandId::start); std::size_t written; pasynOctetSyncIO->write(pasynUDPUser, (char *)&ch, sizeof(ch), 1, &written); } else { setIntegerParam(function, 0); CommandHeader ch(CommandId::stop); std::size_t written; pasynOctetSyncIO->write(pasynUDPUser, (char *)&ch, sizeof(ch), 1, &written); } } else { setIntegerParam(function, value); // status = (asynStatus)callParamCallbacks(); } if (status) epicsSnprintf(pasynUser->errorMessage, pasynUser->errorMessageSize, "%s:%s: status=%d, function=%d, name=%s, value=%d", driverName, functionName, status, function, paramName, value); return status; } void asynStreamGeneratorDriver::receiveUDP() { const char functionName[]{"receiveUDP"}; // int isConnected = 1; std::size_t received; int eomReason; const std::size_t bufferSize = 1500; char buffer[bufferSize]; while (true) { // status = pasynManager->isConnected(pasynUDPUser, &isConnected); // if (!isConnected) // asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, // "%s:%s: isConnected = %d\n", driverName, functionName, // 
isConnected); pasynOctetSyncIO->read(pasynUDPUser, buffer, bufferSize, 0, // timeout &received, &eomReason); if (received) { const uint16_t bufferLength = ((uint16_t *)buffer)[0]; const bool isCmdBuffer = ((uint16_t *)buffer)[1] && 0x8000; const std::size_t minDataBufferHeaderLength = 42; const std::size_t minCmdBufferHeaderLength = 20; if (received >= minDataBufferHeaderLength && received == bufferLength * 2) { epicsRingBytesPut(this->udpQueue, (char *)buffer, bufferSize); } else if (received >= minCmdBufferHeaderLength && isCmdBuffer) { const CommandId cmd = static_cast(((uint16_t *)buffer)[4]); if (cmd == CommandId::start) { setIntegerParam(this->P_EnableElectronicsRBV, 1); } else if (cmd == CommandId::stop) { setIntegerParam(this->P_EnableElectronicsRBV, 0); } } else { asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, "%s:%s: invalid UDP packet of length %" PRIu64 " (cmd_id %d)\n", driverName, functionName, received, ((uint16_t *)buffer)[4]); } } } } void asynStreamGeneratorDriver::normaliseUDP() { // TODO fix time overflows // Regarding time overflow. // * the header time stamp is 3 words, i.e. 48 bits. // * it has a resolution of 100ns // * so we can cover a maximum of (2^(3*16) - 1) * 1e-7 = 28147497 seconds // * or about 325 days // * so maybe this isn't necessary to solve, as long as we restart the // electronics at least once a year... const char functionName[]{"normaliseUDP"}; // The correlation unit sends messages with a maximum size of 1500 bytes. // These messages don't have any obious start or end to synchronise // against... 
const std::size_t bufferSize = 1500; char buffer[bufferSize]; const std::size_t resultBufferSize = 243; NormalisedEvent resultBuffer[resultBufferSize]; // We have 10 mcpdids uint32_t lastBufferNumber[10]; for (size_t i = 0; i < 10; ++i) { lastBufferNumber[i] = 0; } epicsInt32 droppedMessages = 0; const DataHeader *header; const DetectorEvent *d_event; const MonitorEvent *m_event; NormalisedEvent ne; while (true) { if (epicsRingBytesUsedBytes(this->udpQueue) > 1500) { epicsRingBytesGet(this->udpQueue, (char *)buffer, bufferSize); header = (DataHeader *)buffer; const std::size_t total_events = (header->BufferLength - 21) / 3; if (header->BufferNumber - lastBufferNumber[header->McpdID] > 1 && lastBufferNumber[header->McpdID] != std::numeric_limits< decltype(header->BufferNumber)>::max()) { asynPrint( pasynUserSelf, ASYN_TRACE_ERROR, "%s:%s: missed packet on id: %d. Received: %d, last: %d\n", driverName, functionName, header->McpdID, header->BufferNumber, lastBufferNumber[header->McpdID]); setIntegerParam(P_UdpDropped, ++droppedMessages); } lastBufferNumber[header->McpdID] = header->BufferNumber; // TODO I think monitor and detector events aren't mixed, so we // could do the check once for (std::size_t i = 0; i < total_events; ++i) { char *event = (buffer + 21 * 2 + i * 6); const bool isMonitorEvent = event[5] & 0x80; if (isMonitorEvent) { m_event = (MonitorEvent *)event; ne.timestamp = header->nanosecs() + (uint64_t)m_event->nanosecs(); ne.source = 0; ne.pixelId = m_event->DataID; } else { d_event = (DetectorEvent *)event; ne.timestamp = header->nanosecs() + (uint64_t)d_event->nanosecs(); ne.source = header->McpdID; ne.pixelId = d_event->pixelId(header->McpdID); } resultBuffer[i] = ne; } epicsRingBytesPut(this->normalisedQueue, (char *)resultBuffer, total_events * sizeof(NormalisedEvent)); } else { epicsThreadSleep(0.0001); // seconds } } } struct { bool operator()(const NormalisedEvent l, const NormalisedEvent r) const { return l.timestamp < r.timestamp; } } 
oldestEventsFirst; inline int eventsInQueue(epicsRingBytesId id) { return epicsRingBytesUsedBytes(id) / sizeof(NormalisedEvent); } void asynStreamGeneratorDriver::partialSortEvents() { // const char functionName[]{"partialSortEvents"}; // x * number of ids * max events in packet int bufferedEvents = 5 * 10 * 243; NormalisedEvent events[bufferedEvents]; int queuedEvents = 0; epicsTimeStamp lastSort = epicsTime::getCurrent(); epicsTimeStamp currentTime = lastSort; while (true) { queuedEvents = eventsInQueue(this->normalisedQueue); // in case we can't wait lastSort = epicsTime::getCurrent(); currentTime = lastSort; // wait for mininmum packet frequency or enough packets to ensure we // could potentially have at least 1 packet per mcpdid while (queuedEvents < bufferedEvents && epicsTimeDiffInNS(¤tTime, &lastSort) < 250'000'000ll) { epicsThreadSleep(0.0001); // seconds currentTime = epicsTime::getCurrent(); queuedEvents = eventsInQueue(this->normalisedQueue); } queuedEvents = std::min(queuedEvents, bufferedEvents); if (queuedEvents) { epicsRingBytesGet(this->normalisedQueue, (char *)events, queuedEvents * sizeof(NormalisedEvent)); std::sort(events, events + queuedEvents, oldestEventsFirst); epicsRingBytesPut(this->sortedQueue, (char *)events, queuedEvents * sizeof(NormalisedEvent)); } } } inline void asynStreamGeneratorDriver::queueForKafka(NormalisedEvent &ne) { if (this->kafkaEnabled) { if (ne.source == 0) epicsRingBytesPut(this->monitorQueue, (char *)&ne, sizeof(NormalisedEvent)); else epicsRingBytesPut(this->detectorQueue, (char *)&ne, sizeof(NormalisedEvent)); } } void asynStreamGeneratorDriver::processEvents() { const char functionName[]{"processEvents"}; // x * number of ids * max events in packet * event size int bufferedEvents = 5 * 10 * 243; // we need a little extra space for merge sorting in int extraBufferedEvents = 1 * 10 * 243; // we have two buffers. 
We alternate between reading data into one of them, // and then merge sorting into the other NormalisedEvent eventsABuffer[(bufferedEvents + extraBufferedEvents)]; NormalisedEvent eventsBBuffer[(bufferedEvents + extraBufferedEvents)]; NormalisedEvent *eventsA = &eventsABuffer[0]; NormalisedEvent *eventsB = &eventsBBuffer[0]; NormalisedEvent *eventsBLastStart = eventsB + bufferedEvents; NormalisedEvent *eventsBLastEnd = eventsBLastStart; int queuedEvents = 0; epicsTimeStamp lastProcess = epicsTime::getCurrent(); epicsTimeStamp currentTime = lastProcess; epicsInt64 counts[this->num_channels]; double elapsedSeconds = 0; uint64_t startTimestamp = std::numeric_limits::max(); epicsInt32 currStatus = STATUS_IDLE; epicsInt32 prevStatus = STATUS_IDLE; epicsInt32 countPreset; epicsInt32 timePreset; epicsInt32 presetChannel; while (true) { queuedEvents = eventsInQueue(this->sortedQueue); // in case we can't wait lastProcess = epicsTime::getCurrent(); currentTime = lastProcess; // wait for mininmum packet frequency or enough packets to ensure we // could potentially have at least 1 packet per mcpdid // IMPORTANT: if you start counts faster than this poll period (so // either the time or number of events incoming below), you will // find that it doesn't have time to switch back to idle within // this loop and so will just keep counting. If you need counts // that are started more often than the below currently 250ms // then that value will need to be adjusted. 
while (queuedEvents < bufferedEvents && epicsTimeDiffInNS(¤tTime, &lastProcess) < 250'000'000ll) { epicsThreadSleep(0.0001); // seconds currentTime = epicsTime::getCurrent(); queuedEvents = eventsInQueue(this->sortedQueue); } getIntegerParam(this->P_Status, &currStatus); queuedEvents = std::min(queuedEvents, bufferedEvents); NormalisedEvent *newStartPtr = eventsA + extraBufferedEvents; // We read into the array, such that we have enough space, that the // entirety of the leftover from the previous read can fit before this // new read, in the case that all new events are newer timewise, and // therefore, all events from eventsB have to be placed in a preceeding // position. epicsRingBytesGet(this->sortedQueue, (char *)newStartPtr, queuedEvents * sizeof(NormalisedEvent)); std::size_t toProcess = eventsBLastEnd - eventsBLastStart + queuedEvents * 4 / 5; // TODO could also consider an in-place merge eventsBLastEnd = std::merge(newStartPtr, newStartPtr + queuedEvents, eventsBLastStart, eventsBLastEnd, eventsA, oldestEventsFirst); eventsBLastStart = eventsA + toProcess; // TODO I haven't really taken care of the case that there are no events if (prevStatus == STATUS_IDLE && currStatus == STATUS_COUNTING) { getIntegerParam(this->P_CountPreset, &countPreset); getIntegerParam(this->P_TimePreset, &timePreset); getIntegerParam(this->P_MonitorChannel, &presetChannel); // Parameter is base 1, here we need base 0 --presetChannel; // reset status variables startTimestamp = eventsA[0].timestamp; elapsedSeconds = 0; for (std::size_t i = 0; i < this->num_channels; ++i) { counts[i] = 0; } asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, "%s:%s: starting count: (%d, %d, %d, %" PRIu64 ")\n", driverName, functionName, countPreset, timePreset, presetChannel, startTimestamp); } if (currStatus == STATUS_COUNTING) { for (std::size_t i = 0; i < toProcess; ++i) { counts[eventsA[i].source == 0 ? 
eventsA[i].pixelId : this->num_channels - 1] += 1; elapsedSeconds = (eventsA[i].timestamp - startTimestamp) / 1e9; const bool reachedCount = countPreset && counts[presetChannel] >= countPreset; const bool reachedTime = timePreset && elapsedSeconds > (double)timePreset; if (reachedCount || reachedTime) { asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, "%s:%s: reached preset: (%lld, %f)\n", driverName, functionName, counts[presetChannel], elapsedSeconds); // TODO should really check there an no more events with the // same final timestamp if (counts[presetChannel] == countPreset) this->queueForKafka(eventsA[i]); elapsedSeconds = timePreset == 0 ? elapsedSeconds : timePreset; break; } // TODO also batchwise? this->queueForKafka(eventsA[i]); } for (std::size_t i = 0; i < num_channels; ++i) { setInteger64Param(P_Counts[i], counts[i]); } setDoubleParam(P_ElapsedTime, elapsedSeconds); if ((countPreset && counts[presetChannel] >= countPreset) || (timePreset && elapsedSeconds >= (double)timePreset)) { asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, "%s:%s: finished count\n", driverName, functionName); setIntegerParam(this->P_Status, STATUS_IDLE); setIntegerParam(this->P_CountPreset, 0); setIntegerParam(this->P_TimePreset, 0); } } prevStatus = currStatus; std::swap(eventsA, eventsB); } } void asynStreamGeneratorDriver::produce(epicsRingBytesId eventQueue, rd_kafka_t *kafkaProducer, const char *topic, const char *source) { flatbuffers::FlatBufferBuilder builder(1024); const std::size_t bufferSize = this->kafkaMaxPacketSize + 16; std::vector tof; tof.reserve(bufferSize); std::vector did; did.reserve(bufferSize); epicsTimeStamp last_sent = epicsTime::getCurrent(); epicsTimeStamp now = last_sent; int total = 0; uint64_t message_id = 0; NormalisedEvent ne; while (true) { if (!epicsRingBytesIsEmpty(eventQueue)) { ++total; epicsRingBytesGet(eventQueue, (char *)&ne, sizeof(NormalisedEvent)); tof.push_back(ne.timestamp); did.push_back(ne.pixelId); } else { epicsThreadSleep(0.001); // seconds } 
now = epicsTime::getCurrent(); // At least every 0.2 seconds if (total >= this->kafkaMaxPacketSize || epicsTimeDiffInNS(&now, &last_sent) > 250'000'000ll) { last_sent = epicsTime::getCurrent(); if (total) { total = 0; builder.Clear(); auto message = CreateEventMessageDirect( builder, source, message_id++, ((uint64_t)now.secPastEpoch) * 1'000'000'000ull + ((uint64_t)now.nsec), &tof, &did); builder.Finish(message, "ev42"); rd_kafka_resp_err_t err = rd_kafka_producev( kafkaProducer, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), // RD_KAFKA_V_KEY((void *)key, key_len), RD_KAFKA_V_VALUE((void *)builder.GetBufferPointer(), builder.GetSize()), // RD_KAFKA_V_OPAQUE(NULL), RD_KAFKA_V_END); if (err) { epicsStdoutPrintf("Failed to produce to topic %s: %s\n", topic, rd_kafka_err2str(err)); } rd_kafka_poll(kafkaProducer, 0); tof.clear(); did.clear(); } } } } void asynStreamGeneratorDriver::produceMonitor() { this->produce(monitorQueue, monitorProducer, monitorTopic.c_str(), "monitor"); } void asynStreamGeneratorDriver::produceDetector() { this->produce(detectorQueue, detectorProducer, detectorTopic.c_str(), "detector"); } /******************************************************************************* * Methods exposed to IOC Shell */ extern "C" { asynStatus asynStreamGeneratorDriverConfigure( const char *portName, const char *ipPortName, const int numChannels, const int udpQueueSize, const char *kafkaBroker, const char *monitorTopic, const char *detectorTopic, const int kafkaQueueSize, const int kafkaMaxPacketSize) { new asynStreamGeneratorDriver(portName, ipPortName, numChannels, udpQueueSize, kafkaBroker[0], kafkaBroker, monitorTopic, detectorTopic, kafkaQueueSize, kafkaMaxPacketSize); return asynSuccess; } static const iocshArg initArg0 = {"portName", iocshArgString}; static const iocshArg initArg1 = {"ipPortName", iocshArgString}; static const iocshArg initArg2 = {"numChannels", iocshArgInt}; static const iocshArg initArg3 = {"udpQueueSize", 
iocshArgInt}; static const iocshArg initArg4 = {"kafkaBroker", iocshArgString}; static const iocshArg initArg5 = {"monitorTopic", iocshArgString}; static const iocshArg initArg6 = {"detectorTopic", iocshArgString}; static const iocshArg initArg7 = {"kafkaQueueSize", iocshArgInt}; static const iocshArg initArg8 = {"kafkaMaxPacketSize", iocshArgInt}; static const iocshArg *const initArgs[] = {&initArg0, &initArg1, &initArg2, &initArg3, &initArg4, &initArg5, &initArg6, &initArg7, &initArg8}; static const iocshFuncDef initFuncDef = {"asynStreamGenerator", 9, initArgs}; static void initCallFunc(const iocshArgBuf *args) { asynStreamGeneratorDriverConfigure( args[0].sval, args[1].sval, args[2].ival, args[3].ival, args[4].sval, args[5].sval, args[6].sval, args[7].ival, args[8].ival); } void asynStreamGeneratorDriverRegister(void) { iocshRegister(&initFuncDef, initCallFunc); } epicsExportRegistrar(asynStreamGeneratorDriverRegister); }