From c2ca5f699c6ee7a19ffd5465327901962f0a14d9 Mon Sep 17 00:00:00 2001 From: Edward Wall Date: Thu, 30 Oct 2025 11:51:16 +0100 Subject: [PATCH] progress with parsing and kafka --- db/channels.db | 3 +- scripts/st.cmd | 5 +- scripts/udp_gen.py | 74 ++++++++++++ src/asynStreamGeneratorDriver.cpp | 186 +++++++++++++++++++++++++----- src/asynStreamGeneratorDriver.h | 6 +- 5 files changed, 241 insertions(+), 33 deletions(-) create mode 100644 scripts/udp_gen.py diff --git a/db/channels.db b/db/channels.db index 19aed39..ef0175e 100644 --- a/db/channels.db +++ b/db/channels.db @@ -14,6 +14,7 @@ record(longin, "$(INSTR)$(NAME):M$(CHANNEL)") field(DESC, "DAQ CH$(CHANNEL)") field(EGU, "cts") field(DTYP, "asynInt32") - field(INP, "@asyn($(PORT),0,$(TIMEOUT=1))COUNTS") + field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) COUNTS$(CHANNEL)") field(SCAN, "I/O Intr") + field(PINI, "YES") } diff --git a/scripts/st.cmd b/scripts/st.cmd index 31dce8e..80ea698 100755 --- a/scripts/st.cmd +++ b/scripts/st.cmd @@ -8,9 +8,12 @@ require StreamGenerator, test epicsEnvSet("INSTR", "SQ:TEST:") epicsEnvSet("NAME", "SG") -drvAsynIPPortConfigure("ASYN_IP_PORT", "127.0.0.1:9071:9073 UDP", 0, 0, 0) +drvAsynIPPortConfigure("ASYN_IP_PORT", "127.0.0.1:9071:54321 UDP", 0, 0, 0) asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4) dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=0") +dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=1") +dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=2") +dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=3") iocInit() diff --git a/scripts/udp_gen.py b/scripts/udp_gen.py new file mode 100644 index 0000000..428e2ad --- /dev/null +++ b/scripts/udp_gen.py @@ -0,0 +1,74 @@ +import socket +import time +import random + +sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + +header = [ + 0, 0, # buffer length in 16bit words (1, 0) == 1, (0, 1) == 256 + 0, 0x80, # buffer type (probably should be 0) + 21, 0, # header length + 0, 0, # buffer number + 0, 0, # run id + 0x3, # status + 0, # id of sending module + 0, 0, # timestamp low + 0, 0, # timestamp mid + 0, 0, # timestamp high +] + [0, 0] * 12 # parameters + +data = [ + 0, + 0, + 0, + 0, + 0, + 0 +] + +start_time = time.time_ns() // 100 + +while True: + # update buffer number + header[6] = (header[6] + 1) % 0xff + header[7] = (header[7] + (header[6] == 0)) % 0xff + + # update timestamp + base_timestamp = time.time_ns() // 100 - start_time + t_low = base_timestamp & 0xffff + t_mid = (base_timestamp >> 16) & 0xffff + t_high = (base_timestamp >> 32) & 0xffff + header[12] = t_low & 0xff + header[13] = t_low >> 8 + header[14] = t_mid & 0xff + header[15] = t_mid >> 8 + header[16] = t_high & 0xff + header[17] = t_high >> 8 + + num_events = random.randint(0, 243) + + # update buffer length + buffer_length = 21 + num_events * 3 + header[0] = buffer_length & 0xff + header[1] = (buffer_length >> 8) & 0xff + + tosend = list(header) + + for i in range(num_events): + d = list(data) + + # set monitor + d[5] = (1 << 7) | random.randint(0,3) + + # update trigger timestamp + event_timestamp = (time.time_ns() // 100) - base_timestamp + d[0] = event_timestamp & 0xff + d[1] = (event_timestamp >> 8) & 0xff + d[2] = (event_timestamp >> 16) & 0x07 + + tosend += d + + sock.sendto(bytes(tosend), ('127.0.0.1', 54321)) + mv = memoryview(bytes(header)).cast('H') + print(f'Sent packet {mv[3]} with {num_events} events {base_timestamp}') + time.sleep(1) diff --git a/src/asynStreamGeneratorDriver.cpp b/src/asynStreamGeneratorDriver.cpp index 0870399..7987a94 100644 --- a/src/asynStreamGeneratorDriver.cpp +++ b/src/asynStreamGeneratorDriver.cpp @@ -1,8 +1,13 @@ #include "asynOctetSyncIO.h" +#include #include #include #include +// Just for printing +#define __STDC_FORMAT_MACROS +#include + #include "asynStreamGeneratorDriver.h" #include @@ -25,15 +30,57 @@ static void udpPollerTask(void *drvPvt) { pSGD->receiveUDP(); } +// UDP Packet Definitions +struct __attribute__((__packed__)) UDPHeader { + uint16_t BufferLength; + uint16_t BufferType; + uint16_t HeaderLength; + uint16_t BufferNumber; + uint16_t RunCmdID; + uint16_t Status : 8; + uint16_t McpdID : 8; + uint16_t TimeStamp[3]; + uint16_t Parameter0[3]; + uint16_t Parameter1[3]; + uint16_t Parameter2[3]; + uint16_t Parameter3[3]; + + inline uint64_t nanosecs() { + uint64_t nsec{((uint64_t)TimeStamp[2]) << 32 | + ((uint64_t)TimeStamp[1]) << 16 | (uint64_t)TimeStamp[0]}; + return nsec * 100; + } +}; + +struct __attribute__((__packed__)) DetectorEvent { + uint64_t TimeStamp : 19; + uint16_t XPosition : 10; + uint16_t YPosition : 10; + uint16_t Amplitude : 8; + uint16_t Id : 1; + inline uint32_t nanosecs() { return TimeStamp * 100; } +}; + +struct __attribute__((__packed__)) MonitorEvent { + uint64_t TimeStamp : 19; + uint64_t Data : 21; + uint64_t DataID : 4; + uint64_t TriggerID : 3; + uint64_t Id : 1; + inline uint32_t nanosecs() { return TimeStamp * 100; } +}; + /** Constructor for the asynStreamGeneratorDriver class. * Calls constructor for the asynPortDriver base class. * \param[in] portName The name of the asyn port driver to be created. */ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName, const char *ipPortName, const int numChannels) - : asynPortDriver(portName, 1, /* maxAddr */ - asynInt32Mask, /* Interface mask */ - asynInt32Mask, /* Interrupt mask */ + : asynPortDriver(portName, 1, /* maxAddr */ + // 5, + asynInt32Mask | asynInt64Mask | + asynDrvUserMask, /* Interface mask */ + asynInt32Mask | asynInt64Mask, /* Interrupt mask */ 0, /* asynFlags. This driver does not block and it is not multi-device, but has a destructor ASYN_DESTRUCTIBLE our version of the Asyn @@ -42,20 +89,30 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName, 0, /* Default priority */ 0) /* Default stack size*/ { + this->num_channels = numChannels; // Parameter Setup - createParam(P_CountsString, asynParamInt32, &P_Counts); - setIntegerParam(P_Counts, 0); + char pv_name_buffer[100]; + P_Counts = new int[numChannels]; + + asynStatus status; + + for (size_t i = 0; i < numChannels; ++i) { + memset(pv_name_buffer, 0, 100); + epicsSnprintf(pv_name_buffer, 100, P_CountsString, i); + status = createParam(pv_name_buffer, asynParamInt32, P_Counts + i); + setIntegerParam(P_Counts[i], 0); + printf("%s %d %d %d\n", pv_name_buffer, P_Counts[i], i, status); + } // UDP Receive Setup pasynOctetSyncIO->connect(ipPortName, 0, &pasynUDPUser, NULL); /* Create the thread that receives UDP traffic in the background */ - asynStatus status = - (asynStatus)(epicsThreadCreate( - "udp_receive", epicsThreadPriorityMedium, - epicsThreadGetStackSize(epicsThreadStackMedium), - (EPICSTHREADFUNC)::udpPollerTask, this) == NULL); + status = (asynStatus)(epicsThreadCreate( + "udp_receive", epicsThreadPriorityMedium, + epicsThreadGetStackSize(epicsThreadStackMedium), + (EPICSTHREADFUNC)::udpPollerTask, this) == NULL); if (status) { // printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName, // functionName, status); @@ -107,19 +164,32 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName, epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer)); } -asynStreamGeneratorDriver::~asynStreamGeneratorDriver() {} +asynStreamGeneratorDriver::~asynStreamGeneratorDriver() { delete[] P_Counts; } asynStatus asynStreamGeneratorDriver::readInt32(asynUser *pasynUser, epicsInt32 *value) { + // asynStatus asynStreamGeneratorDriver::readInt64(asynUser *pasynUser, + // epicsInt64 *value) { + const char *paramName; int function = pasynUser->reason; asynStatus status; - if (function == P_Counts) { - int val; - status = getIntegerParam(P_Counts, &val); - *value = val; - return status; + // TODO not freed + getParamName(function, ¶mName); + + bool is_p_counts = false; + for (size_t i = 0; i < num_channels; ++i) { + is_p_counts = is_p_counts | function == P_Counts[i]; + } + + if (is_p_counts) { + status = getIntegerParam(function, value); + + asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, "%s:%s: function %d %s %d\n", + "StreamGenerator", "readInt64", function, paramName, status); + // return status; + return asynSuccess; } else { return asynError; } @@ -134,9 +204,18 @@ void asynStreamGeneratorDriver::receiveUDP() { size_t received; int eomReason; - int val; + epicsInt32 val; + + // TODO epics doesn't seem to support uint64, you would need an array of + // uint32. It does support int64 though.. so we start with that + epicsInt32 *monitor_counts = new epicsInt32[this->num_channels]; while (true) { + // memset doesn't work with epicsInt32 + for (size_t i = 0; i < this->num_channels; ++i) { + monitor_counts[i] = 0; + } + // epicsStdoutPrintf("polling!!"); status = pasynManager->isConnected(pasynUDPUser, &isConnected); if (status) { @@ -153,7 +232,7 @@ void asynStreamGeneratorDriver::receiveUDP() { "StreamGenerator", "receiveUDP", isConnected); status = pasynOctetSyncIO->read(pasynUDPUser, buffer, 1500, - 1, // timeout + 0, // timeout &received, &eomReason); // if (status) @@ -162,20 +241,69 @@ void asynStreamGeneratorDriver::receiveUDP() { // "%s:%s: error calling pasynOctetSyncIO->read, status=%d\n", // "StreamGenerator", "receiveUDP", status); - buffer[received] = 0; + // buffer[received] = 0; - if (received) - asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, "%s:%s: received %s\n", - "StreamGenerator", "receiveUDP", buffer); + if (received) { + asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, "%s:%s: received %d\n", + "StreamGenerator", "receiveUDP", received); - lock(); - getIntegerParam(P_Counts, &val); - val += received > 0; - setIntegerParam(P_Counts, val); - callParamCallbacks(); - unlock(); + UDPHeader *header = (UDPHeader *)buffer; - epicsThreadSleep(0.001); // seconds + size_t total_events = (header->BufferLength - 21) / 3; + + // TODO lots of checks and validation missing everywhere here + if (received == total_events * 6 + 42) { + asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, + "%s:%s: received packet %d with %d events (%" PRIu64 + ")\n", + "StreamGenerator", "receiveUDP", header->BufferNumber, + total_events, header->nanosecs()); + + for (size_t i = 0; i < total_events; ++i) { + char *event = (buffer + 21 * 2 + i * 6); + + if (event[5] & 0x80) { // Monitor Event + MonitorEvent *m_event = (MonitorEvent *)event; + + // asynPrint( + // pasynUserSelf, ASYN_TRACE_ERROR, + // "%s:%s: event (%03d) on monitor %d (%" PRIu64 + // ")\n", "StreamGenerator", "receiveUDP", i, + // m_event->DataID, header->nanosecs() + + // (uint64_t)m_event->nanosecs()); + + monitor_counts[m_event->DataID] += 1; + } else { // Detector Event + DetectorEvent *d_event = (DetectorEvent *)event; + } + } + + for (size_t i = 0; i < num_channels; ++i) { + getIntegerParam(P_Counts[i], &val); + monitor_counts[i] += val; + } + + asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, + "%s:%s: monitor 0: (%d), monitor 1: (%d), monitor 2: " + "(%d), monitor 3: (%d)\n", + "StreamGenerator", "receiveUDP", monitor_counts[0], + monitor_counts[1], monitor_counts[2], + monitor_counts[3]); + + lock(); + for (size_t i = 0; i < num_channels; ++i) { + setIntegerParam(P_Counts[i], monitor_counts[i]); + } + callParamCallbacks(); + unlock(); + } else { + asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, + "%s:%s: invalid UDP packet\n", "StreamGenerator", + "receiveUDP"); + } + } + + epicsThreadSleep(1); // seconds } } diff --git a/src/asynStreamGeneratorDriver.h b/src/asynStreamGeneratorDriver.h index 28bcab7..780cde7 100644 --- a/src/asynStreamGeneratorDriver.h +++ b/src/asynStreamGeneratorDriver.h @@ -4,7 +4,7 @@ #include "asynPortDriver.h" /* These are the drvInfo strings that are used to identify the parameters. */ -#define P_CountsString "COUNTS" /* asynInt32, r/w */ +#define P_CountsString "COUNTS%d" /* asynInt32, r/w */ class asynStreamGeneratorDriver : public asynPortDriver { public: @@ -12,15 +12,17 @@ class asynStreamGeneratorDriver : public asynPortDriver { const int numChannels); virtual ~asynStreamGeneratorDriver(); + // virtual asynStatus readInt64(asynUser *pasynUser, epicsInt64 *value); virtual asynStatus readInt32(asynUser *pasynUser, epicsInt32 *value); void receiveUDP(); protected: - int P_Counts; + int *P_Counts; private: asynUser *pasynUDPUser; + int num_channels; }; #endif