progress with parsing and kafka

This commit is contained in:
2025-10-30 11:51:16 +01:00
parent b810509156
commit c2ca5f699c
5 changed files with 241 additions and 33 deletions

View File

@@ -1,8 +1,13 @@
#include "asynOctetSyncIO.h"
#include <cstring>
#include <epicsStdio.h>
#include <iocsh.h>
#include <librdkafka/rdkafka.h>
// Just for printing
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
#include "asynStreamGeneratorDriver.h"
#include <epicsExport.h>
@@ -25,15 +30,57 @@ static void udpPollerTask(void *drvPvt) {
pSGD->receiveUDP();
}
// UDP Packet Definitions
struct __attribute__((__packed__)) UDPHeader {
uint16_t BufferLength;
uint16_t BufferType;
uint16_t HeaderLength;
uint16_t BufferNumber;
uint16_t RunCmdID;
uint16_t Status : 8;
uint16_t McpdID : 8;
uint16_t TimeStamp[3];
uint16_t Parameter0[3];
uint16_t Parameter1[3];
uint16_t Parameter2[3];
uint16_t Parameter3[3];
inline uint64_t nanosecs() {
uint64_t nsec{((uint64_t)TimeStamp[2]) << 32 |
((uint64_t)TimeStamp[1]) << 16 | (uint64_t)TimeStamp[0]};
return nsec * 100;
}
};
struct __attribute__((__packed__)) DetectorEvent {
uint64_t TimeStamp : 19;
uint16_t XPosition : 10;
uint16_t YPosition : 10;
uint16_t Amplitude : 8;
uint16_t Id : 1;
inline uint32_t nanosecs() { return TimeStamp * 100; }
};
struct __attribute__((__packed__)) MonitorEvent {
uint64_t TimeStamp : 19;
uint64_t Data : 21;
uint64_t DataID : 4;
uint64_t TriggerID : 3;
uint64_t Id : 1;
inline uint32_t nanosecs() { return TimeStamp * 100; }
};
/** Constructor for the asynStreamGeneratorDriver class.
* Calls constructor for the asynPortDriver base class.
* \param[in] portName The name of the asyn port driver to be created. */
asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName,
const char *ipPortName,
const int numChannels)
: asynPortDriver(portName, 1, /* maxAddr */
asynInt32Mask, /* Interface mask */
asynInt32Mask, /* Interrupt mask */
: asynPortDriver(portName, 1, /* maxAddr */
// 5,
asynInt32Mask | asynInt64Mask |
asynDrvUserMask, /* Interface mask */
asynInt32Mask | asynInt64Mask, /* Interrupt mask */
0, /* asynFlags. This driver does not block and it is
not multi-device, but has a
destructor ASYN_DESTRUCTIBLE our version of the Asyn
@@ -42,20 +89,30 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName,
0, /* Default priority */
0) /* Default stack size*/
{
this->num_channels = numChannels;
// Parameter Setup
createParam(P_CountsString, asynParamInt32, &P_Counts);
setIntegerParam(P_Counts, 0);
char pv_name_buffer[100];
P_Counts = new int[numChannels];
asynStatus status;
for (size_t i = 0; i < numChannels; ++i) {
memset(pv_name_buffer, 0, 100);
epicsSnprintf(pv_name_buffer, 100, P_CountsString, i);
status = createParam(pv_name_buffer, asynParamInt32, P_Counts + i);
setIntegerParam(P_Counts[i], 0);
printf("%s %d %d %d\n", pv_name_buffer, P_Counts[i], i, status);
}
// UDP Receive Setup
pasynOctetSyncIO->connect(ipPortName, 0, &pasynUDPUser, NULL);
/* Create the thread that receives UDP traffic in the background */
asynStatus status =
(asynStatus)(epicsThreadCreate(
"udp_receive", epicsThreadPriorityMedium,
epicsThreadGetStackSize(epicsThreadStackMedium),
(EPICSTHREADFUNC)::udpPollerTask, this) == NULL);
status = (asynStatus)(epicsThreadCreate(
"udp_receive", epicsThreadPriorityMedium,
epicsThreadGetStackSize(epicsThreadStackMedium),
(EPICSTHREADFUNC)::udpPollerTask, this) == NULL);
if (status) {
// printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName,
// functionName, status);
@@ -107,19 +164,32 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName,
epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer));
}
asynStreamGeneratorDriver::~asynStreamGeneratorDriver() {}
asynStreamGeneratorDriver::~asynStreamGeneratorDriver() { delete[] P_Counts; }
asynStatus asynStreamGeneratorDriver::readInt32(asynUser *pasynUser,
epicsInt32 *value) {
// asynStatus asynStreamGeneratorDriver::readInt64(asynUser *pasynUser,
// epicsInt64 *value) {
const char *paramName;
int function = pasynUser->reason;
asynStatus status;
if (function == P_Counts) {
int val;
status = getIntegerParam(P_Counts, &val);
*value = val;
return status;
// TODO not freed
getParamName(function, &paramName);
bool is_p_counts = false;
for (size_t i = 0; i < num_channels; ++i) {
is_p_counts = is_p_counts | function == P_Counts[i];
}
if (is_p_counts) {
status = getIntegerParam(function, value);
asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, "%s:%s: function %d %s %d\n",
"StreamGenerator", "readInt64", function, paramName, status);
// return status;
return asynSuccess;
} else {
return asynError;
}
@@ -134,9 +204,18 @@ void asynStreamGeneratorDriver::receiveUDP() {
size_t received;
int eomReason;
int val;
epicsInt32 val;
// TODO epics doesn't seem to support uint64, you would need an array of
// uint32. It does support int64 though.. so we start with that
epicsInt32 *monitor_counts = new epicsInt32[this->num_channels];
while (true) {
// memset doesn't work with epicsInt32
for (size_t i = 0; i < this->num_channels; ++i) {
monitor_counts[i] = 0;
}
// epicsStdoutPrintf("polling!!");
status = pasynManager->isConnected(pasynUDPUser, &isConnected);
if (status) {
@@ -153,7 +232,7 @@ void asynStreamGeneratorDriver::receiveUDP() {
"StreamGenerator", "receiveUDP", isConnected);
status = pasynOctetSyncIO->read(pasynUDPUser, buffer, 1500,
1, // timeout
0, // timeout
&received, &eomReason);
// if (status)
@@ -162,20 +241,69 @@ void asynStreamGeneratorDriver::receiveUDP() {
// "%s:%s: error calling pasynOctetSyncIO->read, status=%d\n",
// "StreamGenerator", "receiveUDP", status);
buffer[received] = 0;
// buffer[received] = 0;
if (received)
asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, "%s:%s: received %s\n",
"StreamGenerator", "receiveUDP", buffer);
if (received) {
asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, "%s:%s: received %d\n",
"StreamGenerator", "receiveUDP", received);
lock();
getIntegerParam(P_Counts, &val);
val += received > 0;
setIntegerParam(P_Counts, val);
callParamCallbacks();
unlock();
UDPHeader *header = (UDPHeader *)buffer;
epicsThreadSleep(0.001); // seconds
size_t total_events = (header->BufferLength - 21) / 3;
// TODO lots of checks and validation missing everywhere here
if (received == total_events * 6 + 42) {
asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
"%s:%s: received packet %d with %d events (%" PRIu64
")\n",
"StreamGenerator", "receiveUDP", header->BufferNumber,
total_events, header->nanosecs());
for (size_t i = 0; i < total_events; ++i) {
char *event = (buffer + 21 * 2 + i * 6);
if (event[5] & 0x80) { // Monitor Event
MonitorEvent *m_event = (MonitorEvent *)event;
// asynPrint(
// pasynUserSelf, ASYN_TRACE_ERROR,
// "%s:%s: event (%03d) on monitor %d (%" PRIu64
// ")\n", "StreamGenerator", "receiveUDP", i,
// m_event->DataID, header->nanosecs() +
// (uint64_t)m_event->nanosecs());
monitor_counts[m_event->DataID] += 1;
} else { // Detector Event
DetectorEvent *d_event = (DetectorEvent *)event;
}
}
for (size_t i = 0; i < num_channels; ++i) {
getIntegerParam(P_Counts[i], &val);
monitor_counts[i] += val;
}
asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
"%s:%s: monitor 0: (%d), monitor 1: (%d), monitor 2: "
"(%d), monitor 3: (%d)\n",
"StreamGenerator", "receiveUDP", monitor_counts[0],
monitor_counts[1], monitor_counts[2],
monitor_counts[3]);
lock();
for (size_t i = 0; i < num_channels; ++i) {
setIntegerParam(P_Counts[i], monitor_counts[i]);
}
callParamCallbacks();
unlock();
} else {
asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
"%s:%s: invalid UDP packet\n", "StreamGenerator",
"receiveUDP");
}
}
epicsThreadSleep(1); // seconds
}
}