in the process of switching to a more batch processing approach. so far, seems like it can keep up
This commit is contained in:
+4
-2
@@ -59,7 +59,8 @@ record(longin, "$(INSTR)$(NAME):M$(CHANNEL)")
|
||||
field(DTYP, "asynInt32")
|
||||
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) COUNTS$(CHANNEL)")
|
||||
# This is probably too fast. We could trigger things the same as sinqDAQ to ensure the db is update in the same order
|
||||
field(SCAN, "I/O Intr")
|
||||
# field(SCAN, "I/O Intr")
|
||||
field(SCAN, ".2 second")
|
||||
field(PINI, "YES")
|
||||
}
|
||||
|
||||
@@ -69,6 +70,7 @@ record(ai, "$(INSTR)$(NAME):R$(CHANNEL)")
|
||||
field(EGU, "cts/sec")
|
||||
field(DTYP, "asynInt32")
|
||||
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) RATE$(CHANNEL)")
|
||||
field(SCAN, "I/O Intr")
|
||||
field(SCAN, ".2 second")
|
||||
# field(SCAN, "I/O Intr")
|
||||
field(PINI, "YES")
|
||||
}
|
||||
|
||||
+29
-2
@@ -39,7 +39,8 @@ record(mbbi, "$(INSTR)$(NAME):STATUS")
|
||||
field(FRVL, "4")
|
||||
field(FRST, "INVALID")
|
||||
# This is probably too fast. We could trigger things the same as sinqDAQ to ensure the db is update in the same order
|
||||
field(SCAN, "I/O Intr")
|
||||
#field(SCAN, "I/O Intr")
|
||||
field(SCAN, ".5 second")
|
||||
field(PINI, "YES")
|
||||
}
|
||||
|
||||
@@ -201,7 +202,33 @@ record(ai,"$(INSTR)$(NAME):ELAPSED-TIME")
|
||||
field(EGU, "sec")
|
||||
field(DTYP, "asynInt32")
|
||||
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) TIME")
|
||||
field(SCAN, "I/O Intr")
|
||||
# field(SCAN, "I/O Intr")
|
||||
field(SCAN, ".5 second")
|
||||
field(PINI, "YES")
|
||||
# field(FLNK, "$(INSTR)$(NAME):ETO")
|
||||
}
|
||||
|
||||
################################################################################
|
||||
# Stream Generator Status PVs
|
||||
|
||||
record(longin,"$(INSTR)$(NAME):UDP_WATERMARK")
|
||||
{
|
||||
field(DESC, "Max Events in Queue")
|
||||
field(EGU, "Events")
|
||||
field(DTYP, "asynInt32")
|
||||
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) UDP")
|
||||
# field(SCAN, "I/O Intr")
|
||||
field(SCAN, "1 second")
|
||||
field(PINI, "YES")
|
||||
}
|
||||
|
||||
record(longin,"$(INSTR)$(NAME):SORTED_WATERMARK")
|
||||
{
|
||||
field(DESC, "Max Events in Queue")
|
||||
field(EGU, "Events")
|
||||
field(DTYP, "asynInt32")
|
||||
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) SORT")
|
||||
# field(SCAN, "I/O Intr")
|
||||
field(SCAN, "1 second")
|
||||
field(PINI, "YES")
|
||||
}
|
||||
|
||||
@@ -9,6 +9,10 @@ epicsEnvSet("INSTR", "SQ:TEST:")
|
||||
epicsEnvSet("NAME", "SG")
|
||||
|
||||
drvAsynIPPortConfigure("ASYN_IP_PORT", "127.0.0.1:9071:54321 UDP", 0, 0, 1)
|
||||
|
||||
# With a udpQueue and sortQueue size of 10'000 packets, we can hold in memory
|
||||
# 10'000 * 243 = 2.43e6 events
|
||||
|
||||
# asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "linkafka01:9092", "NEWEFU_TEST", "NEWEFU_TEST2", 1000, 8192)
|
||||
asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "", "", "", 0, 0)
|
||||
|
||||
|
||||
+404
-183
@@ -59,6 +59,11 @@ static void udpPollerTask(void *drvPvt) {
|
||||
pSGD->receiveUDP();
|
||||
}
|
||||
|
||||
static void sortTask(void *drvPvt) {
|
||||
asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
|
||||
pSGD->partialSortEvents();
|
||||
}
|
||||
|
||||
static void daqTask(void *drvPvt) {
|
||||
asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
|
||||
pSGD->processEvents();
|
||||
@@ -107,7 +112,12 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
|
||||
0), /* Default stack size*/
|
||||
num_channels(numChannels + 1), kafkaEnabled(enableKafkaStream),
|
||||
monitorTopic(monitorTopic), detectorTopic(detectorTopic),
|
||||
udpQueue(epicsRingBytesCreate(udpQueueSize * sizeof(NormalisedEvent))),
|
||||
// so these first to are measured in max packet sizes
|
||||
udpQueue(
|
||||
epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))),
|
||||
// TODO configurable sizes
|
||||
sortedQueue(epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))),
|
||||
// and these two are currently measured in event sizes...
|
||||
monitorQueue(
|
||||
epicsRingBytesCreate(kafkaQueueSize * sizeof(NormalisedEvent))),
|
||||
detectorQueue(
|
||||
@@ -151,6 +161,11 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
|
||||
status = createInt32Param(status, pv_name_buffer, P_ClearCounts + i);
|
||||
}
|
||||
|
||||
status = createInt32Param(status, P_UdpQueueHighWaterMarkString,
|
||||
&P_UdpQueueHighWaterMark);
|
||||
status = createInt32Param(status, P_SortedQueueHighWaterMarkString,
|
||||
&P_SortedQueueHighWaterMark);
|
||||
|
||||
if (status) {
|
||||
epicsStdoutPrintf(
|
||||
"%s:%s: failed to create or setup parameters, status=%d\n",
|
||||
@@ -210,10 +225,26 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
|
||||
|
||||
/* Create the thread that orders the events and acts as our sinqDaq stand-in
|
||||
*/
|
||||
status = (asynStatus)(epicsThreadCreate(
|
||||
"sinqDAQ", epicsThreadPriorityMax,
|
||||
epicsThreadGetStackSize(epicsThreadStackMedium),
|
||||
(EPICSTHREADFUNC)::daqTask, this) == NULL);
|
||||
status =
|
||||
(asynStatus)(epicsThreadCreate(
|
||||
"sinqDAQ",
|
||||
epicsThreadPriorityMedium, // epicsThreadPriorityMax,
|
||||
epicsThreadGetStackSize(epicsThreadStackMedium),
|
||||
(EPICSTHREADFUNC)::daqTask, this) == NULL);
|
||||
if (status) {
|
||||
epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
|
||||
driverName, functionName, status);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
/* Create the thread that orders packets of in preparation for our sinqDAQ stand-in
|
||||
*/
|
||||
status =
|
||||
(asynStatus)(epicsThreadCreate(
|
||||
"partialSort",
|
||||
epicsThreadPriorityMedium,
|
||||
epicsThreadGetStackSize(epicsThreadStackMedium),
|
||||
(EPICSTHREADFUNC)::sortTask, this) == NULL);
|
||||
if (status) {
|
||||
epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
|
||||
driverName, functionName, status);
|
||||
@@ -254,6 +285,32 @@ asynStreamGeneratorDriver::~asynStreamGeneratorDriver() {
|
||||
// epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer));
|
||||
}
|
||||
|
||||
asynStatus asynStreamGeneratorDriver::readInt32(asynUser *pasynUser, epicsInt32 *value) {
|
||||
|
||||
int function = pasynUser->reason;
|
||||
asynStatus status = asynSuccess;
|
||||
const char *paramName;
|
||||
const char *functionName = "readInt32";
|
||||
getParamName(function, ¶mName);
|
||||
|
||||
if (function == P_UdpQueueHighWaterMark) {
|
||||
*value =
|
||||
epicsRingBytesHighWaterMark(this->udpQueue) / sizeof(NormalisedEvent);
|
||||
// Aparently resetting the watermark causes problems...
|
||||
// at least concurrently :D
|
||||
// epicsRingBytesResetHighWaterMark(this->udpQueue);
|
||||
return asynSuccess;
|
||||
} else if (function == P_SortedQueueHighWaterMark) {
|
||||
*value =
|
||||
epicsRingBytesHighWaterMark(this->sortedQueue) / sizeof(NormalisedEvent);
|
||||
// epicsRingBytesResetHighWaterMark(this->sortedQueue);
|
||||
return asynSuccess;
|
||||
}
|
||||
|
||||
return asynPortDriver::readInt32(pasynUser, value);
|
||||
|
||||
}
|
||||
|
||||
asynStatus asynStreamGeneratorDriver::writeInt32(asynUser *pasynUser,
|
||||
epicsInt32 value) {
|
||||
int function = pasynUser->reason;
|
||||
@@ -446,6 +503,57 @@ void asynStreamGeneratorDriver::receiveUDP() {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
struct {
|
||||
bool operator()(const NormalisedEvent l,
|
||||
const NormalisedEvent r) const {
|
||||
return l.timestamp > r.timestamp;
|
||||
}
|
||||
} reverseSortEventsByTime;
|
||||
|
||||
inline int eventsInQueue(epicsRingBytesId id) {
|
||||
return epicsRingBytesUsedBytes(id) / sizeof(NormalisedEvent);
|
||||
}
|
||||
|
||||
void asynStreamGeneratorDriver::partialSortEvents() {
|
||||
|
||||
const char *functionName = "partialSortEvents";
|
||||
|
||||
// x * number of ids * max events in packet
|
||||
int bufferedEvents = 5 * 10 * 243;
|
||||
NormalisedEvent *events = new NormalisedEvent[bufferedEvents];
|
||||
|
||||
int queuedEvents = 0;
|
||||
epicsTimeStamp lastSort = epicsTime::getCurrent();
|
||||
epicsTimeStamp currentTime = lastSort;
|
||||
|
||||
while (true) {
|
||||
|
||||
queuedEvents = eventsInQueue(this->udpQueue); // in case we can't wait
|
||||
lastSort = epicsTime::getCurrent();
|
||||
currentTime = lastSort;
|
||||
|
||||
// wait for mininmum packet frequency or enough packets to ensure we could potentially
|
||||
// have at least 1 packet per mcpdid
|
||||
while (queuedEvents < bufferedEvents && epicsTimeDiffInNS(¤tTime, &lastSort) < 250'000'000ull) {
|
||||
epicsThreadSleep(0.0001); // seconds
|
||||
currentTime = epicsTime::getCurrent();
|
||||
queuedEvents = eventsInQueue(this->udpQueue);
|
||||
}
|
||||
|
||||
queuedEvents = std::min(queuedEvents, bufferedEvents);
|
||||
|
||||
if (queuedEvents) {
|
||||
epicsRingBytesGet(this->udpQueue, (char *)events, queuedEvents * sizeof(NormalisedEvent));
|
||||
|
||||
std::sort(events, events + queuedEvents, reverseSortEventsByTime);
|
||||
|
||||
epicsRingBytesPut(this->sortedQueue, (char *)events, queuedEvents * sizeof(NormalisedEvent));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
inline void asynStreamGeneratorDriver::queueForKafka(NormalisedEvent &&ne) {
|
||||
if (this->kafkaEnabled) {
|
||||
if (ne.source == 0)
|
||||
@@ -461,206 +569,319 @@ void asynStreamGeneratorDriver::processEvents() {
|
||||
|
||||
const char *functionName = "processEvents";
|
||||
|
||||
const size_t queueBufferSize =
|
||||
10 * epicsRingBytesSize(this->udpQueue) / sizeof(NormalisedEvent);
|
||||
// x * number of ids * max events in packet * event size
|
||||
int bufferedEvents = 5 * 10 * 243;
|
||||
// we need a little extra space for merge sorting in
|
||||
int extraBufferedEvents = 1 * 10 * 243;
|
||||
|
||||
struct {
|
||||
bool operator()(const NormalisedEvent l,
|
||||
const NormalisedEvent r) const {
|
||||
return l.timestamp > r.timestamp;
|
||||
}
|
||||
} smallestToLargest;
|
||||
// we have two buffers. We alternate between reading data into one of them,
|
||||
// and then merge sorting into the other
|
||||
NormalisedEvent *eventsA = new NormalisedEvent[(bufferedEvents + extraBufferedEvents)];
|
||||
NormalisedEvent *eventsB = new NormalisedEvent[(bufferedEvents + extraBufferedEvents)];
|
||||
NormalisedEvent *eventsBLastStart = eventsB + bufferedEvents;
|
||||
NormalisedEvent *eventsBLastEnd = eventsBLastStart;
|
||||
|
||||
// This should never be used. It is just instantiated to reserve a buffer
|
||||
// of specific size.
|
||||
std::vector<NormalisedEvent> queueBuffer;
|
||||
queueBuffer.reserve(queueBufferSize);
|
||||
int queuedEvents = 0;
|
||||
|
||||
std::priority_queue<NormalisedEvent, std::vector<NormalisedEvent>,
|
||||
decltype(smallestToLargest)>
|
||||
timeQueue(smallestToLargest, std::move(queueBuffer));
|
||||
epicsTimeStamp lastProcess = epicsTime::getCurrent();
|
||||
epicsTimeStamp currentTime = lastProcess;
|
||||
|
||||
// TODO epics doesn't seem to support uint64, you would need an array of
|
||||
// uint32. It does support int64 though.. so we start with that
|
||||
epicsInt32 *counts = new epicsInt32[this->num_channels];
|
||||
|
||||
const uint64_t minRateSamplePeriod = 100'000'000ll;
|
||||
const size_t rateAverageWindow = 20;
|
||||
size_t countDiffsPtr = 0;
|
||||
epicsInt32 *rates = new epicsInt32[this->num_channels];
|
||||
epicsInt32 *countDiff = new epicsInt32[this->num_channels];
|
||||
epicsInt32 *countDiffs =
|
||||
new epicsInt32[this->num_channels * rateAverageWindow];
|
||||
uint64_t *timeSpans = new uint64_t[this->num_channels];
|
||||
epicsTimeStamp lastRateUpdate = epicsTime::getCurrent();
|
||||
|
||||
asynStatus status = asynSuccess;
|
||||
NormalisedEvent ne;
|
||||
uint64_t newestTimestamp = 0;
|
||||
uint64_t startTimestamp = std::numeric_limits<uint64_t>::max();
|
||||
uint64_t currTimestamp;
|
||||
epicsInt32 elapsedSeconds = 0;
|
||||
epicsInt32 prevStatus = STATUS_IDLE;
|
||||
epicsInt32 currStatus = STATUS_IDLE;
|
||||
epicsInt32 countPreset = 0;
|
||||
epicsInt32 timePreset = 0;
|
||||
epicsInt32 presetChannel = 0;
|
||||
epicsInt32 udpQueueHighWaterMark = 0;
|
||||
epicsInt32 sortedQueueHighWaterMark = 0;
|
||||
|
||||
while (true) {
|
||||
|
||||
// TODO depending on how this is implemented, I may also need to check
|
||||
// that there is is enough bytes, in case it does partial writes...
|
||||
if (epicsRingBytesGet(udpQueue, (char *)&ne, sizeof(NormalisedEvent))) {
|
||||
// we should reastart this ioc at least every few years, as at ns
|
||||
// resolution with a uint64_t we will have an overflow after around
|
||||
// 4 years
|
||||
newestTimestamp = std::max(newestTimestamp, ne.timestamp);
|
||||
queuedEvents = eventsInQueue(this->sortedQueue); // in case we can't wait
|
||||
lastProcess = epicsTime::getCurrent();
|
||||
currentTime = lastProcess;
|
||||
|
||||
++countDiff[ne.source == 0 ? ne.pixelId + 1 : 0];
|
||||
|
||||
timeQueue.push(std::move(ne));
|
||||
// wait for mininmum packet frequency or enough packets to ensure we could potentially
|
||||
// have at least 1 packet per mcpdid
|
||||
while (queuedEvents < bufferedEvents && epicsTimeDiffInNS(¤tTime, &lastProcess) < 250'000'000ull) {
|
||||
epicsThreadSleep(0.0001); // seconds
|
||||
currentTime = epicsTime::getCurrent();
|
||||
queuedEvents = eventsInQueue(this->sortedQueue);
|
||||
}
|
||||
|
||||
// idea is to try and guarantee at least 1 packet per id or the min
|
||||
// frequency for each id without actually checking all ids
|
||||
if (timeQueue.size() >= 1500 * 10 ||
|
||||
(timeQueue.size() > 0 &&
|
||||
newestTimestamp - timeQueue.top().timestamp >= 200'000'000ull)) {
|
||||
ne = timeQueue.top();
|
||||
timeQueue.pop();
|
||||
queuedEvents = std::min(queuedEvents, bufferedEvents);
|
||||
|
||||
status = getIntegerParam(this->P_Status, &currStatus);
|
||||
NormalisedEvent *newStartPtr = eventsA + extraBufferedEvents;
|
||||
|
||||
if (currStatus == STATUS_COUNTING && prevStatus == STATUS_IDLE) {
|
||||
// Starting a new count
|
||||
// We read into the array, such that we have enough space, that the
|
||||
// entirety of the leftover from the previous read can fit before this
|
||||
// new read, in the case that all new events are newer timewise, and
|
||||
// therefore, all events from eventsB have to be placed in a preceeding
|
||||
// position.
|
||||
epicsRingBytesGet(this->sortedQueue, (char *)newStartPtr, queuedEvents * sizeof(NormalisedEvent));
|
||||
|
||||
// get current count configuration
|
||||
getIntegerParam(this->P_CountPreset, &countPreset);
|
||||
getIntegerParam(this->P_TimePreset, &timePreset);
|
||||
getIntegerParam(this->P_MonitorChannel, &presetChannel);
|
||||
int toProcess = eventsBLastEnd - eventsBLastStart + queuedEvents * 4 / 5;
|
||||
|
||||
// reset status variables
|
||||
startTimestamp = std::numeric_limits<uint64_t>::max();
|
||||
for (size_t i = 0; i < this->num_channels; ++i) {
|
||||
counts[i] = 0;
|
||||
}
|
||||
// TODO could also consider an in-place merge
|
||||
eventsBLastEnd = std::merge(
|
||||
newStartPtr, newStartPtr + queuedEvents,
|
||||
eventsBLastStart, eventsBLastEnd,
|
||||
eventsA, reverseSortEventsByTime
|
||||
);
|
||||
|
||||
// reset pvs
|
||||
lock();
|
||||
for (size_t i = 0; i < num_channels; ++i) {
|
||||
setIntegerParam(P_Counts[i], counts[i]);
|
||||
}
|
||||
setIntegerParam(P_ElapsedTime, 0);
|
||||
callParamCallbacks();
|
||||
unlock();
|
||||
eventsBLastStart = eventsA + toProcess;
|
||||
|
||||
// TODO might consider throwing out current buffer as it is
|
||||
// from before count started? then again, 0.2 ms or whatever is
|
||||
// set above is quite a small preceeding amount of time, so
|
||||
// maybe it doesn't matter
|
||||
}
|
||||
|
||||
prevStatus = currStatus;
|
||||
|
||||
if (currStatus == STATUS_COUNTING) {
|
||||
startTimestamp = std::min(startTimestamp, ne.timestamp);
|
||||
currTimestamp = ne.timestamp;
|
||||
elapsedSeconds =
|
||||
0 ? currTimestamp <= startTimestamp
|
||||
: ((double)(currTimestamp - startTimestamp)) / 1e9;
|
||||
|
||||
// is our count finished?
|
||||
if ((countPreset && counts[presetChannel] >= countPreset) ||
|
||||
(timePreset && elapsedSeconds >= timePreset)) {
|
||||
|
||||
// filter out events that occured after the specified time
|
||||
if (ne.timestamp - startTimestamp <= countPreset) {
|
||||
counts[ne.source == 0 ? ne.pixelId + 1 : 0] += 1;
|
||||
this->queueForKafka(std::move(ne));
|
||||
|
||||
// add any remaining events with the same timestamp
|
||||
// we could theoretically have a small overrun if the
|
||||
// timestamps are identical on the monitor channel
|
||||
while (!timeQueue.empty() &&
|
||||
!timeQueue.top().timestamp == currTimestamp) {
|
||||
ne = timeQueue.top();
|
||||
timeQueue.pop();
|
||||
counts[ne.source == 0 ? ne.pixelId + 1 : 0] += 1;
|
||||
this->queueForKafka(std::move(ne));
|
||||
}
|
||||
}
|
||||
|
||||
countPreset = 0;
|
||||
timePreset = 0;
|
||||
|
||||
lock();
|
||||
for (size_t i = 0; i < num_channels; ++i) {
|
||||
setIntegerParam(P_Counts[i], counts[i]);
|
||||
}
|
||||
setIntegerParam(P_ElapsedTime, elapsedSeconds);
|
||||
setIntegerParam(P_CountPreset, countPreset);
|
||||
setIntegerParam(P_TimePreset, timePreset);
|
||||
callParamCallbacks();
|
||||
setIntegerParam(P_Status, STATUS_IDLE);
|
||||
callParamCallbacks();
|
||||
unlock();
|
||||
|
||||
} else {
|
||||
|
||||
counts[ne.source == 0 ? ne.pixelId + 1 : 0] += 1;
|
||||
this->queueForKafka(std::move(ne));
|
||||
|
||||
lock();
|
||||
for (size_t i = 0; i < num_channels; ++i) {
|
||||
setIntegerParam(P_Counts[i], counts[i]);
|
||||
}
|
||||
setIntegerParam(P_ElapsedTime, elapsedSeconds);
|
||||
callParamCallbacks();
|
||||
unlock();
|
||||
}
|
||||
}
|
||||
for (size_t i = 0; i < toProcess; ++i) {
|
||||
counts[eventsA[i].source == 0 ? eventsA[i].pixelId + 1 : 0] += 1;
|
||||
}
|
||||
|
||||
// Careful changing any of these magic numbers until I clean this up
|
||||
// as you might end up calculating the wrong rate
|
||||
epicsTimeStamp currentTime = epicsTime::getCurrent();
|
||||
if (epicsTimeDiffInNS(¤tTime, &lastRateUpdate) >
|
||||
minRateSamplePeriod) {
|
||||
timeSpans[countDiffsPtr] =
|
||||
epicsTimeDiffInNS(¤tTime, &lastRateUpdate);
|
||||
|
||||
uint64_t totalTime = 0;
|
||||
for (size_t i = 0; i <= rateAverageWindow; ++i) {
|
||||
totalTime += timeSpans[i];
|
||||
}
|
||||
|
||||
lastRateUpdate = currentTime;
|
||||
|
||||
for (size_t i = 0; i <= this->num_channels; ++i) {
|
||||
countDiffs[i * rateAverageWindow + countDiffsPtr] =
|
||||
countDiff[i];
|
||||
|
||||
uint64_t cnt = 0;
|
||||
for (size_t j = 0; j <= rateAverageWindow; ++j) {
|
||||
cnt += countDiffs[i * rateAverageWindow + j];
|
||||
}
|
||||
rates[i] = cnt / (totalTime * 1e-9);
|
||||
|
||||
countDiff[i] = 0;
|
||||
}
|
||||
|
||||
countDiffsPtr = (countDiffsPtr + 1) % rateAverageWindow;
|
||||
|
||||
if (countDiffsPtr % 5 == 0) {
|
||||
lock();
|
||||
for (size_t i = 0; i < num_channels; ++i) {
|
||||
setIntegerParam(P_Rates[i], rates[i]);
|
||||
}
|
||||
callParamCallbacks();
|
||||
unlock();
|
||||
}
|
||||
for (size_t i = 0; i < num_channels; ++i) {
|
||||
setIntegerParam(P_Counts[i], counts[i]);
|
||||
}
|
||||
|
||||
//setIntegerParam(P_ElapsedTime, elapsedSeconds);
|
||||
|
||||
std::swap(eventsA, eventsB);
|
||||
|
||||
}
|
||||
|
||||
// // TODO this is totally decoupled!!!
|
||||
// const size_t queueBufferSize =
|
||||
// 10 * epicsRingBytesSize(this->udpQueue) / sizeof(NormalisedEvent);
|
||||
|
||||
// //struct {
|
||||
// // bool operator()(const NormalisedEvent l,
|
||||
// // const NormalisedEvent r) const {
|
||||
// // return l.timestamp > r.timestamp;
|
||||
// // }
|
||||
// //} smallestToLargest;
|
||||
|
||||
// //// This should never be used. It is just instantiated to reserve a buffer
|
||||
// //// of specific size.
|
||||
// //std::vector<NormalisedEvent> queueBuffer;
|
||||
// //queueBuffer.reserve(queueBufferSize);
|
||||
|
||||
// //std::priority_queue<NormalisedEvent, std::vector<NormalisedEvent>,
|
||||
// // decltype(smallestToLargest)>
|
||||
// // timeQueue(smallestToLargest, std::move(queueBuffer));
|
||||
|
||||
// NormalisedEvent* timeQueue = new NormalisedEvent[queueBufferSize];
|
||||
|
||||
// // TODO epics doesn't seem to support uint64, you would need an array of
|
||||
// // uint32. It does support int64 though.. so we start with that
|
||||
// epicsInt32 *counts = new epicsInt32[this->num_channels];
|
||||
|
||||
// const uint64_t minRateSamplePeriod = 100'000'000ll;
|
||||
// const size_t rateAverageWindow = 20;
|
||||
// size_t countDiffsPtr = 0;
|
||||
// epicsInt32 *rates = new epicsInt32[this->num_channels];
|
||||
// epicsInt32 *countDiff = new epicsInt32[this->num_channels];
|
||||
// epicsInt32 *countDiffs =
|
||||
// new epicsInt32[this->num_channels * rateAverageWindow];
|
||||
// uint64_t *timeSpans = new uint64_t[this->num_channels];
|
||||
// epicsTimeStamp lastRateUpdate = epicsTime::getCurrent();
|
||||
|
||||
// asynStatus status = asynSuccess;
|
||||
// NormalisedEvent ne;
|
||||
// uint64_t newestTimestamp = 0;
|
||||
// uint64_t startTimestamp = std::numeric_limits<uint64_t>::max();
|
||||
// uint64_t currTimestamp;
|
||||
// epicsInt32 elapsedSeconds = 0;
|
||||
// epicsInt32 prevStatus = STATUS_IDLE;
|
||||
// epicsInt32 currStatus = STATUS_IDLE;
|
||||
// epicsInt32 countPreset = 0;
|
||||
// epicsInt32 timePreset = 0;
|
||||
// epicsInt32 presetChannel = 0;
|
||||
// epicsInt32 udpQueueHighWaterMark = 0;
|
||||
|
||||
// while (true) {
|
||||
|
||||
// // I think mostly everything should already by sorted
|
||||
// // could probably in the other thread guarantee that each packet is sorted
|
||||
// // but probably it already is...
|
||||
// //
|
||||
// // so really we just need to merge sort chunks
|
||||
|
||||
// // idea is to try and guarantee at least 1 packet per id or the min
|
||||
// // frequency for each id without actually checking all ids
|
||||
// // size_t timeQueuePtr = 0;
|
||||
// // while (timeQueuePtr < 1500 * 10) {
|
||||
|
||||
// // // TODO depending on how this is implemented, I may also need to
|
||||
// // // check that there is is enough bytes, in case it does partial
|
||||
// // // writes...
|
||||
// // if (epicsRingBytesGet(udpQueue, (char *)&ne,
|
||||
// // sizeof(NormalisedEvent))) {
|
||||
// // // we should restart this ioc at least every few years, as at ns
|
||||
// // // resolution with a uint64_t we will have an overflow after
|
||||
// // // around 4 years
|
||||
// // newestTimestamp = std::max(newestTimestamp, ne.timestamp);
|
||||
|
||||
// // ++countDiff[ne.source == 0 ? ne.pixelId + 1 : 0];
|
||||
|
||||
// // timeQueue.push(std::move(ne));
|
||||
// // }
|
||||
|
||||
// // }
|
||||
|
||||
|
||||
// // while (timeQueue.empty() ||
|
||||
// // (timeQueue.size() < 1500 * 10 &&
|
||||
// // newestTimestamp - timeQueue.top().timestamp < 200'000'000ull)) {
|
||||
|
||||
// // // TODO depending on how this is implemented, I may also need to
|
||||
// // // check that there is is enough bytes, in case it does partial
|
||||
// // // writes...
|
||||
// // if (epicsRingBytesGet(udpQueue, (char *)&ne,
|
||||
// // sizeof(NormalisedEvent))) {
|
||||
// // // we should restart this ioc at least every few years, as at ns
|
||||
// // // resolution with a uint64_t we will have an overflow after
|
||||
// // // around 4 years
|
||||
// // newestTimestamp = std::max(newestTimestamp, ne.timestamp);
|
||||
|
||||
// // ++countDiff[ne.source == 0 ? ne.pixelId + 1 : 0];
|
||||
|
||||
// // timeQueue.push(std::move(ne));
|
||||
// // }
|
||||
// // }
|
||||
|
||||
// // ne = timeQueue.top();
|
||||
// // timeQueue.pop();
|
||||
|
||||
// // status = getIntegerParam(this->P_Status, &currStatus);
|
||||
|
||||
// // udpQueueHighWaterMark =
|
||||
// // epicsRingBytesHighWaterMark(udpQueue) / sizeof(NormalisedEvent);
|
||||
|
||||
// // // if (currStatus == STATUS_COUNTING && prevStatus == STATUS_IDLE) {
|
||||
// // // // Starting a new count
|
||||
|
||||
// // // // get current count configuration
|
||||
// // // getIntegerParam(this->P_CountPreset, &countPreset);
|
||||
// // // getIntegerParam(this->P_TimePreset, &timePreset);
|
||||
// // // getIntegerParam(this->P_MonitorChannel, &presetChannel);
|
||||
|
||||
// // // // reset status variables
|
||||
// // // startTimestamp = std::numeric_limits<uint64_t>::max();
|
||||
// // // for (size_t i = 0; i < this->num_channels; ++i) {
|
||||
// // // counts[i] = 0;
|
||||
// // // }
|
||||
|
||||
// // // // reset pvs
|
||||
// // // // lock();
|
||||
// // // // for (size_t i = 0; i < num_channels; ++i) {
|
||||
// // // // setIntegerParam(P_Counts[i], counts[i]);
|
||||
// // // // }
|
||||
// // // // setIntegerParam(P_ElapsedTime, 0);
|
||||
// // // // callParamCallbacks();
|
||||
// // // // unlock();
|
||||
|
||||
// // // // TODO might consider throwing out current buffer as it is
|
||||
// // // // from before count started? then again, 0.2 ms or whatever is
|
||||
// // // // set above is quite a small preceeding amount of time, so
|
||||
// // // // maybe it doesn't matter
|
||||
// // // }
|
||||
|
||||
// // // prevStatus = currStatus;
|
||||
|
||||
// // //if (currStatus == STATUS_COUNTING) {
|
||||
// // startTimestamp = std::min(startTimestamp, ne.timestamp);
|
||||
// // currTimestamp = ne.timestamp;
|
||||
// // elapsedSeconds =
|
||||
// // 0 ? currTimestamp <= startTimestamp
|
||||
// // : ((double)(currTimestamp - startTimestamp)) / 1e9;
|
||||
|
||||
// // // is our count finished?
|
||||
// // // if ((countPreset && counts[presetChannel] >= countPreset) ||
|
||||
// // // (timePreset && elapsedSeconds >= timePreset)) {
|
||||
|
||||
// // // // filter out events that occured after the specified time
|
||||
// // // if (ne.timestamp - startTimestamp <= countPreset) {
|
||||
// // // counts[ne.source == 0 ? ne.pixelId + 1 : 0] += 1;
|
||||
// // // this->queueForKafka(std::move(ne));
|
||||
|
||||
// // // // add any remaining events with the same timestamp
|
||||
// // // // we could theoretically have a small overrun if the
|
||||
// // // // timestamps are identical on the monitor channel
|
||||
// // // while (!timeQueue.empty() &&
|
||||
// // // !timeQueue.top().timestamp == currTimestamp) {
|
||||
// // // ne = timeQueue.top();
|
||||
// // // timeQueue.pop();
|
||||
// // // counts[ne.source == 0 ? ne.pixelId + 1 : 0] += 1;
|
||||
// // // this->queueForKafka(std::move(ne));
|
||||
// // // }
|
||||
// // // }
|
||||
|
||||
// // // countPreset = 0;
|
||||
// // // timePreset = 0;
|
||||
|
||||
// // // // lock();
|
||||
// // // for (size_t i = 0; i < num_channels; ++i) {
|
||||
// // // setIntegerParam(P_Counts[i], counts[i]);
|
||||
// // // }
|
||||
// // // setIntegerParam(P_ElapsedTime, elapsedSeconds);
|
||||
// // // setIntegerParam(P_CountPreset, countPreset);
|
||||
// // // setIntegerParam(P_TimePreset, timePreset);
|
||||
// // // setIntegerParam(P_UdpQueueHighWaterMark, udpQueueHighWaterMark);
|
||||
// // // // callParamCallbacks();
|
||||
// // // setIntegerParam(P_Status, STATUS_IDLE);
|
||||
// // // // callParamCallbacks();
|
||||
// // // // unlock();
|
||||
|
||||
// // // epicsRingBytesResetHighWaterMark(udpQueue);
|
||||
|
||||
// // // } else {
|
||||
|
||||
// // counts[ne.source == 0 ? ne.pixelId + 1 : 0] += 1;
|
||||
// // this->queueForKafka(std::move(ne));
|
||||
|
||||
// // // lock();
|
||||
// // for (size_t i = 0; i < num_channels; ++i) {
|
||||
// // setIntegerParam(P_Counts[i], counts[i]);
|
||||
// // }
|
||||
// // setIntegerParam(P_ElapsedTime, elapsedSeconds);
|
||||
// // setIntegerParam(P_UdpQueueHighWaterMark, udpQueueHighWaterMark);
|
||||
// // // callParamCallbacks();
|
||||
// // // unlock();
|
||||
// // // }
|
||||
// // //}
|
||||
|
||||
// // // Careful changing any of these magic numbers until I clean this up
|
||||
// // // as you might end up calculating the wrong rate
|
||||
// // // epicsTimeStamp currentTime = epicsTime::getCurrent();
|
||||
// // // if (epicsTimeDiffInNS(¤tTime, &lastRateUpdate) >
|
||||
// // // minRateSamplePeriod) {
|
||||
// // // timeSpans[countDiffsPtr] =
|
||||
// // // epicsTimeDiffInNS(¤tTime, &lastRateUpdate);
|
||||
|
||||
// // // uint64_t totalTime = 0;
|
||||
// // // for (size_t i = 0; i <= rateAverageWindow; ++i) {
|
||||
// // // totalTime += timeSpans[i];
|
||||
// // // }
|
||||
|
||||
// // // lastRateUpdate = currentTime;
|
||||
|
||||
// // // for (size_t i = 0; i <= this->num_channels; ++i) {
|
||||
// // // countDiffs[i * rateAverageWindow + countDiffsPtr] =
|
||||
// // // countDiff[i];
|
||||
|
||||
// // // uint64_t cnt = 0;
|
||||
// // // for (size_t j = 0; j <= rateAverageWindow; ++j) {
|
||||
// // // cnt += countDiffs[i * rateAverageWindow + j];
|
||||
// // // }
|
||||
// // // rates[i] = cnt / (totalTime * 1e-9);
|
||||
|
||||
// // // countDiff[i] = 0;
|
||||
// // // }
|
||||
|
||||
// // // countDiffsPtr = (countDiffsPtr + 1) % rateAverageWindow;
|
||||
|
||||
// // // if (countDiffsPtr % 5 == 0) {
|
||||
// // // // lock();
|
||||
// // // for (size_t i = 0; i < num_channels; ++i) {
|
||||
// // // setIntegerParam(P_Rates[i], rates[i]);
|
||||
// // // }
|
||||
// // // // callParamCallbacks();
|
||||
// // // // unlock();
|
||||
// // // }
|
||||
// // // }
|
||||
// }
|
||||
}
|
||||
|
||||
void asynStreamGeneratorDriver::produce(epicsRingBytesId eventQueue,
|
||||
|
||||
@@ -59,8 +59,8 @@ struct __attribute__((__packed__)) MonitorEvent {
|
||||
|
||||
struct __attribute__((__packed__)) NormalisedEvent {
|
||||
uint64_t timestamp;
|
||||
uint32_t pixelId : 24;
|
||||
uint8_t source;
|
||||
uint32_t pixelId;
|
||||
|
||||
// inline NormalisedEvent(uint64_t timestamp, uint8_t source, uint32_t
|
||||
// pixelId)
|
||||
@@ -96,6 +96,9 @@ struct __attribute__((__packed__)) NormalisedEvent {
|
||||
#define P_RateString "RATE%d"
|
||||
#define P_ClearCountsString "C_%d"
|
||||
|
||||
#define P_UdpQueueHighWaterMarkString "UDP"
|
||||
#define P_SortedQueueHighWaterMarkString "SORT"
|
||||
|
||||
/*******************************************************************************
|
||||
* Stream Generator Coordinating Class
|
||||
*/
|
||||
@@ -110,9 +113,11 @@ class asynStreamGeneratorDriver : public asynPortDriver {
|
||||
const int kafkaMaxPacketSize);
|
||||
virtual ~asynStreamGeneratorDriver();
|
||||
|
||||
virtual asynStatus readInt32(asynUser *pasynUser, epicsInt32 *value);
|
||||
virtual asynStatus writeInt32(asynUser *pasynUser, epicsInt32 value);
|
||||
|
||||
void receiveUDP();
|
||||
void partialSortEvents();
|
||||
void processEvents();
|
||||
void produceMonitor();
|
||||
void produceDetector();
|
||||
@@ -133,6 +138,10 @@ class asynStreamGeneratorDriver : public asynPortDriver {
|
||||
int *P_Rates;
|
||||
int *P_ClearCounts;
|
||||
|
||||
// System Status Parameter Identifying IDs
|
||||
int P_UdpQueueHighWaterMark;
|
||||
int P_SortedQueueHighWaterMark;
|
||||
|
||||
private:
|
||||
asynUser *pasynUDPUser;
|
||||
epicsEventId pausedEventId;
|
||||
@@ -142,6 +151,7 @@ class asynStreamGeneratorDriver : public asynPortDriver {
|
||||
const int kafkaMaxPacketSize;
|
||||
|
||||
epicsRingBytesId udpQueue;
|
||||
epicsRingBytesId sortedQueue;
|
||||
|
||||
epicsRingBytesId monitorQueue;
|
||||
rd_kafka_t *monitorProducer;
|
||||
|
||||
Reference in New Issue
Block a user