no pointers, just bytes buffers of fixed size

This commit is contained in:
2025-11-05 09:25:01 +01:00
parent 70c04af034
commit e5cb019143
2 changed files with 86 additions and 146 deletions

View File

@@ -107,8 +107,11 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
0), /* Default stack size*/
num_channels(numChannels + 1), kafkaEnabled(enableKafkaStream),
monitorTopic(monitorTopic), detectorTopic(detectorTopic),
udpQueue(udpQueueSize, false), monitorQueue(kafkaQueueSize, false),
detectorQueue(kafkaQueueSize, false),
udpQueue(epicsRingBytesCreate(udpQueueSize * sizeof(NormalisedEvent))),
monitorQueue(
epicsRingBytesCreate(kafkaQueueSize * sizeof(NormalisedEvent))),
detectorQueue(
epicsRingBytesCreate(kafkaQueueSize * sizeof(NormalisedEvent))),
kafkaMaxPacketSize(kafkaMaxPacketSize) {
const char *functionName = "asynStreamGeneratorDriver";
@@ -366,6 +369,8 @@ void asynStreamGeneratorDriver::receiveUDP() {
lastBufferNumber[i] = 0;
}
NormalisedEvent ne;
while (true) {
status = pasynManager->isConnected(pasynUDPUser, &isConnected);
@@ -404,26 +409,25 @@ void asynStreamGeneratorDriver::receiveUDP() {
for (std::size_t i = 0; i < total_events; ++i) {
char *event = (buffer + 21 * 2 + i * 6);
NormalisedEvent *ne;
if (event[5] & 0x80) { // Monitor Event
MonitorEvent *m_event = (MonitorEvent *)event;
// needs to be freed!!!
ne = new NormalisedEvent(
header->nanosecs() + (uint64_t)m_event->nanosecs(),
0, m_event->DataID);
ne.timestamp =
header->nanosecs() + (uint64_t)m_event->nanosecs();
ne.source = 0;
ne.pixelId = m_event->DataID;
} else { // Detector Event
DetectorEvent *d_event = (DetectorEvent *)event;
// needs to be freed!!!
ne = new NormalisedEvent(
header->nanosecs() + (uint64_t)d_event->nanosecs(),
header->McpdID, d_event->pixelId(header->McpdID));
ne.timestamp =
header->nanosecs() + (uint64_t)d_event->nanosecs();
ne.source = header->McpdID;
ne.pixelId = d_event->pixelId(header->McpdID);
}
this->udpQueue.push(ne);
epicsRingBytesPut(this->udpQueue, (char *)&ne,
sizeof(NormalisedEvent));
}
} else {
@@ -435,15 +439,14 @@ void asynStreamGeneratorDriver::receiveUDP() {
}
}
inline void asynStreamGeneratorDriver::queueForKafka(NormalisedEvent *ne) {
inline void asynStreamGeneratorDriver::queueForKafka(NormalisedEvent &&ne) {
if (this->kafkaEnabled) {
if (ne->source == 0)
this->monitorQueue.push(ne);
if (ne.source == 0)
epicsRingBytesPut(this->monitorQueue, (char *)&ne,
sizeof(NormalisedEvent));
else
this->detectorQueue.push(ne);
} else {
delete ne;
epicsRingBytesPut(this->detectorQueue, (char *)&ne,
sizeof(NormalisedEvent));
}
}
@@ -451,21 +454,22 @@ void asynStreamGeneratorDriver::processEvents() {
const char *functionName = "processEvents";
const size_t queueBufferSize = 10 * this->udpQueue.getSize();
const size_t queueBufferSize =
10 * epicsRingBytesSize(this->udpQueue) / sizeof(NormalisedEvent);
struct {
bool operator()(const NormalisedEvent *l,
const NormalisedEvent *r) const {
return l->timestamp > r->timestamp;
bool operator()(const NormalisedEvent l,
const NormalisedEvent r) const {
return l.timestamp > r.timestamp;
}
} smallestToLargest;
// This should never be used. It is just instantiated to reserve a buffer
// of specific size.
std::vector<NormalisedEvent *> queueBuffer;
std::vector<NormalisedEvent> queueBuffer;
queueBuffer.reserve(queueBufferSize);
std::priority_queue<NormalisedEvent *, std::vector<NormalisedEvent *>,
std::priority_queue<NormalisedEvent, std::vector<NormalisedEvent>,
decltype(smallestToLargest)>
timeQueue(smallestToLargest, std::move(queueBuffer));
@@ -480,7 +484,7 @@ void asynStreamGeneratorDriver::processEvents() {
epicsTimeStamp lastRateUpdate = epicsTime::getCurrent();
asynStatus status = asynSuccess;
NormalisedEvent *ne;
NormalisedEvent ne;
uint64_t newestTimestamp = 0;
uint64_t startTimestamp = std::numeric_limits<uint64_t>::max();
uint64_t currTimestamp;
@@ -493,22 +497,24 @@ void asynStreamGeneratorDriver::processEvents() {
while (true) {
if ((ne = this->udpQueue.pop()) != nullptr) {
// TODO depending on how this is implemented, I may also need to check
// that there is is enough bytes, in case it does partial writes...
if (epicsRingBytesGet(udpQueue, (char *)&ne, sizeof(NormalisedEvent))) {
// we should reastart this ioc at least every few years, as at ns
// resolution with a uint64_t we will have an overflow after around
// 4 years
newestTimestamp = std::max(newestTimestamp, ne->timestamp);
newestTimestamp = std::max(newestTimestamp, ne.timestamp);
++countDiff[ne->source == 0 ? ne->pixelId + 1 : 0];
++countDiff[ne.source == 0 ? ne.pixelId + 1 : 0];
timeQueue.push(ne);
timeQueue.push(std::move(ne));
}
// idea is to try and guarantee at least 1 packet per id or the min
// frequency for each id without actually checking all ids
if (timeQueue.size() >= 1500 * 10 ||
(timeQueue.size() > 0 &&
newestTimestamp - timeQueue.top()->timestamp >= 200'000'000ull)) {
newestTimestamp - timeQueue.top().timestamp >= 200'000'000ull)) {
ne = timeQueue.top();
timeQueue.pop();
@@ -546,8 +552,8 @@ void asynStreamGeneratorDriver::processEvents() {
prevStatus = currStatus;
if (currStatus == STATUS_COUNTING) {
startTimestamp = std::min(startTimestamp, ne->timestamp);
currTimestamp = ne->timestamp;
startTimestamp = std::min(startTimestamp, ne.timestamp);
currTimestamp = ne.timestamp;
elapsedSeconds =
0 ? currTimestamp <= startTimestamp
: ((double)(currTimestamp - startTimestamp)) / 1e9;
@@ -557,22 +563,20 @@ void asynStreamGeneratorDriver::processEvents() {
(timePreset && elapsedSeconds >= timePreset)) {
// filter out events that occured after the specified time
if (ne->timestamp - startTimestamp <= countPreset) {
counts[ne->source == 0 ? ne->pixelId + 1 : 0] += 1;
this->queueForKafka(ne);
if (ne.timestamp - startTimestamp <= countPreset) {
counts[ne.source == 0 ? ne.pixelId + 1 : 0] += 1;
this->queueForKafka(std::move(ne));
// add any remaining events with the same timestamp
// we could theoretically have a small overrun if the
// timestamps are identical on the monitor channel
while (!timeQueue.empty() &&
!timeQueue.top()->timestamp == currTimestamp) {
!timeQueue.top().timestamp == currTimestamp) {
ne = timeQueue.top();
timeQueue.pop();
counts[ne->source == 0 ? ne->pixelId + 1 : 0] += 1;
this->queueForKafka(ne);
counts[ne.source == 0 ? ne.pixelId + 1 : 0] += 1;
this->queueForKafka(std::move(ne));
}
} else {
delete ne;
}
countPreset = 0;
@@ -592,8 +596,8 @@ void asynStreamGeneratorDriver::processEvents() {
} else {
counts[ne->source == 0 ? ne->pixelId + 1 : 0] += 1;
this->queueForKafka(ne);
counts[ne.source == 0 ? ne.pixelId + 1 : 0] += 1;
this->queueForKafka(std::move(ne));
lock();
for (size_t i = 0; i < num_channels; ++i) {
@@ -603,9 +607,6 @@ void asynStreamGeneratorDriver::processEvents() {
callParamCallbacks();
unlock();
}
} else {
delete ne;
}
}
@@ -622,7 +623,8 @@ void asynStreamGeneratorDriver::processEvents() {
for (size_t j = 0; j <= 10; ++j) {
cnt += countDiffs[i * 10 + j];
}
rates[i] = cnt / 10.;
rates[i] =
cnt; // would / 10 to average than * 10 as want per second
countDiff[i] = 0;
}
@@ -641,38 +643,41 @@ void asynStreamGeneratorDriver::processEvents() {
}
}
void asynStreamGeneratorDriver::produceMonitor() {
void asynStreamGeneratorDriver::produce(epicsRingBytesId eventQueue,
rd_kafka_t *kafkaProducer,
const char *topic, const char *source) {
flatbuffers::FlatBufferBuilder builder(1024);
const std::size_t bufferSize = this->kafkaMaxPacketSize + 16;
std::vector<uint32_t> tof;
tof.reserve(this->kafkaMaxPacketSize + 16);
tof.reserve(bufferSize);
std::vector<uint32_t> did;
did.reserve(this->kafkaMaxPacketSize + 16);
did.reserve(bufferSize);
int total = 0;
epicsTimeStamp last_sent = epicsTime::getCurrent();
epicsTimeStamp now = last_sent;
int total = 0;
uint64_t message_id = 0;
NormalisedEvent ne;
while (true) {
if (!this->monitorQueue.isEmpty()) {
if (!epicsRingBytesIsEmpty(eventQueue)) {
++total;
auto nme = this->monitorQueue.pop();
tof.push_back(nme->timestamp);
did.push_back(nme->pixelId);
delete nme;
epicsRingBytesGet(eventQueue, (char *)&ne, sizeof(NormalisedEvent));
tof.push_back(ne.timestamp);
did.push_back(ne.pixelId);
} else {
epicsThreadSleep(0.001); // seconds
}
// TODO can probably just replace the current
// instead of always getting new object
epicsTimeStamp now = epicsTime::getCurrent();
now = epicsTime::getCurrent();
// At least every 0.2 seconds
if (total >= this->kafkaMaxPacketSize ||
@@ -685,7 +690,7 @@ void asynStreamGeneratorDriver::produceMonitor() {
builder.Clear();
auto message = CreateEventMessageDirect(
builder, "monitor", message_id++,
builder, source, message_id++,
((uint64_t)now.secPastEpoch) * 1'000'000'000ull +
((uint64_t)now.nsec),
&tof, &did);
@@ -693,7 +698,7 @@ void asynStreamGeneratorDriver::produceMonitor() {
builder.Finish(message, "ev42");
rd_kafka_resp_err_t err = rd_kafka_producev(
monitorProducer, RD_KAFKA_V_TOPIC(this->monitorTopic),
kafkaProducer, RD_KAFKA_V_TOPIC(topic),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
// RD_KAFKA_V_KEY((void *)key, key_len),
RD_KAFKA_V_VALUE((void *)builder.GetBufferPointer(),
@@ -703,11 +708,10 @@ void asynStreamGeneratorDriver::produceMonitor() {
if (err) {
epicsStdoutPrintf("Failed to produce to topic %s: %s\n",
this->monitorTopic,
rd_kafka_err2str(err));
topic, rd_kafka_err2str(err));
}
rd_kafka_poll(monitorProducer, 0);
rd_kafka_poll(kafkaProducer, 0);
tof.clear();
did.clear();
@@ -716,80 +720,12 @@ void asynStreamGeneratorDriver::produceMonitor() {
}
}
void asynStreamGeneratorDriver::produceMonitor() {
this->produce(monitorQueue, monitorProducer, monitorTopic, "monitor");
}
void asynStreamGeneratorDriver::produceDetector() {
static const std::size_t bufferSize = this->kafkaMaxPacketSize + 16;
flatbuffers::FlatBufferBuilder builder(1024);
std::vector<uint32_t> tof;
tof.reserve(bufferSize);
std::vector<uint32_t> did;
did.reserve(bufferSize);
int total = 0;
epicsTimeStamp last_sent = epicsTime::getCurrent();
uint64_t message_id = 0;
while (true) {
if (!this->detectorQueue.isEmpty()) {
++total;
auto nde = this->detectorQueue.pop();
tof.push_back(nde->timestamp);
did.push_back(nde->pixelId);
delete nde;
} else {
// TODO
// rd_kafka_flush(detectorProducer, 10 * 1000);
epicsThreadSleep(0.001); // seconds
}
epicsTimeStamp now = epicsTime::getCurrent();
// At least every 0.2 seconds
if (total >= this->kafkaMaxPacketSize ||
epicsTimeDiffInNS(&now, &last_sent) > 200'000'000ll) {
last_sent = epicsTime::getCurrent();
if (total) {
total = 0;
builder.Clear();
auto message = CreateEventMessageDirect(
builder, "detector", message_id++,
((uint64_t)now.secPastEpoch) * 1'000'000'000ull +
((uint64_t)now.nsec),
&tof, &did);
builder.Finish(message, "ev42");
rd_kafka_resp_err_t err = rd_kafka_producev(
detectorProducer, RD_KAFKA_V_TOPIC(this->detectorTopic),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
// RD_KAFKA_V_KEY((void *)key, key_len),
RD_KAFKA_V_VALUE((void *)builder.GetBufferPointer(),
builder.GetSize()),
// RD_KAFKA_V_OPAQUE(NULL),
RD_KAFKA_V_END);
if (err) {
epicsStdoutPrintf("Failed to produce to topic %s: %s\n",
this->detectorTopic,
rd_kafka_err2str(err));
}
rd_kafka_poll(detectorProducer, 0);
tof.clear();
did.clear();
}
}
}
this->produce(detectorQueue, detectorProducer, detectorTopic, "detector");
}
/*******************************************************************************