#include "asynOctetSyncIO.h"
#include "ev42_events_generated.h"
// NOTE(review): the angle-bracket include targets below were reconstructed
// from the symbols this file uses (exit/printf, std::vector, epicsStdoutPrintf,
// iocshRegister, rd_kafka_*). Confirm them against the build before merging.
#include <cstdio>
#include <cstdlib>
#include <vector>

#include <epicsStdio.h>
#include <iocsh.h>
#include <librdkafka/rdkafka.h>

// Just for printing
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

#include "asynStreamGeneratorDriver.h"
#include <epicsExport.h>

/*******************************************************************************
 * Kafka Methods
 */

/**
 * Sets a single librdkafka configuration property and terminates the process
 * if the library rejects it.
 *
 * @param conf  configuration object to modify
 * @param key   property name
 * @param value property value, as a string
 */
static void set_kafka_config_key(rd_kafka_conf_t *conf, const char *key,
                                 const char *value) {
    char errstr[512];

    const rd_kafka_conf_res_t res =
        rd_kafka_conf_set(conf, key, value, errstr, sizeof(errstr));

    if (res != RD_KAFKA_CONF_OK) {
        // Include the reason reported by librdkafka, not just the key/value.
        epicsStdoutPrintf("Failed to set config value %s : %s (%s)\n", key,
                          value, errstr);
        exit(1);
    }
}

/**
 * Creates a Kafka producer connected to the hard-coded bootstrap server.
 * Terminates the process if the producer cannot be created.
 *
 * @return the new producer handle (never NULL)
 */
static rd_kafka_t *create_kafka_producer() {
    char errstr[512];

    // Prepare configuration object
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    set_kafka_config_key(conf, "bootstrap.servers", "linkafka01:9092");
    // Written as a plain decimal integer: the property is integer-typed, so
    // exponent notation such as "1e7" is not a reliable way to express it.
    set_kafka_config_key(conf, "queue.buffering.max.messages", "10000000");

    // Create the Producer
    rd_kafka_t *producer =
        rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));

    if (!producer) {
        epicsStdoutPrintf("Failed to create Kafka Producer: %s\n", errstr);
        exit(1);
    }

    return producer;
}

/**
 * Shared body of the two Kafka producer threads. Never returns.
 *
 * Drains `queue` one event at a time into a batch and publishes the batch as
 * an "ev42" flatbuffer message once it holds 8192 events or once more than
 * 0.2 seconds have passed since the previous send attempt. Popped events are
 * deleted here.
 *
 * @param queue        queue of heap-allocated events exposing `TimeStamp`
 * @param producer     Kafka producer used for publishing
 * @param topic        Kafka topic name
 * @param sourceName   source name written into the event message
 * @param pasynUser    asynUser used for error tracing
 * @param functionName caller name used in trace messages
 * @param idOf         callable returning the detector id for one event
 */
template <typename Queue, typename IdOf>
static void run_event_producer(Queue &queue, rd_kafka_t *producer,
                               const char *topic, const char *sourceName,
                               asynUser *pasynUser, const char *functionName,
                               IdOf idOf) {
    flatbuffers::FlatBufferBuilder builder(1024);

    // NOTE(review): element type restored to uint32_t to match the ev42
    // vectors; confirm that TimeStamp values are meant to be narrowed to
    // 32 bits here.
    std::vector<uint32_t> tof;
    tof.reserve(9000);

    std::vector<uint32_t> did;
    did.reserve(9000);

    epicsTimeStamp last_sent = epicsTime::getCurrent();

    uint64_t message_id = 0;

    while (true) {
        if (!queue.isEmpty()) {
            auto event = queue.pop();
            tof.push_back(event->TimeStamp);
            did.push_back(idOf(*event));
            delete event;
        } else {
            epicsThreadSleep(0.001); // seconds
        }

        epicsTimeStamp now = epicsTime::getCurrent();

        // Send when the batch is full, or at least every 0.2 seconds
        const bool batch_full = tof.size() >= 8192;
        const bool interval_elapsed =
            epicsTimeDiffInNS(&now, &last_sent) > 200'000'000ll;
        if (!batch_full && !interval_elapsed) {
            continue;
        }

        last_sent = now;

        if (tof.empty()) {
            continue;
        }

        builder.Clear();

        // NOTE(review): the pulse time is built from secPastEpoch, i.e. it is
        // relative to the EPICS epoch; confirm that consumers expect this.
        auto message = CreateEventMessageDirect(
            builder, sourceName, message_id++,
            ((uint64_t)now.secPastEpoch) * 1'000'000'000ull +
                ((uint64_t)now.nsec),
            &tof, &did);

        builder.Finish(message, "ev42");

        const rd_kafka_resp_err_t err = rd_kafka_producev(
            producer, RD_KAFKA_V_TOPIC(topic),
            RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
            RD_KAFKA_V_VALUE((void *)builder.GetBufferPointer(),
                             builder.GetSize()),
            RD_KAFKA_V_END);

        if (err) {
            // The batch is cleared below either way, so make the loss visible.
            asynPrint(pasynUser, ASYN_TRACE_ERROR,
                      "%s:%s: failed to produce to topic %s: %s\n", driverName,
                      functionName, topic, rd_kafka_err2str(err));
        }

        rd_kafka_poll(producer, 0);

        tof.clear();
        did.clear();
    }
}

/*******************************************************************************
 * Static Methods Passed to Epics Threads that should run in the background
 */

/** Thread entry point: runs receiveUDP() on the driver passed as drvPvt. */
static void udpPollerTask(void *drvPvt) {
    static_cast<asynStreamGeneratorDriver *>(drvPvt)->receiveUDP();
}

/** Thread entry point: runs produceMonitor() on the driver passed as drvPvt. */
static void monitorProducerTask(void *drvPvt) {
    static_cast<asynStreamGeneratorDriver *>(drvPvt)->produceMonitor();
}

/** Thread entry point: runs produceDetector() on the driver passed as drvPvt. */
static void detectorProducerTask(void *drvPvt) {
    static_cast<asynStreamGeneratorDriver *>(drvPvt)->produceDetector();
}

/**
 * Starts a medium-priority EPICS thread with a medium stack and terminates
 * the process if the thread cannot be created.
 *
 * @param threadName   name given to the new thread
 * @param task         thread entry point
 * @param drvPvt       argument handed to the entry point
 * @param functionName caller name used in the failure message
 */
static void start_background_thread(const char *threadName,
                                    EPICSTHREADFUNC task, void *drvPvt,
                                    const char *functionName) {
    const int failed =
        (epicsThreadCreate(threadName, epicsThreadPriorityMedium,
                           epicsThreadGetStackSize(epicsThreadStackMedium),
                           task, drvPvt) == NULL);
    if (failed) {
        printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName,
               functionName, failed);
        exit(1);
    }
}

/*******************************************************************************
 * Stream Generator Methods
 */

/**
 * Creates the count parameters, the two Kafka producers, the UDP connection
 * and the three background threads. Any failure terminates the process.
 *
 * @param portName    name of this asyn port
 * @param ipPortName  name of the asyn port the UDP data is read from
 * @param numChannels requested channel count; one extra channel is added, and
 *                    index 0 is the one receiveUDP() uses for detector events
 */
asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName,
                                                     const char *ipPortName,
                                                     const int numChannels)
    : asynPortDriver(portName, 1, /* maxAddr */
                     asynInt32Mask | asynInt64Mask |
                         asynDrvUserMask,           /* Interface mask */
                     asynInt32Mask | asynInt64Mask, /* Interrupt mask */
                     0, /* asynFlags.  This driver does not block and it is
                           not multi-device, but has a
                           destructor ASYN_DESTRUCTIBLE our version of the Asyn
                           is too old to support this flag */
                     1, /* Autoconnect */
                     0, /* Default priority */
                     0), /* Default stack size*/
      num_channels(numChannels + 1), monitorQueue(1000, false),
      detectorQueue(1000, false) {
    const char *functionName = "asynStreamGeneratorDriver";

    // Parameter Setup
    char pv_name_buffer[100];
    P_Counts = new int[this->num_channels];

    asynStatus status;

    // Create PVs templated on Channel Number
    for (size_t i = 0; i < this->num_channels; ++i) {
        epicsSnprintf(pv_name_buffer, sizeof(pv_name_buffer), P_CountsString,
                      i);
        status = createParam(pv_name_buffer, asynParamInt32, P_Counts + i);
        if (status) {
            printf("%s:%s: createParam failure for %s, status=%d\n",
                   driverName, functionName, pv_name_buffer, status);
            exit(1);
        }
        setIntegerParam(P_Counts[i], 0);
    }

    this->monitorProducer = create_kafka_producer();
    this->detectorProducer = create_kafka_producer();

    // Setup for Thread Producing Monitor Kafka Events
    start_background_thread("monitor_produce",
                            (EPICSTHREADFUNC)::monitorProducerTask, this,
                            functionName);

    // Setup for Thread Producing Detector Kafka Events
    // (own name, so it can be told apart from the monitor thread)
    start_background_thread("detector_produce",
                            (EPICSTHREADFUNC)::detectorProducerTask, this,
                            functionName);

    // UDP Receive Setup
    status = pasynOctetSyncIO->connect(ipPortName, 0, &pasynUDPUser, NULL);

    if (status) {
        printf("%s:%s: Couldn't open connection %s, status=%d\n", driverName,
               functionName, ipPortName, status);
        exit(1);
    }

    /* Create the thread that receives UDP traffic in the background */
    start_background_thread("udp_receive", (EPICSTHREADFUNC)::udpPollerTask,
                            this, functionName);
}

/**
 * Releases the parameter index array.
 *
 * The background threads are never stopped, so the queues and the Kafka
 * producers are deliberately left untouched here.
 */
asynStreamGeneratorDriver::~asynStreamGeneratorDriver() {
    // should make sure queues are empty and freed
    // and that the kafka producers are flushed and freed
    delete[] P_Counts;

    // TODO add exit should perhaps ensure the queue is flushed
    // rd_kafka_poll(producer, 0);
    // epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer));
    // rd_kafka_flush(producer, 10 * 1000);
    // epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer));
}

/**
 * Body of the UDP receive thread. Never returns.
 *
 * Each iteration reads one packet from the UDP port, validates its length
 * against the header, converts every event into a normalised monitor or
 * detector event and pushes it onto the matching queue, then adds the
 * per-channel event counts of the packet to the count parameters.
 */
void asynStreamGeneratorDriver::receiveUDP() {
    const char *const functionName = "receiveUDP";

    // Packet layout as used below: a header of 21 two-byte words followed by
    // events of 3 two-byte words each.
    const size_t header_words = 21;
    const size_t event_words = 3;
    const size_t header_bytes = header_words * 2;
    const size_t event_bytes = event_words * 2;

    const size_t buffer_size = 1500;
    char buffer[buffer_size];

    const uint32_t x_pixels = 128;
    const uint32_t y_pixels = 128;

    // TODO epics doesn't seem to support uint64, you would need an array of
    // uint32. It does support int64 though.. so we start with that
    const size_t channel_count = static_cast<size_t>(this->num_channels);
    std::vector<epicsInt32> counts(channel_count);

    while (true) {
        counts.assign(channel_count, 0);

        int isConnected = 0;
        const asynStatus status =
            pasynManager->isConnected(pasynUDPUser, &isConnected);
        if (status) {
            asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
                      "%s:%s: error calling pasynManager->isConnected, "
                      "status=%d, error=%s\n",
                      driverName, functionName, status,
                      pasynUDPUser->errorMessage);
        }
        asynPrint(pasynUserSelf, ASYN_TRACEIO_DRIVER,
                  "%s:%s: isConnected = %d\n", driverName, functionName,
                  isConnected);

        // Start from zero so that a failed read cannot leave a stale or
        // indeterminate byte count behind. The read status itself is not
        // reported (presumably because a zero-timeout poll often returns
        // without data); an empty read is simply retried.
        size_t received = 0;
        int eomReason = 0;
        pasynOctetSyncIO->read(pasynUDPUser, buffer, buffer_size,
                               0, // timeout
                               &received, &eomReason);

        if (!received) {
            continue;
        }

        // Only trust the header once a complete header has arrived, and only
        // subtract the header length from BufferLength when that cannot
        // underflow.
        UDPHeader *header = (UDPHeader *)buffer;
        const bool has_header = received >= header_bytes;
        const size_t buffer_words =
            has_header ? static_cast<size_t>(header->BufferLength) : 0;
        const size_t total_events =
            buffer_words >= header_words
                ? (buffer_words - header_words) / event_words
                : 0;

        // TODO lots of checks and validation missing everywhere here
        if (!has_header || buffer_words < header_words ||
            received != total_events * event_bytes + header_bytes) {
            asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
                      "%s:%s: invalid UDP packet\n", driverName, functionName);
            continue;
        }

        const uint64_t packet_ns = header->nanosecs();
        size_t dropped_events = 0;

        for (size_t i = 0; i < total_events; ++i) {
            char *event = buffer + header_bytes + i * event_bytes;

            if (event[5] & 0x80) { // Monitor Event
                MonitorEvent *m_event = (MonitorEvent *)event;

                // Channel 0 holds the detector count, monitors start at 1.
                // Only count monitors that have a counter channel.
                const size_t channel =
                    static_cast<size_t>(m_event->DataID) + 1;
                if (channel < channel_count) {
                    counts[channel] += 1;
                } else {
                    asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
                              "%s:%s: monitor %lu has no count channel\n",
                              driverName, functionName,
                              (unsigned long)(channel - 1));
                }

                // Ownership passes to the queue; the producer thread deletes
                // the event after popping it.
                auto nme = new NormalisedMonitorEvent();
                nme->TimeStamp = packet_ns + (uint64_t)m_event->nanosecs();
                nme->DataID = m_event->DataID;
                if (!this->monitorQueue.push(nme)) {
                    // Queue full: the event was not taken, free it here.
                    delete nme;
                    ++dropped_events;
                }

            } else { // Detector Event
                DetectorEvent *d_event = (DetectorEvent *)event;
                counts[0] += 1;

                auto nde = new NormalisedDetectorEvent();
                nde->TimeStamp = packet_ns + (uint64_t)d_event->nanosecs();
                nde->PixID = (header->McpdID - 1) * x_pixels * y_pixels +
                             x_pixels * (uint32_t)d_event->XPosition +
                             (uint32_t)d_event->YPosition;
                if (!this->detectorQueue.push(nde)) {
                    // Queue full: the event was not taken, free it here.
                    delete nde;
                    ++dropped_events;
                }
            }
        }

        if (dropped_events) {
            // Reported once per packet to avoid one message per event.
            asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
                      "%s:%s: dropped %lu events, queue full\n", driverName,
                      functionName, (unsigned long)dropped_events);
        }

        // Read-modify-write of the count parameters under the port lock.
        lock();
        for (size_t i = 0; i < channel_count; ++i) {
            epicsInt32 previous = 0;
            getIntegerParam(P_Counts[i], &previous);
            setIntegerParam(P_Counts[i], previous + counts[i]);
        }
        callParamCallbacks();
        unlock();
    }
}

/**
 * Body of the monitor producer thread. Never returns.
 * Publishes queued monitor events to the "NEWEFU_TEST" topic.
 */
void asynStreamGeneratorDriver::produceMonitor() {
    run_event_producer(this->monitorQueue, this->monitorProducer,
                       "NEWEFU_TEST", "monitor", pasynUserSelf,
                       "produceMonitor",
                       [](const auto &event) { return event.DataID; });
}

/**
 * Body of the detector producer thread. Never returns.
 * Publishes queued detector events to the "NEWEFU_TEST2" topic.
 */
void asynStreamGeneratorDriver::produceDetector() {
    run_event_producer(this->detectorQueue, this->detectorProducer,
                       "NEWEFU_TEST2", "detector", pasynUserSelf,
                       "produceDetector",
                       [](const auto &event) { return event.PixID; });
}

/*******************************************************************************
 * Methods exposed to IOC Shell
 */
extern "C" {

/**
 * Creates a driver instance. The instance is never deleted by this file.
 *
 * @return always asynSuccess (construction failures terminate the process)
 */
asynStatus asynStreamGeneratorDriverConfigure(const char *portName,
                                              const char *ipPortName,
                                              const int numChannels) {
    new asynStreamGeneratorDriver(portName, ipPortName, numChannels);
    return asynSuccess;
}

static const iocshArg initArg0 = {"portName", iocshArgString};
static const iocshArg initArg1 = {"ipPortName", iocshArgString};
static const iocshArg initArg2 = {"numChannels", iocshArgInt};
static const iocshArg *const initArgs[] = {&initArg0, &initArg1, &initArg2};
static const iocshFuncDef initFuncDef = {"asynStreamGenerator", 3, initArgs};

/** iocsh callback: forwards the parsed shell arguments to the configure call. */
static void initCallFunc(const iocshArgBuf *args) {
    asynStreamGeneratorDriverConfigure(args[0].sval, args[1].sval,
                                       args[2].ival);
}

/** Registers the "asynStreamGenerator" command with the IOC shell. */
void asynStreamGeneratorDriverRegister(void) {
    iocshRegister(&initFuncDef, initCallFunc);
}

epicsExportRegistrar(asynStreamGeneratorDriverRegister);
}