again at the point that I can do preset based counts, but now with the priority queue built in so that the events are sorted

This commit is contained in:
2025-11-04 10:24:25 +01:00
parent 81bd3bef7f
commit 60aa1652c3
4 changed files with 145 additions and 59 deletions

View File

@@ -88,8 +88,8 @@ asynStatus asynStreamGeneratorDriver::createInt32Param(
*/
asynStreamGeneratorDriver::asynStreamGeneratorDriver(
const char *portName, const char *ipPortName, const int numChannels,
const int udpQueueSize, const int kafkaQueueSize,
const int kafkaMaxPacketSize)
const int udpQueueSize, const bool enableKafkaStream,
const int kafkaQueueSize, const int kafkaMaxPacketSize)
: asynPortDriver(portName, 1, /* maxAddr */
asynInt32Mask | asynInt64Mask |
asynDrvUserMask, /* Interface mask */
@@ -151,35 +151,36 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
// Create Events
this->pausedEventId = epicsEventCreate(epicsEventEmpty);
// TODO re-enable the kafka stuff
// this->monitorProducer = create_kafka_producer();
// this->detectorProducer = create_kafka_producer();
if (enableKafkaStream) {
this->monitorProducer = create_kafka_producer();
this->detectorProducer = create_kafka_producer();
// // Setup for Thread Producing Monitor Kafka Events
// status =
// (asynStatus)(epicsThreadCreate(
// "monitor_produce", epicsThreadPriorityMedium,
// epicsThreadGetStackSize(epicsThreadStackMedium),
// (EPICSTHREADFUNC)::monitorProducerTask, this) ==
// NULL);
// if (status) {
// printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName,
// functionName, status);
// exit(1);
// }
// Setup for Thread Producing Monitor Kafka Events
status =
(asynStatus)(epicsThreadCreate(
"monitor_produce", epicsThreadPriorityMedium,
epicsThreadGetStackSize(epicsThreadStackMedium),
(EPICSTHREADFUNC)::monitorProducerTask,
this) == NULL);
if (status) {
printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName,
functionName, status);
exit(1);
}
// // Setup for Thread Producing Detector Kafka Events
// status = (asynStatus)(epicsThreadCreate(
// "monitor_produce", epicsThreadPriorityMedium,
// epicsThreadGetStackSize(epicsThreadStackMedium),
// (EPICSTHREADFUNC)::detectorProducerTask,
// this) == NULL);
// if (status) {
// printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName,
// functionName, status);
// exit(1);
// }
// TODO re-enable the kafka stuff
// Setup for Thread Producing Detector Kafka Events
status =
(asynStatus)(epicsThreadCreate(
"monitor_produce", epicsThreadPriorityMedium,
epicsThreadGetStackSize(epicsThreadStackMedium),
(EPICSTHREADFUNC)::detectorProducerTask,
this) == NULL);
if (status) {
printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName,
functionName, status);
exit(1);
}
}
/* Create the thread that orders the events and acts as our sinqDaq stand-in
*/
@@ -281,6 +282,9 @@ asynStatus asynStreamGeneratorDriver::writeInt32(asynUser *pasynUser,
void asynStreamGeneratorDriver::receiveUDP() {
// TODO fix time overflows
// TODO check for lost packets
const char *functionName = "receiveUDP";
asynStatus status = asynSuccess;
int isConnected = 1;
@@ -370,19 +374,29 @@ void asynStreamGeneratorDriver::processEvents() {
decltype(smallestToLargest)>
timeQueue(smallestToLargest, std::move(queueBuffer));
NormalisedEvent *ne;
uint64_t newest = 0;
// TODO epics doesn't seem to support uint64, you would need an array of
// uint32. It does support int64 though.. so we start with that
epicsInt32 *counts = new epicsInt32[this->num_channels];
asynStatus status = asynSuccess;
NormalisedEvent *ne;
uint64_t newestTimestamp = 0;
uint64_t startTimestamp = std::numeric_limits<uint64_t>::max();
uint64_t currTimestamp;
epicsInt32 elapsedSeconds = 0;
epicsInt32 prevStatus = STATUS_IDLE;
epicsInt32 currStatus = STATUS_IDLE;
epicsInt32 countPreset = 0;
epicsInt32 timePreset = 0;
epicsInt32 presetChannel = 0;
while (true) {
if ((ne = this->udpQueue.pop()) != nullptr) {
// TODO overflow in the correlation unit?
newest = std::max(newest, ne->timestamp);
// we should reastart this ioc at least every few years, as at ns
// resolution with a uint64_t we will have an overflow after around
// 4 years
newestTimestamp = std::max(newestTimestamp, ne->timestamp);
timeQueue.push(ne);
}
@@ -390,22 +404,93 @@ void asynStreamGeneratorDriver::processEvents() {
// frequency for each id without actually checking all ids
if (timeQueue.size() >= 1500 * 10 ||
(timeQueue.size() > 0 &&
newest - timeQueue.top()->timestamp >= 200'000'000ull)) {
newestTimestamp - timeQueue.top()->timestamp >= 200'000'000ull)) {
ne = timeQueue.top();
timeQueue.pop();
counts[ne->source == 0 ? ne->pixelId + 1 : 0] += 1;
status = getIntegerParam(this->P_Status, &currStatus);
if (currStatus == STATUS_COUNTING && prevStatus == STATUS_IDLE) {
// Starting a new count
// get current count configuration
getIntegerParam(this->P_CountPreset, &countPreset);
getIntegerParam(this->P_TimePreset, &timePreset);
getIntegerParam(this->P_MonitorChannel, &presetChannel);
// reset status variables
startTimestamp = std::numeric_limits<uint64_t>::max();
for (size_t i = 0; i < this->num_channels; ++i) {
counts[i] = 0;
}
// reset pvs
lock();
for (size_t i = 0; i < num_channels; ++i) {
setIntegerParam(P_Counts[i], counts[i]);
}
setIntegerParam(P_ElapsedTime, 0);
callParamCallbacks();
unlock();
// TODO might consider throwing out current buffer as it is
// from before count started? then again, 0.2 ms or whatever is
// set above is quite a small preceeding amount of time, so
// maybe it doesn't matter
}
prevStatus = currStatus;
if (currStatus == STATUS_COUNTING) {
startTimestamp = std::min(startTimestamp, ne->timestamp);
counts[ne->source == 0 ? ne->pixelId + 1 : 0] += 1;
currTimestamp = ne->timestamp;
elapsedSeconds =
0 ? currTimestamp <= startTimestamp
: ((double)(currTimestamp - startTimestamp)) / 1e9;
}
delete ne;
lock();
for (size_t i = 0; i < num_channels; ++i) {
setIntegerParam(P_Counts[i], counts[i]);
// is our count finished?
if ((countPreset && counts[presetChannel] >= countPreset) ||
(timePreset && elapsedSeconds >= timePreset)) {
// add any remaining events with the same timestamp
// we could theoretically have a small overrun if the
// timestamps are identical on the monitor channel
while (!timeQueue.empty() &&
!timeQueue.top()->timestamp == currTimestamp) {
ne = timeQueue.top();
timeQueue.pop();
counts[ne->source == 0 ? ne->pixelId + 1 : 0] += 1;
delete ne;
}
countPreset = 0;
timePreset = 0;
lock();
for (size_t i = 0; i < num_channels; ++i) {
setIntegerParam(P_Counts[i], counts[i]);
}
setIntegerParam(P_ElapsedTime, elapsedSeconds);
setIntegerParam(P_CountPreset, countPreset);
setIntegerParam(P_TimePreset, timePreset);
callParamCallbacks();
setIntegerParam(P_Status, STATUS_IDLE);
callParamCallbacks();
unlock();
} else if (currStatus == STATUS_COUNTING) {
lock();
for (size_t i = 0; i < num_channels; ++i) {
setIntegerParam(P_Counts[i], counts[i]);
}
setIntegerParam(P_ElapsedTime, elapsedSeconds);
callParamCallbacks();
unlock();
}
// elapsedTime = current_time - start_time;
// setIntegerParam(P_ElapsedTime, elapsedTime);
callParamCallbacks();
unlock();
}
}
}
@@ -599,15 +684,13 @@ void asynStreamGeneratorDriver::produceDetector() {
*/
extern "C" {
asynStatus asynStreamGeneratorDriverConfigure(const char *portName,
const char *ipPortName,
const int numChannels,
const int udpQueueSize,
const int kafkaQueueSize,
const int kafkaMaxPacketSize) {
asynStatus asynStreamGeneratorDriverConfigure(
const char *portName, const char *ipPortName, const int numChannels,
const int udpQueueSize, const bool enableKafkaStream,
const int kafkaQueueSize, const int kafkaMaxPacketSize) {
new asynStreamGeneratorDriver(portName, ipPortName, numChannels,
udpQueueSize, kafkaQueueSize,
kafkaMaxPacketSize);
udpQueueSize, enableKafkaStream,
kafkaQueueSize, kafkaMaxPacketSize);
return asynSuccess;
}
@@ -615,15 +698,17 @@ static const iocshArg initArg0 = {"portName", iocshArgString};
static const iocshArg initArg1 = {"ipPortName", iocshArgString};
static const iocshArg initArg2 = {"numChannels", iocshArgInt};
static const iocshArg initArg3 = {"udpQueueSize", iocshArgInt};
static const iocshArg initArg4 = {"kafkaQueueSize", iocshArgInt};
static const iocshArg initArg5 = {"kafkaMaxPacketSize", iocshArgInt};
static const iocshArg initArg4 = {"enableKafkaStream", iocshArgInt};
static const iocshArg initArg5 = {"kafkaQueueSize", iocshArgInt};
static const iocshArg initArg6 = {"kafkaMaxPacketSize", iocshArgInt};
static const iocshArg *const initArgs[] = {&initArg0, &initArg1, &initArg2,
&initArg3, &initArg4, &initArg5};
&initArg3, &initArg4, &initArg5,
&initArg6};
static const iocshFuncDef initFuncDef = {"asynStreamGenerator", 6, initArgs};
static void initCallFunc(const iocshArgBuf *args) {
asynStreamGeneratorDriverConfigure(args[0].sval, args[1].sval, args[2].ival,
args[3].ival, args[4].ival,
args[5].ival);
args[3].ival, args[4].ival, args[5].ival,
args[6].ival);
}
void asynStreamGeneratorDriverRegister(void) {