Files
StreamGenerator/src/asynStreamGeneratorDriver.h
Edward Wall a13c5b81e2
Some checks failed
Test And Build / Build (push) Failing after 2s
Test And Build / Lint (push) Successful in 2s
woops, strings got deleted
2025-11-18 14:27:41 +01:00

221 lines
7.0 KiB
C++

#ifndef asynStreamGeneratorDriver_H
#define asynStreamGeneratorDriver_H
#include "asynPortDriver.h"
#include <epicsRingBytes.h>
#include <librdkafka/rdkafka.h>
// Just for printing
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
/*******************************************************************************
* UDP Packet Definitions
*/
enum CommandId : std::int16_t { reset = 0, start = 1, stop = 2, cont = 3 };
struct __attribute__((__packed__)) CommandHeader {
uint16_t BufferLength;
uint16_t BufferType;
uint16_t HeaderLength;
uint16_t BufferNumber;
uint16_t Command;
uint16_t McpdIdStatus;
uint16_t TimeStampLo;
uint16_t TimeStampMid;
uint16_t TimeStampHigh;
uint16_t Checksum;
uint16_t Finalizer;
CommandHeader(const CommandId commandId)
: BufferLength(10), BufferType(0x8000), HeaderLength(10),
BufferNumber(0), Command(commandId), McpdIdStatus(0), TimeStampLo(0),
TimeStampMid(0), TimeStampHigh(0), Checksum(0), Finalizer(0xffff) {
Checksum = BufferLength ^ BufferType ^ HeaderLength ^ BufferNumber ^
Command ^ McpdIdStatus ^ TimeStampLo ^ TimeStampMid ^
TimeStampHigh;
}
};
struct __attribute__((__packed__)) DataHeader {
uint16_t BufferLength;
uint16_t BufferType;
uint16_t HeaderLength;
uint16_t BufferNumber;
uint16_t RunCmdID;
uint16_t Status : 8;
uint16_t McpdID : 8;
uint16_t TimeStamp[3];
uint16_t Parameter0[3];
uint16_t Parameter1[3];
uint16_t Parameter2[3];
uint16_t Parameter3[3];
inline uint64_t nanosecs() const {
uint64_t nsec{((uint64_t)TimeStamp[2]) << 32 |
((uint64_t)TimeStamp[1]) << 16 | (uint64_t)TimeStamp[0]};
return nsec * 100;
}
};
struct __attribute__((__packed__)) DetectorEvent {
uint64_t TimeStamp : 19;
uint16_t XPosition : 10;
uint16_t YPosition : 10;
uint16_t Amplitude : 8;
uint16_t Id : 1;
inline uint32_t nanosecs() const { return TimeStamp * 100; }
inline uint64_t pixelId(uint32_t mpcdId) const {
const uint32_t x_pixels = 128;
const uint32_t y_pixels = 128;
return (mpcdId - 1) * x_pixels * y_pixels +
x_pixels * (uint32_t)this->XPosition + (uint32_t)this->YPosition;
}
};
struct __attribute__((__packed__)) MonitorEvent {
uint64_t TimeStamp : 19;
uint64_t Data : 21;
uint64_t DataID : 4;
uint64_t TriggerID : 3;
uint64_t Id : 1;
inline uint32_t nanosecs() const { return TimeStamp * 100; }
};
/*******************************************************************************
* Simplified Event Struct Definition
*/
struct __attribute__((__packed__)) NormalisedEvent {
uint64_t timestamp;
uint32_t pixelId : 24;
uint8_t source;
// inline NormalisedEvent(uint64_t timestamp, uint8_t source, uint32_t
// pixelId)
// : timestamp(timestamp), source(source), pixelId(pixelId){};
};
/*******************************************************************************
* Status values that should match the definition in db/daq_common.db
*/
#define STATUS_IDLE 0
#define STATUS_COUNTING 1
#define STATUS_LOWRATE 2
#define STATUS_PAUSED 3
/*******************************************************************************
* Parameters for use in DB records
*
* i.e.e drvInfo strings that are used to identify the parameters
*/
constexpr static char P_EnableElectronicsString[]{"EN_EL"};
constexpr static char P_StatusString[]{"STATUS"};
constexpr static char P_ResetString[]{"RESET"};
constexpr static char P_StopString[]{"STOP"};
constexpr static char P_CountPresetString[]{"P_CNT"};
constexpr static char P_TimePresetString[]{"P_TIME"};
constexpr static char P_ElapsedTimeString[]{"TIME"};
constexpr static char P_ClearElapsedTimeString[]{"C_TIME"};
constexpr static char P_MonitorChannelString[]{"MONITOR"};
constexpr static char P_ThresholdString[]{"THRESH"};
constexpr static char P_ThresholdChannelString[]{"THRESH_CH"};
constexpr static char P_CountsString[]{"COUNTS%" PRIu64};
constexpr static char P_RateString[]{"RATE%" PRIu64};
constexpr static char P_ClearCountsString[]{"C_%" PRIu64};
constexpr static char P_UdpDroppedString[]{"DROP"};
constexpr static char P_UdpQueueHighWaterMarkString[]{"UDP"};
constexpr static char P_SortedQueueHighWaterMarkString[]{"SORT"};
/*******************************************************************************
* Stream Generator Coordinating Class
*/
class asynStreamGeneratorDriver : public asynPortDriver {
public:
asynStreamGeneratorDriver(const char *portName, const char *ipPortName,
const int numChannels, const int udpQueueSize,
const bool enableKafkaStream,
const char *kafkaBroker, const char *monitorTopic,
const char *detectorTopic,
const int kafkaQueueSize,
const int kafkaMaxPacketSize);
virtual ~asynStreamGeneratorDriver();
virtual asynStatus readInt32(asynUser *pasynUser, epicsInt32 *value);
virtual asynStatus writeInt32(asynUser *pasynUser, epicsInt32 value);
void receiveUDP();
void normaliseUDP();
void partialSortEvents();
void processEvents();
void produceMonitor();
void produceDetector();
protected:
// Parameter Identifying IDs
int P_EnableElectronics;
int P_Status;
int P_Reset;
int P_Stop;
int P_CountPreset;
int P_TimePreset;
int P_ElapsedTime;
int P_ClearElapsedTime;
int P_MonitorChannel;
int P_Threshold;
int P_ThresholdChannel;
int *P_Counts;
int *P_Rates;
int *P_ClearCounts;
// System Status Parameter Identifying IDs
int P_UdpDropped;
int P_UdpQueueHighWaterMark;
int P_SortedQueueHighWaterMark;
private:
asynUser *pasynUDPUser;
epicsEventId pausedEventId;
const std::size_t num_channels;
const bool kafkaEnabled;
const int kafkaQueueSize;
const int kafkaMaxPacketSize;
const int udpQueueSize;
epicsRingBytesId udpQueue;
epicsRingBytesId normalisedQueue;
epicsRingBytesId sortedQueue;
epicsRingBytesId monitorQueue;
rd_kafka_t *monitorProducer;
const std::string monitorTopic;
epicsRingBytesId detectorQueue;
rd_kafka_t *detectorProducer;
const std::string detectorTopic;
static constexpr char driverName[]{"StreamGenerator"};
asynStatus createInt32Param(asynStatus status, const char *name,
int *variable, epicsInt32 initialValue = 0);
asynStatus createInt64Param(asynStatus status, const char *name,
int *variable, epicsInt64 initialValue = 0);
asynStatus createFloat64Param(asynStatus status, const char *name,
int *variable, double initialValue = 0);
inline void queueForKafka(NormalisedEvent &ne);
void produce(epicsRingBytesId eventQueue, rd_kafka_t *kafkaProducer,
const char *topic, const char *source);
};
#endif