Can receive both monitor and detector UDP events and send them to different Kafka topics (a minimal sketch of the pattern follows the commit metadata below)

This commit is contained in:
2025-10-30 16:48:33 +01:00
parent 4c1741bd4b
commit 750436732c
4 changed files with 306 additions and 156 deletions
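For context, a minimal standalone sketch of the pattern this commit introduces: one librdkafka producer handle per event stream, each publishing ev42 flatbuffer batches to its own topic. The broker address, topic names, and the CreateEventMessageDirect call shape are taken from the diff below; the main scaffold, helper names, and sample data are illustrative only, error handling is trimmed, and the ev42 schema header is assumed to be the one the driver already uses.

#include <librdkafka/rdkafka.h>
#include <flatbuffers/flatbuffers.h>
#include "ev42_events_generated.h" // assumed: schema header providing CreateEventMessageDirect
#include <cstdio>
#include <cstdlib>
#include <vector>

// Build a producer configured against the same broker as the driver.
static rd_kafka_t *make_producer(const char *brokers) {
  char errstr[512];
  rd_kafka_conf_t *conf = rd_kafka_conf_new();
  rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, sizeof(errstr));
  rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
  if (!rk) {
    fprintf(stderr, "Failed to create producer: %s\n", errstr);
    exit(1);
  }
  return rk; // on success, rk owns conf
}

// Serialize one batch of events as an ev42 EventMessage and produce it.
static void send_batch(rd_kafka_t *rk, const char *topic, const char *source,
                       uint64_t message_id, const std::vector<uint32_t> &tof,
                       const std::vector<uint32_t> &ids) {
  flatbuffers::FlatBufferBuilder builder(1024);
  auto message =
      CreateEventMessageDirect(builder, source, message_id, 0, &tof, &ids);
  builder.Finish(message, "ev42");
  rd_kafka_producev(rk, RD_KAFKA_V_TOPIC(topic),
                    RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                    RD_KAFKA_V_VALUE((void *)builder.GetBufferPointer(),
                                     (size_t)builder.GetSize()),
                    RD_KAFKA_V_END);
  rd_kafka_poll(rk, 0); // serve delivery reports
}

int main() {
  rd_kafka_t *monitorProducer = make_producer("linkafka01:9092");
  rd_kafka_t *detectorProducer = make_producer("linkafka01:9092");
  std::vector<uint32_t> tof = {1, 2, 3};     // time-of-flight per event
  std::vector<uint32_t> dataIds = {0, 0, 0}; // monitor channel IDs
  std::vector<uint32_t> pixIds = {42, 7, 9}; // detector pixel IDs
  send_batch(monitorProducer, "NEWEFU_TEST", "monitor", 0, tof, dataIds);
  send_batch(detectorProducer, "NEWEFU_TEST2", "detector", 0, tof, pixIds);
  // Flush outstanding messages before exit, as the diff's TODO suggests.
  rd_kafka_flush(monitorProducer, 10 * 1000);
  rd_kafka_flush(detectorProducer, 10 * 1000);
  rd_kafka_destroy(monitorProducer);
  rd_kafka_destroy(detectorProducer);
  return 0;
}

A single rd_kafka_t handle could serve both topics; the commit instead keeps two handles, which gives each stream its own buffering queue and poll loop.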


@@ -35,6 +35,11 @@ static void monitorProducerTask(void *drvPvt) {
pSGD->produceMonitor();
}
+ static void detectorProducerTask(void *drvPvt) {
+ asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
+ pSGD->produceDetector();
+ }
// UDP Packet Definitions
struct __attribute__((__packed__)) UDPHeader {
uint16_t BufferLength;
@@ -93,14 +98,15 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName,
1, /* Autoconnect */
0, /* Default priority */
0), /* Default stack size*/
- num_channels(numChannels), monitorQueue(1000, false) {
+ num_channels(numChannels + 1), monitorQueue(1000, false),
+ detectorQueue(1000, false) {
// Parameter Setup
char pv_name_buffer[100];
- P_Counts = new int[numChannels];
+ P_Counts = new int[this->num_channels];
asynStatus status;
- for (size_t i = 0; i < numChannels; ++i) {
+ for (size_t i = 0; i < this->num_channels; ++i) {
memset(pv_name_buffer, 0, 100);
epicsSnprintf(pv_name_buffer, 100, P_CountsString, i);
status = createParam(pv_name_buffer, asynParamInt32, P_Counts + i);
@@ -115,7 +121,7 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName,
set_config(conf, "bootstrap.servers", "linkafka01:9092");
set_config(conf, "queue.buffering.max.messages", "1e7");
- // Create the Producer instance.
+ // Create the Monitor Producer instance.
this->monitorProducer =
rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!this->monitorProducer) {
@@ -124,6 +130,19 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName,
exit(1);
}
+ conf = rd_kafka_conf_new();
+ set_config(conf, "bootstrap.servers", "linkafka01:9092");
+ set_config(conf, "queue.buffering.max.messages", "1e7");
+ // Create the Detector Producer instance.
+ this->detectorProducer =
+ rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
+ if (!this->detectorProducer) {
+ // TODO
+ // g_error("Failed to create new producer: %s", errstr);
+ exit(1);
+ }
// Setup for Thread Producing Monitor Kafka Events
status =
(asynStatus)(epicsThreadCreate(
@@ -138,6 +157,20 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName,
return;
}
+ // Setup for Thread Producing Detector Kafka Events
+ status = (asynStatus)(epicsThreadCreate(
+ "detector_produce", epicsThreadPriorityMedium,
+ epicsThreadGetStackSize(epicsThreadStackMedium),
+ (EPICSTHREADFUNC)::detectorProducerTask,
+ this) == NULL);
+ if (status) {
+ // printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName,
+ // functionName, status);
+ printf("%s:%s: epicsThreadCreate failure, status=%d\n",
+ "StreamGenerator", "init", status);
+ return;
+ }
// UDP Receive Setup
pasynOctetSyncIO->connect(ipPortName, 0, &pasynUDPUser, NULL);
@@ -154,143 +187,51 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName,
return;
}
- // // Kafka Produce Setup
- // rd_kafka_conf_t *conf;
- // char errstr[512];
- // // Create client configuration
- // conf = rd_kafka_conf_new();
- // set_config(conf, "bootstrap.servers", "linkafka01:9092");
- // set_config(conf, "queue.buffering.max.messages", "1e7");
- // // Create the Producer instance.
- // rd_kafka_t *producer =
- // rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
- // if (!producer) {
- // // TODO
- // // g_error("Failed to create new producer: %s", errstr);
- // exit(1);
- // }
- // char *msg = "asdf\n";
- // // EventMessageBuilder b;
- // // We could I believe reuse a buffer which might be more performant.
- // flatbuffers::FlatBufferBuilder builder(1024);
- // // clear with build.Clear();
- // std::vector<uint32_t> tof = {1, 2, 3};
- // std::vector<uint32_t> did = {0, 0, 0};
- // auto message =
- // CreateEventMessageDirect(builder, "monitor1", 0, 0, &tof, &did);
- // builder.Finish(message, "ev42");
- // printf("buffer size: %d\n", builder.GetSize());
- // rd_kafka_resp_err_t err = rd_kafka_producev(
- // producer, RD_KAFKA_V_TOPIC("NEWEFU_TEST"),
- // RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
- // // RD_KAFKA_V_KEY((void *)key, key_len),
- // RD_KAFKA_V_VALUE((void *)builder.GetBufferPointer(),
- // builder.GetSize()),
- // // RD_KAFKA_V_OPAQUE(NULL),
- // RD_KAFKA_V_END);
- // if (err) {
- // // TODO
- // // g_error("Failed to produce to topic %s: %s", topic,
- // // rd_kafka_err2str(err));
- // exit(1);
- // }
- // epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer));
- // TODO add exit should perhaps ensure the queue is flushed
- // rd_kafka_poll(producer, 0);
- // epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer));
- // rd_kafka_flush(producer, 10 * 1000);
- // epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer));
}
- asynStreamGeneratorDriver::~asynStreamGeneratorDriver() { delete[] P_Counts; }
- // TODO pretty sure I don't actually need to overwrite this
- asynStatus asynStreamGeneratorDriver::readInt32(asynUser *pasynUser,
- epicsInt32 *value) {
- // asynStatus asynStreamGeneratorDriver::readInt64(asynUser *pasynUser,
- // epicsInt64 *value) {
- const char *paramName;
- int function = pasynUser->reason;
- asynStatus status;
- // TODO not freed
- getParamName(function, &paramName);
- bool is_p_counts = false;
- for (size_t i = 0; i < num_channels; ++i) {
- is_p_counts = is_p_counts | function == P_Counts[i];
- }
- if (is_p_counts) {
- status = getIntegerParam(function, value);
- asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, "%s:%s: function %d %s %d\n",
- "StreamGenerator", "readInt64", function, paramName, status);
- // return status;
- return asynSuccess;
- } else {
- return asynError;
- }
- return asynSuccess;
+ asynStreamGeneratorDriver::~asynStreamGeneratorDriver() {
+ // should make sure queues are empty and freed
+ // and that the kafka producers are flushed and freed
+ delete[] P_Counts;
+ }
- void asynStreamGeneratorDriver::produceMonitor() {
- flatbuffers::FlatBufferBuilder builder(1024);
- while (true) {
- if (!this->monitorQueue.isEmpty()) {
- builder.Clear();
- auto nme = this->monitorQueue.pop();
- std::vector<uint32_t> tof = {nme->TimeStamp};
- std::vector<uint32_t> did = {nme->DataID};
- auto message =
- CreateEventMessageDirect(builder, "monitor", 0, 0, &tof, &did);
- builder.Finish(message, "ev42");
- // printf("buffer size: %d\n", builder.GetSize());
- rd_kafka_resp_err_t err = rd_kafka_producev(
- monitorProducer, RD_KAFKA_V_TOPIC("NEWEFU_TEST"),
- RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
- // RD_KAFKA_V_KEY((void *)key, key_len),
- RD_KAFKA_V_VALUE((void *)builder.GetBufferPointer(),
- builder.GetSize()),
- // RD_KAFKA_V_OPAQUE(NULL),
- RD_KAFKA_V_END);
- if (err) {
- // TODO
- // g_error("Failed to produce to topic %s: %s", topic,
- // rd_kafka_err2str(err));
- }
- // epicsStdoutPrintf("Kafka Queue Size %d\n",
- // rd_kafka_outq_len(monitorProducer));
- rd_kafka_poll(monitorProducer, 0);
- printf("Monitor Events Queued %d\n", this->monitorQueue.getHighWaterMark());
- this->monitorQueue.resetHighWaterMark();
- delete nme;
- }
- epicsThreadSleep(0.001); // seconds
- }
- }
+ // // TODO pretty sure I don't actually need to overwrite this
+ // asynStatus asynStreamGeneratorDriver::readInt32(asynUser *pasynUser,
+ // epicsInt32 *value) {
+ // // asynStatus asynStreamGeneratorDriver::readInt64(asynUser *pasynUser,
+ // // epicsInt64 *value) {
+ //
+ // const char *paramName;
+ // int function = pasynUser->reason;
+ // asynStatus status;
+ //
+ // // TODO not freed
+ // getParamName(function, &paramName);
+ //
+ // bool is_p_counts = false;
+ // for (size_t i = 0; i < num_channels; ++i) {
+ // is_p_counts = is_p_counts | function == P_Counts[i];
+ // }
+ //
+ // if (is_p_counts) {
+ // status = getIntegerParam(function, value);
+ //
+ // asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, "%s:%s: function %d %s %d\n",
+ // "StreamGenerator", "readInt64", function, paramName,
+ // status);
+ // // return status;
+ // return asynSuccess;
+ // } else {
+ // return asynError;
+ // }
+ // return asynSuccess;
+ // }
void asynStreamGeneratorDriver::receiveUDP() {
asynStatus status;
@@ -302,14 +243,17 @@ void asynStreamGeneratorDriver::receiveUDP() {
epicsInt32 val;
const uint32_t x_pixels = 128;
const uint32_t y_pixels = 128;
+ // TODO epics doesn't seem to support uint64, you would need an array of
+ // uint32. It does support int64 though.. so we start with that
- epicsInt32 *monitor_counts = new epicsInt32[this->num_channels];
+ epicsInt32 *counts = new epicsInt32[this->num_channels];
while (true) {
// memset doesn't work with epicsInt32
for (size_t i = 0; i < this->num_channels; ++i) {
- monitor_counts[i] = 0;
+ counts[i] = 0;
}
// epicsStdoutPrintf("polling!!");
@@ -368,9 +312,9 @@ void asynStreamGeneratorDriver::receiveUDP() {
// m_event->DataID, header->nanosecs() +
// (uint64_t)m_event->nanosecs());
- monitor_counts[m_event->DataID] += 1;
+ counts[m_event->DataID + 1] += 1;
- // TODO needs to be freed
+ // needs to be freed!!!
auto nme = new NormalisedMonitorEvent();
nme->TimeStamp =
header->nanosecs() + (uint64_t)m_event->nanosecs();
@@ -379,24 +323,34 @@ void asynStreamGeneratorDriver::receiveUDP() {
} else { // Detector Event
DetectorEvent *d_event = (DetectorEvent *)event;
+ counts[0] += 1;
+ // needs to be freed!!!
+ auto nde = new NormalisedDetectorEvent();
+ nde->TimeStamp =
+ header->nanosecs() + (uint64_t)d_event->nanosecs();
+ nde->PixID =
+ (header->McpdID - 1) * x_pixels * y_pixels +
+ x_pixels * (uint32_t)d_event->XPosition +
+ (uint32_t)d_event->YPosition;
+ this->detectorQueue.push(nde);
}
}
for (size_t i = 0; i < num_channels; ++i) {
getIntegerParam(P_Counts[i], &val);
- monitor_counts[i] += val;
+ counts[i] += val;
}
asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
"%s:%s: monitor 0: (%d), monitor 1: (%d), monitor 2: "
"(%d), monitor 3: (%d)\n",
"StreamGenerator", "receiveUDP", monitor_counts[0],
monitor_counts[1], monitor_counts[2],
monitor_counts[3]);
"%s:%s: det: (%d), mon0: (%d), mon1: (%d), mon2: "
"(%d), mon3: (%d)\n",
"StreamGenerator", "receiveUDP", counts[0], counts[1],
counts[2], counts[3], counts[4]);
lock();
for (size_t i = 0; i < num_channels; ++i) {
- setIntegerParam(P_Counts[i], monitor_counts[i]);
+ setIntegerParam(P_Counts[i], counts[i]);
}
callParamCallbacks();
unlock();
@@ -411,6 +365,161 @@ void asynStreamGeneratorDriver::receiveUDP() {
}
}
+ void asynStreamGeneratorDriver::produceMonitor() {
+ flatbuffers::FlatBufferBuilder builder(1024);
+ std::vector<uint32_t> tof;
+ tof.reserve(9000);
+ std::vector<uint32_t> did;
+ did.reserve(9000);
+ int total = 0;
+ epicsTimeStamp last_sent = epicsTime::getCurrent();
+ uint64_t message_id = 0;
+ while (true) {
+ if (!this->monitorQueue.isEmpty()) {
+ ++total;
+ auto nme = this->monitorQueue.pop();
+ tof.push_back(nme->TimeStamp);
+ did.push_back(nme->DataID);
+ delete nme;
+ } else {
+ epicsThreadSleep(0.001); // seconds
+ }
+ epicsTimeStamp now = epicsTime::getCurrent();
+ // At least every 0.2 seconds
+ if (total >= 8192 ||
+ epicsTimeDiffInNS(&now, &last_sent) > 200'000'000ll) {
+ last_sent = epicsTime::getCurrent();
+ if (total) {
+ total = 0;
+ builder.Clear();
+ auto message = CreateEventMessageDirect(
+ builder, "monitor", message_id++, 0, &tof, &did);
+ builder.Finish(message, "ev42");
+ // printf("buffer size: %d\n", builder.GetSize());
+ rd_kafka_resp_err_t err = rd_kafka_producev(
+ monitorProducer, RD_KAFKA_V_TOPIC("NEWEFU_TEST"),
+ RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
+ // RD_KAFKA_V_KEY((void *)key, key_len),
+ RD_KAFKA_V_VALUE((void *)builder.GetBufferPointer(),
+ builder.GetSize()),
+ // RD_KAFKA_V_OPAQUE(NULL),
+ RD_KAFKA_V_END);
+ if (err) {
+ // TODO
+ // g_error("Failed to produce to topic %s: %s", topic,
+ // rd_kafka_err2str(err));
+ }
+ // epicsStdoutPrintf("Kafka Queue Size %d\n",
+ // rd_kafka_outq_len(monitorProducer));
+ rd_kafka_poll(monitorProducer, 0);
+ printf("Monitor Events Queued before sending %d\n",
+ this->monitorQueue.getHighWaterMark());
+ this->monitorQueue.resetHighWaterMark();
+ tof.clear();
+ did.clear();
+ }
+ }
+ }
+ }
+ void asynStreamGeneratorDriver::produceDetector() {
+ flatbuffers::FlatBufferBuilder builder(1024);
+ std::vector<uint32_t> tof;
+ tof.reserve(9000);
+ std::vector<uint32_t> did;
+ did.reserve(9000);
+ int total = 0;
+ epicsTimeStamp last_sent = epicsTime::getCurrent();
+ uint64_t message_id = 0;
+ while (true) {
+ if (!this->detectorQueue.isEmpty()) {
+ ++total;
+ auto nde = this->detectorQueue.pop();
+ tof.push_back(nde->TimeStamp);
+ did.push_back(nde->PixID);
+ delete nde;
+ } else {
+ epicsThreadSleep(0.001); // seconds
+ }
+ epicsTimeStamp now = epicsTime::getCurrent();
+ // At least every 0.2 seconds
+ if (total >= 8192 ||
+ epicsTimeDiffInNS(&now, &last_sent) > 200'000'000ll) {
+ last_sent = epicsTime::getCurrent();
+ if (total) {
+ total = 0;
+ builder.Clear();
+ auto message = CreateEventMessageDirect(
+ builder, "detector", message_id++, 0, &tof, &did);
+ builder.Finish(message, "ev42");
+ // printf("buffer size: %d\n", builder.GetSize());
+ rd_kafka_resp_err_t err = rd_kafka_producev(
+ detectorProducer, RD_KAFKA_V_TOPIC("NEWEFU_TEST2"),
+ RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
+ // RD_KAFKA_V_KEY((void *)key, key_len),
+ RD_KAFKA_V_VALUE((void *)builder.GetBufferPointer(),
+ builder.GetSize()),
+ // RD_KAFKA_V_OPAQUE(NULL),
+ RD_KAFKA_V_END);
+ if (err) {
+ // TODO
+ // g_error("Failed to produce to topic %s: %s", topic,
+ // rd_kafka_err2str(err));
+ }
+ // epicsStdoutPrintf("Kafka Queue Size %d\n",
+ // rd_kafka_outq_len(monitorProducer));
+ rd_kafka_poll(detectorProducer, 0);
+ printf("Detector Events Queued before sending %d\n",
+ this->detectorQueue.getHighWaterMark());
+ this->detectorQueue.resetHighWaterMark();
+ tof.clear();
+ did.clear();
+ }
+ }
+ }
+ }
/* Configuration routine. Called directly, or from the iocsh function below */
extern "C" {