working on correcting the ordering of the messages

This commit is contained in:
2025-11-03 17:31:16 +01:00
parent e65725609c
commit 81bd3bef7f
4 changed files with 160 additions and 150 deletions

View File

@@ -57,6 +57,11 @@ static void udpPollerTask(void *drvPvt) {
pSGD->receiveUDP();
}
static void daqTask(void *drvPvt) {
asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
pSGD->processEvents();
}
static void monitorProducerTask(void *drvPvt) {
asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
pSGD->produceMonitor();
@@ -83,7 +88,8 @@ asynStatus asynStreamGeneratorDriver::createInt32Param(
*/
asynStreamGeneratorDriver::asynStreamGeneratorDriver(
const char *portName, const char *ipPortName, const int numChannels,
const int kafkaQueueSize, const int kafkaMaxPacketSize)
const int udpQueueSize, const int kafkaQueueSize,
const int kafkaMaxPacketSize)
: asynPortDriver(portName, 1, /* maxAddr */
asynInt32Mask | asynInt64Mask |
asynDrvUserMask, /* Interface mask */
@@ -95,8 +101,8 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
1, /* Autoconnect */
0, /* Default priority */
0), /* Default stack size*/
num_channels(numChannels + 1), monitorQueue(kafkaQueueSize, false),
detectorQueue(kafkaQueueSize, false),
num_channels(numChannels + 1), udpQueue(udpQueueSize, false),
monitorQueue(kafkaQueueSize, false), detectorQueue(kafkaQueueSize, false),
kafkaMaxPacketSize(kafkaMaxPacketSize) {
const char *functionName = "asynStreamGeneratorDriver";
@@ -122,7 +128,7 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
P_Counts = new int[this->num_channels];
P_Rates = new int[this->num_channels];
P_ClearCounts = new int[this->num_channels];
for (size_t i = 0; i < this->num_channels; ++i) {
for (std::size_t i = 0; i < this->num_channels; ++i) {
memset(pv_name_buffer, 0, 100);
epicsSnprintf(pv_name_buffer, 100, P_CountsString, i);
status = createInt32Param(status, pv_name_buffer, P_Counts + i);
@@ -145,27 +151,42 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
// Create Events
this->pausedEventId = epicsEventCreate(epicsEventEmpty);
this->monitorProducer = create_kafka_producer();
this->detectorProducer = create_kafka_producer();
// TODO re-enable the kafka stuff
// this->monitorProducer = create_kafka_producer();
// this->detectorProducer = create_kafka_producer();
// Setup for Thread Producing Monitor Kafka Events
status =
(asynStatus)(epicsThreadCreate(
"monitor_produce", epicsThreadPriorityMedium,
epicsThreadGetStackSize(epicsThreadStackMedium),
(EPICSTHREADFUNC)::monitorProducerTask, this) == NULL);
if (status) {
printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName,
functionName, status);
exit(1);
}
// // Setup for Thread Producing Monitor Kafka Events
// status =
// (asynStatus)(epicsThreadCreate(
// "monitor_produce", epicsThreadPriorityMedium,
// epicsThreadGetStackSize(epicsThreadStackMedium),
// (EPICSTHREADFUNC)::monitorProducerTask, this) ==
// NULL);
// if (status) {
// printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName,
// functionName, status);
// exit(1);
// }
// Setup for Thread Producing Detector Kafka Events
// // Setup for Thread Producing Detector Kafka Events
// status = (asynStatus)(epicsThreadCreate(
// "monitor_produce", epicsThreadPriorityMedium,
// epicsThreadGetStackSize(epicsThreadStackMedium),
// (EPICSTHREADFUNC)::detectorProducerTask,
// this) == NULL);
// if (status) {
// printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName,
// functionName, status);
// exit(1);
// }
// TODO re-enable the kafka stuff
/* Create the thread that orders the events and acts as our sinqDaq stand-in
*/
status = (asynStatus)(epicsThreadCreate(
"monitor_produce", epicsThreadPriorityMedium,
"sinqDAQ", epicsThreadPriorityMedium,
epicsThreadGetStackSize(epicsThreadStackMedium),
(EPICSTHREADFUNC)::detectorProducerTask,
this) == NULL);
(EPICSTHREADFUNC)::daqTask, this) == NULL);
if (status) {
printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName,
functionName, status);
@@ -258,165 +279,134 @@ asynStatus asynStreamGeneratorDriver::writeInt32(asynUser *pasynUser,
return status;
}
// TODO probably I will have to split this function up, so that the system
// can process the UDP messages in parallel
void asynStreamGeneratorDriver::receiveUDP() {
asynStatus status;
int isConnected;
const size_t buffer_size = 1500;
char buffer[buffer_size];
size_t received;
int eomReason;
epicsInt32 val;
epicsInt32 currentStatus;
epicsInt32 countPreset = 0;
epicsInt32 timePreset = 0;
epicsInt32 presetChannel = 0;
const char *functionName = "receiveUDP";
asynStatus status = asynSuccess;
int isConnected = 1;
std::size_t received;
int eomReason;
// TODO epics doesn't seem to support uint64, you would need an array of
// uint32. It does support int64 though.. so we start with that
epicsInt32 *counts = new epicsInt32[this->num_channels];
uint64_t start_time = std::numeric_limits<uint64_t>::max();
uint64_t current_time = 0;
epicsInt32 elapsedTime = 0;
// The correlation unit sents messages with a maximum size of 1500 bytes.
// These messages don't have any obious start or end to synchronise
// against...
const std::size_t bufferSize = 1500;
char buffer[bufferSize + 1]; // so that \0 can fit
while (true) {
status = getIntegerParam(this->P_Status, &currentStatus);
if (!currentStatus || status) {
epicsEventWait(this->pausedEventId);
getIntegerParam(this->P_CountPreset, &countPreset);
getIntegerParam(this->P_TimePreset, &timePreset);
getIntegerParam(this->P_MonitorChannel, &presetChannel);
// memset doesn't work with epicsInt32
for (size_t i = 0; i < this->num_channels; ++i) {
counts[i] = 0;
}
start_time = std::numeric_limits<uint64_t>::max();
current_time = 0;
elapsedTime = 0;
lock();
for (size_t i = 0; i < num_channels; ++i) {
setIntegerParam(P_Counts[i], counts[i]);
}
setIntegerParam(P_ElapsedTime, 0);
callParamCallbacks();
unlock();
// Clear the input buffer, in case of stray messages
pasynOctetSyncIO->flush(pasynUDPUser);
}
status = pasynManager->isConnected(pasynUDPUser, &isConnected);
if (!isConnected)
asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
"%s:%s: isConnected = %d\n", driverName, functionName,
isConnected);
status = pasynOctetSyncIO->read(pasynUDPUser, buffer, buffer_size,
status = pasynOctetSyncIO->read(pasynUDPUser, buffer, bufferSize,
0, // timeout
&received, &eomReason);
if (received) {
UDPHeader *header = (UDPHeader *)buffer;
size_t total_events = (header->BufferLength - 21) / 3;
std::size_t total_events = (header->BufferLength - 21) / 3;
start_time =
std::min(start_time, (uint64_t)(header->nanosecs() / 1e9));
// This is maybe safer, in case the time wraps back around?
// if (start_time == std::numeric_limits<uint64_t>::max())
// start_time = header->nanosecs() /1e9;
// TODO lots of checks and validation missing everywhere here
if (received == total_events * 6 + 42) {
// asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
// "%s:%s: received packet %d with %d events (%"
// PRIu64
// ")\n",
// driverName, functionName,
// header->BufferNumber, total_events,
// header->nanosecs());
for (size_t i = 0; i < total_events; ++i) {
for (std::size_t i = 0; i < total_events; ++i) {
char *event = (buffer + 21 * 2 + i * 6);
if (countPreset && counts[presetChannel] >= countPreset)
break;
NormalisedEvent *ne;
if (event[5] & 0x80) { // Monitor Event
MonitorEvent *m_event = (MonitorEvent *)event;
counts[m_event->DataID + 1] += 1;
// needs to be freed!!!
auto nme = new NormalisedMonitorEvent();
nme->TimeStamp =
header->nanosecs() + (uint64_t)m_event->nanosecs();
nme->DataID = m_event->DataID;
this->monitorQueue.push(nme);
current_time = std::max(
current_time,
(uint64_t)((header->nanosecs() +
(uint64_t)m_event->nanosecs()) /
1e9));
ne = new NormalisedEvent(
header->nanosecs() + (uint64_t)m_event->nanosecs(),
0, m_event->DataID);
} else { // Detector Event
DetectorEvent *d_event = (DetectorEvent *)event;
counts[0] += 1;
// needs to be freed!!!
auto nde = new NormalisedDetectorEvent();
nde->TimeStamp =
header->nanosecs() + (uint64_t)d_event->nanosecs();
nde->PixID = d_event->pixelId(header->McpdID);
this->detectorQueue.push(nde);
current_time = std::max(
current_time,
(uint64_t)((header->nanosecs() +
(uint64_t)d_event->nanosecs()) /
1e9));
ne = new NormalisedEvent(
header->nanosecs() + (uint64_t)d_event->nanosecs(),
header->McpdID, d_event->pixelId(header->McpdID));
}
this->udpQueue.push(ne);
}
lock();
for (size_t i = 0; i < num_channels; ++i) {
setIntegerParam(P_Counts[i], counts[i]);
}
elapsedTime = current_time - start_time;
setIntegerParam(P_ElapsedTime, elapsedTime);
callParamCallbacks();
unlock();
} else {
asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
"%s:%s: invalid UDP packet\n", driverName,
functionName);
}
}
}
}
if ((countPreset && counts[presetChannel] >= countPreset) ||
(timePreset && elapsedTime >= timePreset)) {
lock();
setIntegerParam(P_Status, STATUS_IDLE);
setIntegerParam(P_CountPreset, 0);
setIntegerParam(P_TimePreset, 0);
callParamCallbacks();
unlock();
}
void asynStreamGeneratorDriver::processEvents() {
const char *functionName = "processEvents";
const size_t queueBufferSize = 10 * this->udpQueue.getSize();
struct {
bool operator()(const NormalisedEvent *l,
const NormalisedEvent *r) const {
return l->timestamp > r->timestamp;
}
} smallestToLargest;
// This should never be used. It is just instantiated to reserve a buffer
// of specific size.
std::vector<NormalisedEvent *> queueBuffer;
queueBuffer.reserve(queueBufferSize);
std::priority_queue<NormalisedEvent *, std::vector<NormalisedEvent *>,
decltype(smallestToLargest)>
timeQueue(smallestToLargest, std::move(queueBuffer));
NormalisedEvent *ne;
uint64_t newest = 0;
// TODO epics doesn't seem to support uint64, you would need an array of
// uint32. It does support int64 though.. so we start with that
epicsInt32 *counts = new epicsInt32[this->num_channels];
while (true) {
if ((ne = this->udpQueue.pop()) != nullptr) {
// TODO overflow in the correlation unit?
newest = std::max(newest, ne->timestamp);
timeQueue.push(ne);
}
// epicsThreadSleep(1); // seconds
// idea is to try and guarantee at least 1 packet per id or the min
// frequency for each id without actually checking all ids
if (timeQueue.size() >= 1500 * 10 ||
(timeQueue.size() > 0 &&
newest - timeQueue.top()->timestamp >= 200'000'000ull)) {
ne = timeQueue.top();
timeQueue.pop();
counts[ne->source == 0 ? ne->pixelId + 1 : 0] += 1;
delete ne;
lock();
for (size_t i = 0; i < num_channels; ++i) {
setIntegerParam(P_Counts[i], counts[i]);
}
// elapsedTime = current_time - start_time;
// setIntegerParam(P_ElapsedTime, elapsedTime);
callParamCallbacks();
unlock();
}
}
}
@@ -497,7 +487,7 @@ void asynStreamGeneratorDriver::produceMonitor() {
void asynStreamGeneratorDriver::produceDetector() {
static const size_t bufferSize = this->kafkaMaxPacketSize + 16;
static const std::size_t bufferSize = this->kafkaMaxPacketSize + 16;
flatbuffers::FlatBufferBuilder builder(1024);
std::vector<uint32_t> tof;
@@ -612,24 +602,28 @@ extern "C" {
asynStatus asynStreamGeneratorDriverConfigure(const char *portName,
const char *ipPortName,
const int numChannels,
const int udpQueueSize,
const int kafkaQueueSize,
const int kafkaMaxPacketSize) {
new asynStreamGeneratorDriver(portName, ipPortName, numChannels,
kafkaQueueSize, kafkaMaxPacketSize);
udpQueueSize, kafkaQueueSize,
kafkaMaxPacketSize);
return asynSuccess;
}
static const iocshArg initArg0 = {"portName", iocshArgString};
static const iocshArg initArg1 = {"ipPortName", iocshArgString};
static const iocshArg initArg2 = {"numChannels", iocshArgInt};
static const iocshArg initArg3 = {"kafkaQueueSize", iocshArgInt};
static const iocshArg initArg4 = {"kafkaMaxPacketSize", iocshArgInt};
static const iocshArg initArg3 = {"udpQueueSize", iocshArgInt};
static const iocshArg initArg4 = {"kafkaQueueSize", iocshArgInt};
static const iocshArg initArg5 = {"kafkaMaxPacketSize", iocshArgInt};
static const iocshArg *const initArgs[] = {&initArg0, &initArg1, &initArg2,
&initArg3, &initArg4};
static const iocshFuncDef initFuncDef = {"asynStreamGenerator", 5, initArgs};
&initArg3, &initArg4, &initArg5};
static const iocshFuncDef initFuncDef = {"asynStreamGenerator", 6, initArgs};
static void initCallFunc(const iocshArgBuf *args) {
asynStreamGeneratorDriverConfigure(args[0].sval, args[1].sval, args[2].ival,
args[3].ival, args[4].ival);
args[3].ival, args[4].ival,
args[5].ival);
}
void asynStreamGeneratorDriverRegister(void) {