adds elapsed time and time based preset

This commit is contained in:
2025-10-31 19:10:59 +01:00
parent 1e853487aa
commit 7bacc716cc
4 changed files with 141 additions and 53 deletions

View File

@@ -3,6 +3,7 @@
#include <cstring>
#include <epicsStdio.h>
#include <iocsh.h>
#include <queue>
// Just for printing
#define __STDC_FORMAT_MACROS
@@ -86,10 +87,8 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName,
num_channels(numChannels + 1), monitorQueue(1000, false),
detectorQueue(1000, false) {
const char *functionName = "asynStreamGeneratorDriver";
// Parameter Setup
char pv_name_buffer[100];
P_Counts = new int[this->num_channels];
// Parameter Setup
asynStatus status = asynSuccess;
status = (asynStatus)(status | createParam(P_StatusString, asynParamInt32,
@@ -104,12 +103,23 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName,
asynParamInt32, &P_CountPreset));
status = (asynStatus)(status | setIntegerParam(P_CountPreset, 0));
status = (asynStatus)(status | createParam(P_TimePresetString,
asynParamInt32, &P_TimePreset));
status = (asynStatus)(status | setIntegerParam(P_TimePreset, 0));
status = (asynStatus)(status | createParam(P_ElapsedTimeString,
asynParamInt32, &P_ElapsedTime));
status = (asynStatus)(status | setIntegerParam(P_ElapsedTime, 0));
status =
(asynStatus)(status | createParam(P_MonitorChannelString,
asynParamInt32, &P_MonitorChannel));
status = (asynStatus)(status | setIntegerParam(P_MonitorChannel, 0));
// Create PVs templated on Channel Number
// Create Parameters templated on Channel Number
char pv_name_buffer[100];
P_Counts = new int[this->num_channels];
P_Rates = new int[this->num_channels];
for (size_t i = 0; i < this->num_channels; ++i) {
memset(pv_name_buffer, 0, 100);
epicsSnprintf(pv_name_buffer, 100, P_CountsString, i);
@@ -117,6 +127,13 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName,
(asynStatus)(status | createParam(pv_name_buffer, asynParamInt32,
P_Counts + i));
status = (asynStatus)(status | setIntegerParam(P_Counts[i], 0));
memset(pv_name_buffer, 0, 100);
epicsSnprintf(pv_name_buffer, 100, P_RateString, i);
status =
(asynStatus)(status | createParam(pv_name_buffer, asynParamInt32,
P_Rates + i));
status = (asynStatus)(status | setIntegerParam(P_Rates[i], 0));
}
if (status) {
@@ -128,8 +145,8 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName,
// Create Events
this->pausedEventId = epicsEventCreate(epicsEventEmpty);
this->monitorProducer = create_kafka_producer();
this->detectorProducer = create_kafka_producer();
// this->monitorProducer = create_kafka_producer();
// this->detectorProducer = create_kafka_producer();
// Setup for Thread Producing Monitor Kafka Events
status =
@@ -180,6 +197,7 @@ asynStreamGeneratorDriver::~asynStreamGeneratorDriver() {
// should make sure queues are empty and freed
// and that the kafka producers are flushed and freed
delete[] P_Counts;
delete[] P_Rates;
// TODO add exit should perhaps ensure the queue is flushed
// rd_kafka_poll(producer, 0);
@@ -205,6 +223,13 @@ asynStatus asynStreamGeneratorDriver::writeInt32(asynUser *pasynUser,
// }
if (function == P_CountPreset) {
// TODO should block setting a preset when already set
setIntegerParam(function, value);
setIntegerParam(P_Status, STATUS_COUNTING);
status = (asynStatus)callParamCallbacks();
epicsEventSignal(this->pausedEventId);
} else if (function == P_TimePreset) {
// TODO should block setting a preset when already set
setIntegerParam(function, value);
setIntegerParam(P_Status, STATUS_COUNTING);
status = (asynStatus)callParamCallbacks();
@@ -247,7 +272,8 @@ void asynStreamGeneratorDriver::receiveUDP() {
epicsInt32 val;
epicsInt32 currentStatus;
epicsInt32 countPreset = 0;
epicsInt32 presetChannel = 1;
epicsInt32 timePreset = 0;
epicsInt32 presetChannel = 0;
const char *functionName = "receiveUDP";
@@ -255,6 +281,10 @@ void asynStreamGeneratorDriver::receiveUDP() {
// uint32. It does support int64 though.. so we start with that
epicsInt32 *counts = new epicsInt32[this->num_channels];
uint64_t start_time = std::numeric_limits<uint64_t>::max();
uint64_t current_time = 0;
epicsInt32 elapsedTime = 0;
while (true) {
status = getIntegerParam(this->P_Status, &currentStatus);
@@ -263,6 +293,7 @@ void asynStreamGeneratorDriver::receiveUDP() {
epicsEventWait(this->pausedEventId);
getIntegerParam(this->P_CountPreset, &countPreset);
getIntegerParam(this->P_TimePreset, &timePreset);
getIntegerParam(this->P_MonitorChannel, &presetChannel);
// memset doesn't work with epicsInt32
@@ -270,10 +301,15 @@ void asynStreamGeneratorDriver::receiveUDP() {
counts[i] = 0;
}
start_time = std::numeric_limits<uint64_t>::max();
current_time = 0;
elapsedTime = 0;
lock();
for (size_t i = 0; i < num_channels; ++i) {
setIntegerParam(P_Counts[i], counts[i]);
}
setIntegerParam(P_ElapsedTime, 0);
callParamCallbacks();
unlock();
@@ -296,6 +332,12 @@ void asynStreamGeneratorDriver::receiveUDP() {
size_t total_events = (header->BufferLength - 21) / 3;
start_time =
std::min(start_time, (uint64_t)(header->nanosecs() / 1e9));
// This is maybe safer, in case the time wraps back around?
// if (start_time == std::numeric_limits<uint64_t>::max())
// start_time = header->nanosecs() /1e9;
// TODO lots of checks and validation missing everywhere here
if (received == total_events * 6 + 42) {
// asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
@@ -324,6 +366,12 @@ void asynStreamGeneratorDriver::receiveUDP() {
nme->DataID = m_event->DataID;
this->monitorQueue.push(nme);
current_time = std::max(
current_time,
(uint64_t)((header->nanosecs() +
(uint64_t)m_event->nanosecs()) /
1e9));
} else { // Detector Event
DetectorEvent *d_event = (DetectorEvent *)event;
counts[0] += 1;
@@ -334,6 +382,12 @@ void asynStreamGeneratorDriver::receiveUDP() {
header->nanosecs() + (uint64_t)d_event->nanosecs();
nde->PixID = d_event->pixelId(header->McpdID);
this->detectorQueue.push(nde);
current_time = std::max(
current_time,
(uint64_t)((header->nanosecs() +
(uint64_t)d_event->nanosecs()) /
1e9));
}
}
@@ -341,6 +395,8 @@ void asynStreamGeneratorDriver::receiveUDP() {
for (size_t i = 0; i < num_channels; ++i) {
setIntegerParam(P_Counts[i], counts[i]);
}
elapsedTime = current_time - start_time;
setIntegerParam(P_ElapsedTime, elapsedTime);
callParamCallbacks();
unlock();
} else {
@@ -349,10 +405,11 @@ void asynStreamGeneratorDriver::receiveUDP() {
functionName);
}
if (countPreset && counts[presetChannel] >= countPreset) {
if ((countPreset && counts[presetChannel] >= countPreset) || (timePreset && elapsedTime >= timePreset)) {
lock();
setIntegerParam(P_Status, STATUS_IDLE);
setIntegerParam(P_CountPreset, 0);
setIntegerParam(P_TimePreset, 0);
callParamCallbacks();
unlock();
}
@@ -412,7 +469,6 @@ void asynStreamGeneratorDriver::produceMonitor() {
&tof, &did);
builder.Finish(message, "ev42");
// printf("buffer size: %d\n", builder.GetSize());
rd_kafka_resp_err_t err = rd_kafka_producev(
monitorProducer, RD_KAFKA_V_TOPIC("NEWEFU_TEST"),
@@ -429,15 +485,8 @@ void asynStreamGeneratorDriver::produceMonitor() {
// rd_kafka_err2str(err));
}
// epicsStdoutPrintf("Kafka Queue Size %d\n",
// rd_kafka_outq_len(monitorProducer));
rd_kafka_poll(monitorProducer, 0);
// printf("Monitor Events Queued before sending %d\n",
// this->monitorQueue.getHighWaterMark());
// this->monitorQueue.resetHighWaterMark();
tof.clear();
did.clear();
}
@@ -447,19 +496,37 @@ void asynStreamGeneratorDriver::produceMonitor() {
void asynStreamGeneratorDriver::produceDetector() {
static const size_t bufferSize = 9000;
flatbuffers::FlatBufferBuilder builder(1024);
std::vector<uint32_t> tof;
tof.reserve(9000);
tof.reserve(bufferSize);
std::vector<uint32_t> did;
did.reserve(9000);
did.reserve(bufferSize);
int total = 0;
epicsTimeStamp last_sent = epicsTime::getCurrent();
uint64_t message_id = 0;
struct {
bool operator()(const uint64_t l, const uint64_t r) const {
return l > r;
}
} smallestToLargest;
// This should never be used. It is just instantiated to reserve a buffer
// of specific size.
std::vector<uint64_t> queueBuffer;
queueBuffer.reserve(bufferSize);
std::priority_queue<uint64_t, std::vector<uint64_t>,
decltype(smallestToLargest)>
timeQueue(smallestToLargest, std::move(queueBuffer));
uint64_t newest = 0;
while (true) {
if (!this->detectorQueue.isEmpty()) {
@@ -468,11 +535,30 @@ void asynStreamGeneratorDriver::produceDetector() {
auto nde = this->detectorQueue.pop();
tof.push_back(nde->TimeStamp);
did.push_back(nde->PixID);
newest = std::max(newest, nde->TimeStamp);
timeQueue.push(nde->TimeStamp);
delete nde;
} else {
epicsThreadSleep(0.001); // seconds
}
while (!timeQueue.empty() &&
(timeQueue.size() >= 8192 ||
(newest - timeQueue.top()) > 5'000'000'000ull))
timeQueue.pop();
epicsInt32 rate = 0;
if (timeQueue.size() > 1) {
rate = ((double)timeQueue.size() /
((double)(newest - timeQueue.top()) * 1e-9));
}
lock();
setIntegerParam(P_Rates[0], rate);
callParamCallbacks();
unlock();
epicsTimeStamp now = epicsTime::getCurrent();
// At least every 0.2 seconds
@@ -492,7 +578,6 @@ void asynStreamGeneratorDriver::produceDetector() {
&tof, &did);
builder.Finish(message, "ev42");
// printf("buffer size: %d\n", builder.GetSize());
rd_kafka_resp_err_t err = rd_kafka_producev(
detectorProducer, RD_KAFKA_V_TOPIC("NEWEFU_TEST2"),
@@ -509,15 +594,8 @@ void asynStreamGeneratorDriver::produceDetector() {
// rd_kafka_err2str(err));
}
// epicsStdoutPrintf("Kafka Queue Size %d\n",
// rd_kafka_outq_len(monitorProducer));
rd_kafka_poll(detectorProducer, 0);
// printf("Detector Events Queued before sending %d\n",
// this->detectorQueue.getHighWaterMark());
// this->detectorQueue.resetHighWaterMark();
tof.clear();
did.clear();
}