Very inefficient, but it can receive UDP monitor events, count them, and publish them as Kafka events.

This commit is contained in:
2025-10-30 15:07:21 +01:00
parent 09ba30025a
commit 4c1741bd4b
2 changed files with 158 additions and 54 deletions

View File

@@ -3,7 +3,6 @@
#include <cstring>
#include <epicsStdio.h>
#include <iocsh.h>
#include <librdkafka/rdkafka.h>
// Just for printing
#define __STDC_FORMAT_MACROS
@@ -31,6 +30,11 @@ static void udpPollerTask(void *drvPvt) {
pSGD->receiveUDP();
}
static void monitorProducerTask(void *drvPvt) {
asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
pSGD->produceMonitor();
}
// UDP Packet Definitions
struct __attribute__((__packed__)) UDPHeader {
uint16_t BufferLength;
@@ -82,16 +86,14 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName,
asynInt32Mask | asynInt64Mask |
asynDrvUserMask, /* Interface mask */
asynInt32Mask | asynInt64Mask, /* Interrupt mask */
0, /* asynFlags. This driver does not block and it is
not multi-device, but has a
destructor ASYN_DESTRUCTIBLE our version of the Asyn
is too old to support this flag */
1, /* Autoconnect */
0, /* Default priority */
0) /* Default stack size*/
{
this->num_channels = numChannels;
0, /* asynFlags. This driver does not block and it is
not multi-device, but has a
destructor ASYN_DESTRUCTIBLE our version of the Asyn
is too old to support this flag */
1, /* Autoconnect */
0, /* Default priority */
0), /* Default stack size*/
num_channels(numChannels), monitorQueue(1000, false) {
// Parameter Setup
char pv_name_buffer[100];
P_Counts = new int[numChannels];
@@ -106,6 +108,36 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName,
printf("%s %d %d %d\n", pv_name_buffer, P_Counts[i], i, status);
}
char errstr[512];
// Create client configuration
rd_kafka_conf_t *conf = rd_kafka_conf_new();
set_config(conf, "bootstrap.servers", "linkafka01:9092");
set_config(conf, "queue.buffering.max.messages", "1e7");
// Create the Producer instance.
this->monitorProducer =
rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!this->monitorProducer) {
// TODO
// g_error("Failed to create new producer: %s", errstr);
exit(1);
}
// Setup for Thread Producing Monitor Kafka Events
status =
(asynStatus)(epicsThreadCreate(
"monitor_produce", epicsThreadPriorityMedium,
epicsThreadGetStackSize(epicsThreadStackMedium),
(EPICSTHREADFUNC)::monitorProducerTask, this) == NULL);
if (status) {
// printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName,
// functionName, status);
printf("%s:%s: epicsThreadCreate failure, status=%d\n",
"StreamGenerator", "init", status);
return;
}
// UDP Receive Setup
pasynOctetSyncIO->connect(ipPortName, 0, &pasynUDPUser, NULL);
@@ -122,61 +154,64 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName,
return;
}
// Kafka Produce Setup
rd_kafka_conf_t *conf;
char errstr[512];
// // Kafka Produce Setup
// rd_kafka_conf_t *conf;
// char errstr[512];
// Create client configuration
conf = rd_kafka_conf_new();
set_config(conf, "bootstrap.servers", "linkafka01:9092");
set_config(conf, "queue.buffering.max.messages", "1e7");
// // Create client configuration
// conf = rd_kafka_conf_new();
// set_config(conf, "bootstrap.servers", "linkafka01:9092");
// set_config(conf, "queue.buffering.max.messages", "1e7");
// Create the Producer instance.
rd_kafka_t *producer =
rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!producer) {
// TODO
// g_error("Failed to create new producer: %s", errstr);
exit(1);
}
// // Create the Producer instance.
// rd_kafka_t *producer =
// rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
// if (!producer) {
// // TODO
// // g_error("Failed to create new producer: %s", errstr);
// exit(1);
// }
char *msg = "asdf\n";
// EventMessageBuilder b;
// We could I believe reuse a buffer which might be more performant.
flatbuffers::FlatBufferBuilder builder(1024);
std::vector<uint32_t> tof = {1, 2, 3};
std::vector<uint32_t> did = {0, 0, 0};
auto message =
CreateEventMessageDirect(builder, "monitor1", 0, 0, &tof, &did);
// char *msg = "asdf\n";
// // EventMessageBuilder b;
// // We could I believe reuse a buffer which might be more performant.
// flatbuffers::FlatBufferBuilder builder(1024);
// // clear with build.Clear();
// std::vector<uint32_t> tof = {1, 2, 3};
// std::vector<uint32_t> did = {0, 0, 0};
// auto message =
// CreateEventMessageDirect(builder, "monitor1", 0, 0, &tof, &did);
builder.Finish(message, "ev42");
printf("buffer size: %d\n", builder.GetSize());
// builder.Finish(message, "ev42");
// printf("buffer size: %d\n", builder.GetSize());
rd_kafka_resp_err_t err = rd_kafka_producev(
producer, RD_KAFKA_V_TOPIC("NEWEFU_TEST"),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
// RD_KAFKA_V_KEY((void *)key, key_len),
RD_KAFKA_V_VALUE((void *)builder.GetBufferPointer(), builder.GetSize()),
// RD_KAFKA_V_OPAQUE(NULL),
RD_KAFKA_V_END);
// rd_kafka_resp_err_t err = rd_kafka_producev(
// producer, RD_KAFKA_V_TOPIC("NEWEFU_TEST"),
// RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
// // RD_KAFKA_V_KEY((void *)key, key_len),
// RD_KAFKA_V_VALUE((void *)builder.GetBufferPointer(),
// builder.GetSize()),
// // RD_KAFKA_V_OPAQUE(NULL),
// RD_KAFKA_V_END);
if (err) {
// TODO
// g_error("Failed to produce to topic %s: %s", topic,
// rd_kafka_err2str(err));
exit(1);
}
// if (err) {
// // TODO
// // g_error("Failed to produce to topic %s: %s", topic,
// // rd_kafka_err2str(err));
// exit(1);
// }
epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer));
// epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer));
rd_kafka_poll(producer, 0);
epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer));
rd_kafka_flush(producer, 10 * 1000);
epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer));
// rd_kafka_poll(producer, 0);
// epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer));
// rd_kafka_flush(producer, 10 * 1000);
// epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer));
}
// Frees the per-channel parameter-index array allocated in the constructor.
// NOTE(review): monitorProducer (rd_kafka_t*) is never flushed/destroyed and
// the producer thread is never stopped here, so queued NormalisedMonitorEvent
// objects can leak on shutdown — consider signalling the thread, flushing,
// and calling rd_kafka_destroy. TODO confirm intended teardown order.
asynStreamGeneratorDriver::~asynStreamGeneratorDriver() { delete[] P_Counts; }
// TODO pretty sure I don't actually need to overwrite this
asynStatus asynStreamGeneratorDriver::readInt32(asynUser *pasynUser,
epicsInt32 *value) {
// asynStatus asynStreamGeneratorDriver::readInt64(asynUser *pasynUser,
@@ -207,6 +242,56 @@ asynStatus asynStreamGeneratorDriver::readInt32(asynUser *pasynUser,
return asynSuccess;
}
void asynStreamGeneratorDriver::produceMonitor() {
flatbuffers::FlatBufferBuilder builder(1024);
while (true) {
if (!this->monitorQueue.isEmpty()) {
builder.Clear();
auto nme = this->monitorQueue.pop();
std::vector<uint32_t> tof = {nme->TimeStamp};
std::vector<uint32_t> did = {nme->DataID};
auto message =
CreateEventMessageDirect(builder, "monitor", 0, 0, &tof, &did);
builder.Finish(message, "ev42");
// printf("buffer size: %d\n", builder.GetSize());
rd_kafka_resp_err_t err = rd_kafka_producev(
monitorProducer, RD_KAFKA_V_TOPIC("NEWEFU_TEST"),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
// RD_KAFKA_V_KEY((void *)key, key_len),
RD_KAFKA_V_VALUE((void *)builder.GetBufferPointer(),
builder.GetSize()),
// RD_KAFKA_V_OPAQUE(NULL),
RD_KAFKA_V_END);
if (err) {
// TODO
// g_error("Failed to produce to topic %s: %s", topic,
// rd_kafka_err2str(err));
}
// epicsStdoutPrintf("Kafka Queue Size %d\n",
// rd_kafka_outq_len(monitorProducer));
rd_kafka_poll(monitorProducer, 0);
printf("Monitor Events Queued %d\n", this->monitorQueue.getHighWaterMark());
this->monitorQueue.resetHighWaterMark();
delete nme;
}
epicsThreadSleep(0.001); // seconds
}
}
void asynStreamGeneratorDriver::receiveUDP() {
asynStatus status;
int isConnected;
@@ -284,6 +369,14 @@ void asynStreamGeneratorDriver::receiveUDP() {
// (uint64_t)m_event->nanosecs());
monitor_counts[m_event->DataID] += 1;
// TODO needs to be freed
auto nme = new NormalisedMonitorEvent();
nme->TimeStamp =
header->nanosecs() + (uint64_t)m_event->nanosecs();
nme->DataID = m_event->DataID;
this->monitorQueue.push(nme);
} else { // Detector Event
DetectorEvent *d_event = (DetectorEvent *)event;
}