finished converting the processing to a batch-wise variant
Makefile
@@ -20,9 +20,11 @@ TEMPLATES += db/channels.db db/daq_common.db
 # Source files to build
 SOURCES += src/asynStreamGeneratorDriver.cpp
 
-USR_CFLAGS += -Wall -Wextra -Wunused-result -Werror -fvisibility=hidden # -Wpedantic // Does not work because EPICS macros trigger warnings
+# I don't think specifying the optimisation level like this is correct...
+# but I doesn't hurt :D
+USR_CFLAGS += -O3 -Wall -Wextra -Wunused-result -Werror -fvisibility=hidden # -Wpedantic // Does not work because EPICS macros trigger warnings
 
 # Required to support EV42/44
-USR_CXXFLAGS += -I../dep/flatbuffers/include/ -I../schemas
+USR_CXXFLAGS += -O3 -I../dep/flatbuffers/include/ -I../schemas
 
 LIB_SYS_LIBS += rdkafka
@@ -14,7 +14,8 @@ drvAsynIPPortConfigure("ASYN_IP_PORT", "127.0.0.1:9071:54321 UDP", 0, 0, 1)
 # 10'000 * 243 = 2.43e6 events
 
 # asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "linkafka01:9092", "NEWEFU_TEST", "NEWEFU_TEST2", 1000, 8192)
-asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "", "", "", 0, 0)
+asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "ess01:9092", "NEWEFU_TEST", "NEWEFU_TEST2", 10000, 20480)
+# asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "", "", "", 0, 0)
 
 dbLoadRecords("$(StreamGenerator_DB)daq_common.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNELS=5")
 
@@ -1,5 +1,6 @@
 #include "asynOctetSyncIO.h"
 #include "ev42_events_generated.h"
+#include <cmath>
 #include <cstring>
 #include <epicsStdio.h>
 #include <iocsh.h>
@@ -112,16 +113,16 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
           0), /* Default stack size*/
       num_channels(numChannels + 1), kafkaEnabled(enableKafkaStream),
       monitorTopic(monitorTopic), detectorTopic(detectorTopic),
-      // so these first to are measured in max packet sizes
+      // measured in max packet sizes
       udpQueue(
           epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))),
       // TODO configurable sizes
-      sortedQueue(epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))),
-      // and these two are currently measured in event sizes...
+      sortedQueue(
+          epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))),
       monitorQueue(
-          epicsRingBytesCreate(kafkaQueueSize * sizeof(NormalisedEvent))),
+          epicsRingBytesCreate(243 * kafkaQueueSize * sizeof(NormalisedEvent))),
       detectorQueue(
-          epicsRingBytesCreate(kafkaQueueSize * sizeof(NormalisedEvent))),
+          epicsRingBytesCreate(243 * kafkaQueueSize * sizeof(NormalisedEvent))),
       kafkaMaxPacketSize(kafkaMaxPacketSize) {
     const char *functionName = "asynStreamGeneratorDriver";
 
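Side note on the queue sizes above (a sketch, not part of the commit): epicsRingBytesCreate takes its capacity in bytes, so each queue is sized as events-per-packet (243) times the queue size in packets times the event size, which is what the reworded "measured in max packet sizes" comment refers to. The 16-byte Event below is a hypothetical stand-in for NormalisedEvent, whose real layout is not shown in this diff:

// Standalone sketch: how a byte-sized ring buffer capacity relates to a
// "queue size in packets". Event is a hypothetical stand-in for NormalisedEvent.
#include <cstddef>
#include <cstdint>
#include <cstdio>

struct Event {
    std::uint64_t timestamp; // ns
    std::uint32_t pixelId;
    std::uint32_t source;
};

int main() {
    constexpr std::size_t eventsPerPacket = 243;  // max events per UDP packet
    constexpr std::size_t queuePackets = 10'000;  // e.g. the udpQueueSize argument
    constexpr std::size_t bytes =
        eventsPerPacket * queuePackets * sizeof(Event);
    std::printf("capacity: %zu events, %zu bytes (%.1f MiB)\n",
                eventsPerPacket * queuePackets, bytes,
                bytes / (1024.0 * 1024.0));
    return 0;
}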
@@ -237,14 +238,13 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
         exit(1);
     }
 
-    /* Create the thread that orders packets of in preparation for our sinqDAQ stand-in
+    /* Create the thread that orders packets of in preparation for our sinqDAQ
+     * stand-in
      */
-    status =
-        (asynStatus)(epicsThreadCreate(
-            "partialSort",
-            epicsThreadPriorityMedium,
-            epicsThreadGetStackSize(epicsThreadStackMedium),
-            (EPICSTHREADFUNC)::sortTask, this) == NULL);
+    status = (asynStatus)(epicsThreadCreate(
+                 "partialSort", epicsThreadPriorityMedium,
+                 epicsThreadGetStackSize(epicsThreadStackMedium),
+                 (EPICSTHREADFUNC)::sortTask, this) == NULL);
     if (status) {
         epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
                           driverName, functionName, status);
@@ -285,7 +285,8 @@ asynStreamGeneratorDriver::~asynStreamGeneratorDriver() {
     // epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer));
 }
 
-asynStatus asynStreamGeneratorDriver::readInt32(asynUser *pasynUser, epicsInt32 *value) {
+asynStatus asynStreamGeneratorDriver::readInt32(asynUser *pasynUser,
+                                                epicsInt32 *value) {
 
     int function = pasynUser->reason;
     asynStatus status = asynSuccess;
@@ -294,21 +295,20 @@ asynStatus asynStreamGeneratorDriver::readInt32(asynUser *pasynUser, epicsInt32
     getParamName(function, &paramName);
 
     if (function == P_UdpQueueHighWaterMark) {
-        *value =
-            epicsRingBytesHighWaterMark(this->udpQueue) / sizeof(NormalisedEvent);
+        *value = epicsRingBytesHighWaterMark(this->udpQueue) /
+                 sizeof(NormalisedEvent);
         // Aparently resetting the watermark causes problems...
         // at least concurrently :D
         // epicsRingBytesResetHighWaterMark(this->udpQueue);
         return asynSuccess;
     } else if (function == P_SortedQueueHighWaterMark) {
-        *value =
-            epicsRingBytesHighWaterMark(this->sortedQueue) / sizeof(NormalisedEvent);
+        *value = epicsRingBytesHighWaterMark(this->sortedQueue) /
+                 sizeof(NormalisedEvent);
         // epicsRingBytesResetHighWaterMark(this->sortedQueue);
         return asynSuccess;
     }
 
     return asynPortDriver::readInt32(pasynUser, value);
 
 }
 
 asynStatus asynStreamGeneratorDriver::writeInt32(asynUser *pasynUser,
@@ -503,13 +503,11 @@ void asynStreamGeneratorDriver::receiveUDP() {
         }
     }
 
 
 struct {
-    bool operator()(const NormalisedEvent l,
-                    const NormalisedEvent r) const {
-        return l.timestamp > r.timestamp;
+    bool operator()(const NormalisedEvent l, const NormalisedEvent r) const {
+        return l.timestamp < r.timestamp;
     }
-} reverseSortEventsByTime;
+} oldestEventsFirst;
 
 inline int eventsInQueue(epicsRingBytesId id) {
     return epicsRingBytesUsedBytes(id) / sizeof(NormalisedEvent);
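A note on the comparator change above (sketch, not part of the commit): with std::sort, comparing timestamps with '<' produces ascending order, i.e. the oldest event comes first, which is what the rename from reverseSortEventsByTime to oldestEventsFirst expresses. A minimal standalone illustration, with Event as a hypothetical stand-in for NormalisedEvent:

// Standalone sketch: ascending-by-timestamp ordering with a comparator object.
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <vector>

struct Event {                  // hypothetical stand-in for NormalisedEvent
    std::uint64_t timestamp;
};

struct {
    bool operator()(const Event &l, const Event &r) const {
        return l.timestamp < r.timestamp; // '<' => ascending => oldest first
    }
} oldestEventsFirst;

int main() {
    std::vector<Event> events{{30}, {10}, {20}};
    std::sort(events.begin(), events.end(), oldestEventsFirst);
    for (const Event &e : events)
        std::printf("%llu ", static_cast<unsigned long long>(e.timestamp));
    std::printf("\n"); // prints: 10 20 30
    return 0;
}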
@@ -526,35 +524,37 @@ void asynStreamGeneratorDriver::partialSortEvents() {
     int queuedEvents = 0;
     epicsTimeStamp lastSort = epicsTime::getCurrent();
     epicsTimeStamp currentTime = lastSort;
 
     while (true) {
 
         queuedEvents = eventsInQueue(this->udpQueue); // in case we can't wait
         lastSort = epicsTime::getCurrent();
         currentTime = lastSort;
 
-        // wait for mininmum packet frequency or enough packets to ensure we could potentially
-        // have at least 1 packet per mcpdid
-        while (queuedEvents < bufferedEvents && epicsTimeDiffInNS(&currentTime, &lastSort) < 250'000'000ull) {
+        // wait for mininmum packet frequency or enough packets to ensure we
+        // could potentially have at least 1 packet per mcpdid
+        while (queuedEvents < bufferedEvents &&
+               epicsTimeDiffInNS(&currentTime, &lastSort) < 250'000'000ull) {
             epicsThreadSleep(0.0001); // seconds
             currentTime = epicsTime::getCurrent();
             queuedEvents = eventsInQueue(this->udpQueue);
         }
 
         queuedEvents = std::min(queuedEvents, bufferedEvents);
 
         if (queuedEvents) {
-            epicsRingBytesGet(this->udpQueue, (char *)events, queuedEvents * sizeof(NormalisedEvent));
+            epicsRingBytesGet(this->udpQueue, (char *)events,
+                              queuedEvents * sizeof(NormalisedEvent));
 
-            std::sort(events, events + queuedEvents, reverseSortEventsByTime);
+            std::sort(events, events + queuedEvents, oldestEventsFirst);
 
-            epicsRingBytesPut(this->sortedQueue, (char *)events, queuedEvents * sizeof(NormalisedEvent));
+            epicsRingBytesPut(this->sortedQueue, (char *)events,
+                              queuedEvents * sizeof(NormalisedEvent));
         }
     }
 
 }
 
-inline void asynStreamGeneratorDriver::queueForKafka(NormalisedEvent &&ne) {
+inline void asynStreamGeneratorDriver::queueForKafka(NormalisedEvent &ne) {
     if (this->kafkaEnabled) {
         if (ne.source == 0)
             epicsRingBytesPut(this->monitorQueue, (char *)&ne,
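The partialSortEvents() loop above is the first half of the batch-wise conversion: wait until either a full batch (bufferedEvents) has accumulated or roughly 250 ms have passed, drain at most one batch from the UDP queue, sort it oldest-first, and forward it to the sorted queue. Below is a minimal standalone sketch of that control flow; std::deque, std::mutex and the names Event and sortStageOnce are stand-ins for the EPICS ring buffers and driver types, not code from the driver:

// Standalone sketch of the batch-wise sort stage: collect up to a batch of
// events or whatever arrived within ~250 ms, sort oldest-first, hand it on.
#include <algorithm>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

struct Event {                  // hypothetical stand-in for NormalisedEvent
    std::uint64_t timestamp;
};

std::mutex queueMutex;
std::deque<Event> udpQueue;     // filled by a receiver thread (not shown)
std::deque<Event> sortedQueue;  // read by the processing stage (not shown)

// One iteration of the sort stage: wait for a full batch or the deadline,
// drain at most one batch, sort it oldest-first, forward it.
void sortStageOnce(std::size_t bufferedEvents) {
    using Clock = std::chrono::steady_clock;
    const auto deadline = Clock::now() + std::chrono::milliseconds(250);

    for (;;) {
        {
            std::lock_guard<std::mutex> lock(queueMutex);
            if (udpQueue.size() >= bufferedEvents)
                break;
        }
        if (Clock::now() >= deadline)
            break;
        std::this_thread::sleep_for(std::chrono::microseconds(100));
    }

    std::vector<Event> batch;
    {
        std::lock_guard<std::mutex> lock(queueMutex);
        const std::size_t n = std::min(udpQueue.size(), bufferedEvents);
        batch.assign(udpQueue.begin(), udpQueue.begin() + n);
        udpQueue.erase(udpQueue.begin(), udpQueue.begin() + n);
    }

    std::sort(batch.begin(), batch.end(), [](const Event &l, const Event &r) {
        return l.timestamp < r.timestamp;
    });

    std::lock_guard<std::mutex> lock(queueMutex);
    sortedQueue.insert(sortedQueue.end(), batch.begin(), batch.end());
}

int main() {
    udpQueue = {{30}, {10}, {20}};
    sortStageOnce(2); // sortedQueue now holds the first two drained events, oldest first
    return 0;
}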
@@ -576,8 +576,10 @@ void asynStreamGeneratorDriver::processEvents() {
 
     // we have two buffers. We alternate between reading data into one of them,
     // and then merge sorting into the other
-    NormalisedEvent *eventsA = new NormalisedEvent[(bufferedEvents + extraBufferedEvents)];
-    NormalisedEvent *eventsB = new NormalisedEvent[(bufferedEvents + extraBufferedEvents)];
+    NormalisedEvent *eventsA =
+        new NormalisedEvent[(bufferedEvents + extraBufferedEvents)];
+    NormalisedEvent *eventsB =
+        new NormalisedEvent[(bufferedEvents + extraBufferedEvents)];
     NormalisedEvent *eventsBLastStart = eventsB + bufferedEvents;
     NormalisedEvent *eventsBLastEnd = eventsBLastStart;
 
@@ -587,24 +589,36 @@ void asynStreamGeneratorDriver::processEvents() {
     epicsTimeStamp currentTime = lastProcess;
 
     epicsInt32 *counts = new epicsInt32[this->num_channels];
+    double elapsedSeconds = 0;
+    uint64_t startTimestamp = std::numeric_limits<uint64_t>::max();
+    uint64_t currTimestamp;
+
+    epicsInt32 currStatus = STATUS_IDLE;
+    epicsInt32 prevStatus = STATUS_IDLE;
+    epicsInt32 countPreset;
+    epicsInt32 timePreset;
+    epicsInt32 presetChannel;
     epicsInt32 udpQueueHighWaterMark = 0;
     epicsInt32 sortedQueueHighWaterMark = 0;
 
     while (true) {
 
-        queuedEvents = eventsInQueue(this->sortedQueue); // in case we can't wait
+        queuedEvents =
+            eventsInQueue(this->sortedQueue); // in case we can't wait
         lastProcess = epicsTime::getCurrent();
         currentTime = lastProcess;
 
-        // wait for mininmum packet frequency or enough packets to ensure we could potentially
-        // have at least 1 packet per mcpdid
-        while (queuedEvents < bufferedEvents && epicsTimeDiffInNS(&currentTime, &lastProcess) < 250'000'000ull) {
+        // wait for mininmum packet frequency or enough packets to ensure we
+        // could potentially have at least 1 packet per mcpdid
+        while (queuedEvents < bufferedEvents &&
+               epicsTimeDiffInNS(&currentTime, &lastProcess) < 250'000'000ull) {
             epicsThreadSleep(0.0001); // seconds
             currentTime = epicsTime::getCurrent();
             queuedEvents = eventsInQueue(this->sortedQueue);
         }
 
+        getIntegerParam(this->P_Status, &currStatus);
+
         queuedEvents = std::min(queuedEvents, bufferedEvents);
 
         NormalisedEvent *newStartPtr = eventsA + extraBufferedEvents;
@@ -614,274 +628,71 @@ void asynStreamGeneratorDriver::processEvents() {
         // new read, in the case that all new events are newer timewise, and
         // therefore, all events from eventsB have to be placed in a preceeding
         // position.
-        epicsRingBytesGet(this->sortedQueue, (char *)newStartPtr, queuedEvents * sizeof(NormalisedEvent));
+        epicsRingBytesGet(this->sortedQueue, (char *)newStartPtr,
+                          queuedEvents * sizeof(NormalisedEvent));
 
-        int toProcess = eventsBLastEnd - eventsBLastStart + queuedEvents * 4 / 5;
+        int toProcess =
+            eventsBLastEnd - eventsBLastStart + queuedEvents * 4 / 5;
 
         // TODO could also consider an in-place merge
-        eventsBLastEnd = std::merge(
-            newStartPtr, newStartPtr + queuedEvents,
-            eventsBLastStart, eventsBLastEnd,
-            eventsA, reverseSortEventsByTime
-        );
+        eventsBLastEnd = std::merge(newStartPtr, newStartPtr + queuedEvents,
+                                    eventsBLastStart, eventsBLastEnd, eventsA,
+                                    oldestEventsFirst);
 
         eventsBLastStart = eventsA + toProcess;
 
-        for (size_t i = 0; i < toProcess; ++i) {
-            counts[eventsA[i].source == 0 ? eventsA[i].pixelId + 1 : 0] += 1;
+        // TODO I haven't really taken care of the case that there are no events
+        if (prevStatus == STATUS_IDLE && currStatus == STATUS_COUNTING) {
+
+            getIntegerParam(this->P_CountPreset, &countPreset);
+            getIntegerParam(this->P_TimePreset, &timePreset);
+            getIntegerParam(this->P_MonitorChannel, &presetChannel);
+
+            // reset status variables
+            startTimestamp = eventsA[0].timestamp;
+            elapsedSeconds = 0;
+            for (size_t i = 0; i < this->num_channels; ++i) {
+                counts[i] = 0;
+            }
         }
 
-        for (size_t i = 0; i < num_channels; ++i) {
-            setIntegerParam(P_Counts[i], counts[i]);
+        if (currStatus == STATUS_COUNTING) {
+
+            // The elapsedSeconds are round differently depending on whether we
+            // are using them for comparison, or for showing to the user, to
+            // try and make sure the data we send to kafka is correct, while
+            // the measurement time also appears intuitive.
+            for (size_t i = 0; i < toProcess; ++i) {
+                counts[eventsA[i].source == 0 ? eventsA[i].pixelId + 1 : 0] +=
+                    1;
+                elapsedSeconds = (eventsA[i].timestamp - startTimestamp) / 1e9;
+
+                if ((countPreset && counts[presetChannel] >= countPreset) ||
+                    (timePreset && elapsedSeconds > (double)timePreset))
+                    break;
+
+                // TODO also batchwise?
+                this->queueForKafka(eventsA[i]);
+            }
+
+            for (size_t i = 0; i < num_channels; ++i) {
+                setIntegerParam(P_Counts[i], counts[i]);
+            }
+            setIntegerParam(P_ElapsedTime, (epicsInt32)elapsedSeconds);
+
+            if ((countPreset && counts[presetChannel] >= countPreset) ||
+                (timePreset && elapsedSeconds > (double)timePreset)) {
+                setIntegerParam(this->P_Status, STATUS_IDLE);
+                setIntegerParam(this->P_CountPreset, 0);
+                setIntegerParam(this->P_TimePreset, 0);
+            }
         }
 
-        //setIntegerParam(P_ElapsedTime, elapsedSeconds);
+        prevStatus = currStatus;
 
         std::swap(eventsA, eventsB);
 
     }
 
-    // // TODO this is totally decoupled!!!
-    // const size_t queueBufferSize =
-    // 10 * epicsRingBytesSize(this->udpQueue) / sizeof(NormalisedEvent);
-
-    // //struct {
-    // // bool operator()(const NormalisedEvent l,
-    // // const NormalisedEvent r) const {
-    // // return l.timestamp > r.timestamp;
-    // // }
-    // //} smallestToLargest;
-
-    // //// This should never be used. It is just instantiated to reserve a buffer
-    // //// of specific size.
-    // //std::vector<NormalisedEvent> queueBuffer;
-    // //queueBuffer.reserve(queueBufferSize);
-
-    // //std::priority_queue<NormalisedEvent, std::vector<NormalisedEvent>,
-    // // decltype(smallestToLargest)>
-    // // timeQueue(smallestToLargest, std::move(queueBuffer));
-
-    // NormalisedEvent* timeQueue = new NormalisedEvent[queueBufferSize];
-
-    // // TODO epics doesn't seem to support uint64, you would need an array of
-    // // uint32. It does support int64 though.. so we start with that
-    // epicsInt32 *counts = new epicsInt32[this->num_channels];
-
-    // const uint64_t minRateSamplePeriod = 100'000'000ll;
-    // const size_t rateAverageWindow = 20;
-    // size_t countDiffsPtr = 0;
-    // epicsInt32 *rates = new epicsInt32[this->num_channels];
-    // epicsInt32 *countDiff = new epicsInt32[this->num_channels];
-    // epicsInt32 *countDiffs =
-    // new epicsInt32[this->num_channels * rateAverageWindow];
-    // uint64_t *timeSpans = new uint64_t[this->num_channels];
-    // epicsTimeStamp lastRateUpdate = epicsTime::getCurrent();
-
-    // asynStatus status = asynSuccess;
-    // NormalisedEvent ne;
-    // uint64_t newestTimestamp = 0;
-    // uint64_t startTimestamp = std::numeric_limits<uint64_t>::max();
-    // uint64_t currTimestamp;
-    // epicsInt32 elapsedSeconds = 0;
-    // epicsInt32 prevStatus = STATUS_IDLE;
-    // epicsInt32 currStatus = STATUS_IDLE;
-    // epicsInt32 countPreset = 0;
-    // epicsInt32 timePreset = 0;
-    // epicsInt32 presetChannel = 0;
-    // epicsInt32 udpQueueHighWaterMark = 0;
-
-    // while (true) {
-
-    // // I think mostly everything should already by sorted
-    // // could probably in the other thread guarantee that each packet is sorted
-    // // but probably it already is...
-    // //
-    // // so really we just need to merge sort chunks
-
-    // // idea is to try and guarantee at least 1 packet per id or the min
-    // // frequency for each id without actually checking all ids
-    // // size_t timeQueuePtr = 0;
-    // // while (timeQueuePtr < 1500 * 10) {
-
-    // // // TODO depending on how this is implemented, I may also need to
-    // // // check that there is is enough bytes, in case it does partial
-    // // // writes...
-    // // if (epicsRingBytesGet(udpQueue, (char *)&ne,
-    // // sizeof(NormalisedEvent))) {
-    // // // we should restart this ioc at least every few years, as at ns
-    // // // resolution with a uint64_t we will have an overflow after
-    // // // around 4 years
-    // // newestTimestamp = std::max(newestTimestamp, ne.timestamp);
-
-    // // ++countDiff[ne.source == 0 ? ne.pixelId + 1 : 0];
-
-    // // timeQueue.push(std::move(ne));
-    // // }
-
-    // // }
-
-    // // while (timeQueue.empty() ||
-    // // (timeQueue.size() < 1500 * 10 &&
-    // // newestTimestamp - timeQueue.top().timestamp < 200'000'000ull)) {
-
-    // // // TODO depending on how this is implemented, I may also need to
-    // // // check that there is is enough bytes, in case it does partial
-    // // // writes...
-    // // if (epicsRingBytesGet(udpQueue, (char *)&ne,
-    // // sizeof(NormalisedEvent))) {
-    // // // we should restart this ioc at least every few years, as at ns
-    // // // resolution with a uint64_t we will have an overflow after
-    // // // around 4 years
-    // // newestTimestamp = std::max(newestTimestamp, ne.timestamp);
-
-    // // ++countDiff[ne.source == 0 ? ne.pixelId + 1 : 0];
-
-    // // timeQueue.push(std::move(ne));
-    // // }
-    // // }
-
-    // // ne = timeQueue.top();
-    // // timeQueue.pop();
-
-    // // status = getIntegerParam(this->P_Status, &currStatus);
-
-    // // udpQueueHighWaterMark =
-    // // epicsRingBytesHighWaterMark(udpQueue) / sizeof(NormalisedEvent);
-
-    // // // if (currStatus == STATUS_COUNTING && prevStatus == STATUS_IDLE) {
-    // // // // Starting a new count
-
-    // // // // get current count configuration
-    // // // getIntegerParam(this->P_CountPreset, &countPreset);
-    // // // getIntegerParam(this->P_TimePreset, &timePreset);
-    // // // getIntegerParam(this->P_MonitorChannel, &presetChannel);
-
-    // // // // reset status variables
-    // // // startTimestamp = std::numeric_limits<uint64_t>::max();
-    // // // for (size_t i = 0; i < this->num_channels; ++i) {
-    // // // counts[i] = 0;
-    // // // }
-
-    // // // // reset pvs
-    // // // // lock();
-    // // // // for (size_t i = 0; i < num_channels; ++i) {
-    // // // // setIntegerParam(P_Counts[i], counts[i]);
-    // // // // }
-    // // // // setIntegerParam(P_ElapsedTime, 0);
-    // // // // callParamCallbacks();
-    // // // // unlock();
-
-    // // // // TODO might consider throwing out current buffer as it is
-    // // // // from before count started? then again, 0.2 ms or whatever is
-    // // // // set above is quite a small preceeding amount of time, so
-    // // // // maybe it doesn't matter
-    // // // }
-
-    // // // prevStatus = currStatus;
-
-    // // //if (currStatus == STATUS_COUNTING) {
-    // // startTimestamp = std::min(startTimestamp, ne.timestamp);
-    // // currTimestamp = ne.timestamp;
-    // // elapsedSeconds =
-    // // 0 ? currTimestamp <= startTimestamp
-    // // : ((double)(currTimestamp - startTimestamp)) / 1e9;
-
-    // // // is our count finished?
-    // // // if ((countPreset && counts[presetChannel] >= countPreset) ||
-    // // // (timePreset && elapsedSeconds >= timePreset)) {
-
-    // // // // filter out events that occured after the specified time
-    // // // if (ne.timestamp - startTimestamp <= countPreset) {
-    // // // counts[ne.source == 0 ? ne.pixelId + 1 : 0] += 1;
-    // // // this->queueForKafka(std::move(ne));
-
-    // // // // add any remaining events with the same timestamp
-    // // // // we could theoretically have a small overrun if the
-    // // // // timestamps are identical on the monitor channel
-    // // // while (!timeQueue.empty() &&
-    // // // !timeQueue.top().timestamp == currTimestamp) {
-    // // // ne = timeQueue.top();
-    // // // timeQueue.pop();
-    // // // counts[ne.source == 0 ? ne.pixelId + 1 : 0] += 1;
-    // // // this->queueForKafka(std::move(ne));
-    // // // }
-    // // // }
-
-    // // // countPreset = 0;
-    // // // timePreset = 0;
-
-    // // // // lock();
-    // // // for (size_t i = 0; i < num_channels; ++i) {
-    // // // setIntegerParam(P_Counts[i], counts[i]);
-    // // // }
-    // // // setIntegerParam(P_ElapsedTime, elapsedSeconds);
-    // // // setIntegerParam(P_CountPreset, countPreset);
-    // // // setIntegerParam(P_TimePreset, timePreset);
-    // // // setIntegerParam(P_UdpQueueHighWaterMark, udpQueueHighWaterMark);
-    // // // // callParamCallbacks();
-    // // // setIntegerParam(P_Status, STATUS_IDLE);
-    // // // // callParamCallbacks();
-    // // // // unlock();
-
-    // // // epicsRingBytesResetHighWaterMark(udpQueue);
-
-    // // // } else {
-
-    // // counts[ne.source == 0 ? ne.pixelId + 1 : 0] += 1;
-    // // this->queueForKafka(std::move(ne));
-
-    // // // lock();
-    // // for (size_t i = 0; i < num_channels; ++i) {
-    // // setIntegerParam(P_Counts[i], counts[i]);
-    // // }
-    // // setIntegerParam(P_ElapsedTime, elapsedSeconds);
-    // // setIntegerParam(P_UdpQueueHighWaterMark, udpQueueHighWaterMark);
-    // // // callParamCallbacks();
-    // // // unlock();
-    // // // }
-    // // //}
-
-    // // // Careful changing any of these magic numbers until I clean this up
-    // // // as you might end up calculating the wrong rate
-    // // // epicsTimeStamp currentTime = epicsTime::getCurrent();
-    // // // if (epicsTimeDiffInNS(&currentTime, &lastRateUpdate) >
-    // // // minRateSamplePeriod) {
-    // // // timeSpans[countDiffsPtr] =
-    // // // epicsTimeDiffInNS(&currentTime, &lastRateUpdate);
-
-    // // // uint64_t totalTime = 0;
-    // // // for (size_t i = 0; i <= rateAverageWindow; ++i) {
-    // // // totalTime += timeSpans[i];
-    // // // }
-
-    // // // lastRateUpdate = currentTime;
-
-    // // // for (size_t i = 0; i <= this->num_channels; ++i) {
-    // // // countDiffs[i * rateAverageWindow + countDiffsPtr] =
-    // // // countDiff[i];
-
-    // // // uint64_t cnt = 0;
-    // // // for (size_t j = 0; j <= rateAverageWindow; ++j) {
-    // // // cnt += countDiffs[i * rateAverageWindow + j];
-    // // // }
-    // // // rates[i] = cnt / (totalTime * 1e-9);
-
-    // // // countDiff[i] = 0;
-    // // // }
-
-    // // // countDiffsPtr = (countDiffsPtr + 1) % rateAverageWindow;
-
-    // // // if (countDiffsPtr % 5 == 0) {
-    // // // // lock();
-    // // // for (size_t i = 0; i < num_channels; ++i) {
-    // // // setIntegerParam(P_Rates[i], rates[i]);
-    // // // }
-    // // // // callParamCallbacks();
-    // // // // unlock();
-    // // // }
-    // // // }
-    // }
 }
 
 void asynStreamGeneratorDriver::produce(epicsRingBytesId eventQueue,
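The rewritten processEvents() above is the core of the batch-wise variant: the still-unprocessed, sorted tail of the previous round (eventsBLastStart..eventsBLastEnd) is merged with the freshly drained sorted batch via std::merge, only the oldest portion (the leftover plus roughly 4/5 of the new events) is counted and queued for Kafka now, and the newest remainder is carried over as the next round's tail before the two buffers are swapped. A standalone sketch of that carry-over-and-merge idea; Event, leftover and newBatch are illustrative names, not the driver's:

// Standalone sketch of the double-buffer merge: keep an unprocessed, sorted
// leftover from the previous round, merge it with the next sorted batch, and
// only process the oldest portion so slightly late events can still be ordered in.
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <vector>

struct Event {                  // hypothetical stand-in for NormalisedEvent
    std::uint64_t timestamp;
};

static bool oldestFirst(const Event &l, const Event &r) {
    return l.timestamp < r.timestamp;
}

int main() {
    std::vector<Event> leftover = {{40}, {70}};                    // sorted tail from last round
    std::vector<Event> newBatch = {{10}, {50}, {60}, {90}, {100}}; // sorted new batch

    // merge leftover and new batch into one ascending sequence
    std::vector<Event> merged(leftover.size() + newBatch.size());
    std::merge(newBatch.begin(), newBatch.end(),
               leftover.begin(), leftover.end(),
               merged.begin(), oldestFirst);

    // process the leftover plus ~4/5 of the new events now ...
    std::size_t toProcess = leftover.size() + newBatch.size() * 4 / 5;
    for (std::size_t i = 0; i < toProcess; ++i)
        std::printf("process %llu\n",
                    static_cast<unsigned long long>(merged[i].timestamp));

    // ... and carry the newest remainder over as the next round's leftover
    leftover.assign(merged.begin() + toProcess, merged.end());
    return 0;
}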
@@ -922,7 +733,7 @@ void asynStreamGeneratorDriver::produce(epicsRingBytesId eventQueue,
 
         // At least every 0.2 seconds
         if (total >= this->kafkaMaxPacketSize ||
-            epicsTimeDiffInNS(&now, &last_sent) > 200'000'000ll) {
+            epicsTimeDiffInNS(&now, &last_sent) > 250'000'000ll) {
             last_sent = epicsTime::getCurrent();
 
             if (total) {
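The change above only retunes the Kafka flush deadline in produce() from 200 ms to 250 ms, matching the 250 ms batching deadline used by the sort and processing loops (the "// At least every 0.2 seconds" context comment is now slightly out of date). A standalone sketch of the underlying size-or-time flush policy; shouldFlush and its parameters are illustrative, not the driver's API:

// Standalone sketch: flush when a full packet's worth of events is buffered,
// or at the latest after maxLatency has passed since the last send.
#include <chrono>
#include <cstddef>

using Clock = std::chrono::steady_clock;

bool shouldFlush(std::size_t bufferedEvents, std::size_t maxPacketEvents,
                 Clock::time_point lastSent,
                 std::chrono::milliseconds maxLatency = std::chrono::milliseconds(250)) {
    return bufferedEvents >= maxPacketEvents ||
           (Clock::now() - lastSent) > maxLatency;
}

int main() {
    Clock::time_point lastSent = Clock::now();
    // e.g. 20480 events per Kafka packet, as configured in the startup script
    return shouldFlush(20480, 20480, lastSent) ? 0 : 1;
}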
@@ -166,7 +166,7 @@ class asynStreamGeneratorDriver : public asynPortDriver {
     asynStatus createInt32Param(asynStatus status, char *name, int *variable,
                                 epicsInt32 initialValue = 0);
 
-    inline void queueForKafka(NormalisedEvent &&ne);
+    inline void queueForKafka(NormalisedEvent &ne);
 
     void produce(epicsRingBytesId eventQueue, rd_kafka_t *kafkaProducer,
                  const char *topic, const char *source);