Merge pull request 'Fixes some logic bugs and adds rate calculation' (#2) from bugfixing into main
Reviewed-on: #2

+3 -2
@@ -67,9 +67,10 @@ record(ai, "$(INSTR)$(NAME):R$(CHANNEL)")
 {
     field(DESC, "Rate of DAQ CH$(CHANNEL)")
     field(EGU, "cts/sec")
-    field(DTYP, "asynInt32")
+    field(DTYP, "asynFloat64")
     field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) RATE$(CHANNEL)")
-    field(SCAN, ".2 second")
+    field(SCAN, ".5 second")
+    field(PREC, 2)
     # field(SCAN, "I/O Intr")
     field(PINI, "YES")
 }
@@ -27,3 +27,47 @@ record(bi, "$(INSTR)$(NAME):Enable_RBV")
     field(ONAM, "ON")
     field(SCAN, ".5 second")
 }
+
+record(longin,"$(INSTR)$(NAME):UDP_DROPPED")
+{
+    field(DESC, "UDP Packets Missed")
+    field(EGU, "Events")
+    field(DTYP, "asynInt32")
+    field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) DROP")
+    # field(SCAN, "I/O Intr")
+    field(SCAN, "1 second")
+    field(PINI, "YES")
+}
+
+record(longin,"$(INSTR)$(NAME):UDP_WATERMARK")
+{
+    field(DESC, "UDP Queue Usage")
+    field(EGU, "%")
+    field(DTYP, "asynInt32")
+    field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) UDP")
+    # field(SCAN, "I/O Intr")
+    field(SCAN, "1 second")
+    field(PINI, "YES")
+}
+
+record(longin,"$(INSTR)$(NAME):NORMALISED_WATERMARK")
+{
+    field(DESC, "Normalised Queue Usage")
+    field(EGU, "%")
+    field(DTYP, "asynInt32")
+    field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) NORM")
+    # field(SCAN, "I/O Intr")
+    field(SCAN, "1 second")
+    field(PINI, "YES")
+}
+
+record(longin,"$(INSTR)$(NAME):SORTED_WATERMARK")
+{
+    field(DESC, "Sort Queue Usage")
+    field(EGU, "%")
+    field(DTYP, "asynInt32")
+    field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) SORT")
+    # field(SCAN, "I/O Intr")
+    field(SCAN, "1 second")
+    field(PINI, "YES")
+}

+1 -37
@@ -52,7 +52,7 @@ record(mbbi, "$(INSTR)$(NAME):RAW-STATUS")
 record(fanout, "$(INSTR)$(NAME):READALL")
 {
     field(SELM, "All")
-    field(LNK0, "$(INSTR)$(NAME):ELAPSED-TIME PP")
+    field(LNK0, "$(INSTR)$(NAME):ELAPSED-TIME")
     field(LNK1, "$(INSTR)$(NAME):M1")
     field(LNK2, "$(INSTR)$(NAME):M2")
     field(LNK3, "$(INSTR)$(NAME):M3")
@@ -236,39 +236,3 @@ record(ai, "$(INSTR)$(NAME):ELAPSED-TIME")
     field(PINI, "YES")
     # field(FLNK, "$(INSTR)$(NAME):ETO")
 }
-
-################################################################################
-# Stream Generator Status PVs
-
-record(longin,"$(INSTR)$(NAME):UDP_DROPPED")
-{
-    field(DESC, "UDP Packets Missed")
-    field(EGU, "Events")
-    field(DTYP, "asynInt32")
-    field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) DROP")
-    # field(SCAN, "I/O Intr")
-    field(SCAN, "1 second")
-    field(PINI, "YES")
-}
-
-record(longin,"$(INSTR)$(NAME):UDP_WATERMARK")
-{
-    field(DESC, "UDP Queue Usage")
-    field(EGU, "%")
-    field(DTYP, "asynInt32")
-    field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) UDP")
-    # field(SCAN, "I/O Intr")
-    field(SCAN, "1 second")
-    field(PINI, "YES")
-}
-
-record(longin,"$(INSTR)$(NAME):SORTED_WATERMARK")
-{
-    field(DESC, "Partial Sort Queue Usage")
-    field(EGU, "%")
-    field(DTYP, "asynInt32")
-    field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) SORT")
-    # field(SCAN, "I/O Intr")
-    field(SCAN, "1 second")
-    field(PINI, "YES")
-}

+7 -5
@@ -71,7 +71,7 @@ while True:
     header[7] = (header[7] + (header[6] == 0)) % (0xff + 1)
     buffer_ids[header[11]] = header[6], header[7]
 
-    tosend = list(header)
+    tosend = []
 
     if is_monitor > 3:
 
@@ -89,7 +89,8 @@ while True:
         d[1] = (event_timestamp >> 8) & 0xff
         d[2] = (event_timestamp >> 16) & 0x07
 
-        tosend += d
+        # completely reversed sorting
+        tosend = d + tosend
 
     else:
 
@@ -110,9 +111,10 @@ while True:
         d[1] = (event_timestamp >> 8) & 0xff
         d[2] |= (event_timestamp >> 16) & 0x07
 
-        tosend += d
+        # completely reversed sorting
+        tosend = d + tosend
 
-    sock.sendto(bytes(tosend), ('127.0.0.1', 54321))
+    sock.sendto(bytes(header + tosend), ('127.0.0.1', 54321))
     mv = memoryview(bytes(header)).cast('H')
     print(f'Sent packet {mv[3]} with {num_events} events {base_timestamp}')
-    # time.sleep(.01)
+    # time.sleep(.1)

@@ -68,10 +68,10 @@ static void udpNormaliserTask(void *drvPvt) {
     pSGD->normaliseUDP();
 }
 
-static void sortTask(void *drvPvt) {
-    asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
-    pSGD->partialSortEvents();
-}
+// static void sortTask(void *drvPvt) {
+//     asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
+//     pSGD->partialSortEvents();
+// }
 
 static void daqTask(void *drvPvt) {
     asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
@@ -191,7 +191,7 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
 
         memset(pv_name_buffer, 0, 100);
         epicsSnprintf(pv_name_buffer, 100, P_RateString, i + 1);
-        status = createInt32Param(status, pv_name_buffer, P_Rates + i);
+        status = createFloat64Param(status, pv_name_buffer, P_Rates + i);
 
         memset(pv_name_buffer, 0, 100);
         epicsSnprintf(pv_name_buffer, 100, P_ClearCountsString, i + 1);
@@ -201,6 +201,8 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
     status = createInt32Param(status, P_UdpDroppedString, &P_UdpDropped);
     status = createInt32Param(status, P_UdpQueueHighWaterMarkString,
                               &P_UdpQueueHighWaterMark);
+    status = createInt32Param(status, P_NormalisedQueueHighWaterMarkString,
+                              &P_NormalisedQueueHighWaterMark);
     status = createInt32Param(status, P_SortedQueueHighWaterMarkString,
                               &P_SortedQueueHighWaterMark);
 
@@ -267,7 +269,7 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
         (asynStatus)(epicsThreadCreate(
             "sinqDAQ",
             epicsThreadPriorityMedium, // epicsThreadPriorityMax,
-            epicsThreadGetStackSize(epicsThreadStackMedium),
+            epicsThreadGetStackSize(epicsThreadStackBig),
             (EPICSTHREADFUNC)::daqTask, this) == NULL);
     if (status) {
         epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
@@ -278,15 +280,15 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
     /* Create the thread that orders packets of in preparation for our sinqDAQ
      * stand-in
      */
-    status = (asynStatus)(epicsThreadCreate(
-        "partialSort", epicsThreadPriorityMedium,
-        epicsThreadGetStackSize(epicsThreadStackMedium),
-        (EPICSTHREADFUNC)::sortTask, this) == NULL);
-    if (status) {
-        epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
-                          driverName, functionName, status);
-        exit(1);
-    }
+    // status = (asynStatus)(epicsThreadCreate(
+    //     "partialSort", epicsThreadPriorityMedium,
+    //     epicsThreadGetStackSize(epicsThreadStackMedium),
+    //     (EPICSTHREADFUNC)::sortTask, this) == NULL);
+    // if (status) {
+    //     epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
+    //                       driverName, functionName, status);
+    //     exit(1);
+    // }
 
     /* Create the thread normalises the events
      */
@@ -346,15 +348,26 @@ asynStatus asynStreamGeneratorDriver::readInt32(asynUser *pasynUser,
 
     if (function == P_UdpQueueHighWaterMark) {
         const double toPercent = 100. / (243. * udpQueueSize);
-        *value = (epicsInt32)(epicsRingBytesHighWaterMark(this->udpQueue) /
-                              sizeof(NormalisedEvent) * toPercent);
+        *value =
+            (epicsInt32)(((double)epicsRingBytesHighWaterMark(this->udpQueue)) /
+                         sizeof(NormalisedEvent) * toPercent);
         // Aparently resetting the watermark causes problems...
         // at least concurrently :D
         // epicsRingBytesResetHighWaterMark(this->udpQueue);
         return asynSuccess;
 
+    } else if (function == P_NormalisedQueueHighWaterMark) {
+        const double toPercent = 100. / (243. * udpQueueSize);
+        *value = (epicsInt32)(((double)epicsRingBytesHighWaterMark(
+                                  this->normalisedQueue)) /
+                              sizeof(NormalisedEvent) * toPercent);
+        // epicsRingBytesResetHighWaterMark(this->sortedQueue);
+        return asynSuccess;
+
     } else if (function == P_SortedQueueHighWaterMark) {
         const double toPercent = 100. / (243. * udpQueueSize);
-        *value = (epicsInt32)(epicsRingBytesHighWaterMark(this->sortedQueue) /
+        *value = (epicsInt32)(((double)epicsRingBytesHighWaterMark(
+                                  this->sortedQueue)) /
                               sizeof(NormalisedEvent) * toPercent);
         // epicsRingBytesResetHighWaterMark(this->sortedQueue);
        return asynSuccess;
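
Note on the wiring (a hedged sketch, not part of this commit): the "NORM" string used in the new db records and in P_NormalisedQueueHighWaterMarkString is an asyn drvInfo reason. The record's INP link carries it to the driver, createParam() registers it, and readInt32() is then dispatched with the resulting parameter index as pasynUser->reason. The stand-alone class below only illustrates that pattern; it assumes a recent asynPortDriver constructor signature, and the class name and hard-coded value are illustrative, not the project's code (which uses its own createInt32Param() helper).

    #include <asynPortDriver.h>

    // Minimal illustration: register the drvInfo reason "NORM" and answer
    // reads for it from readInt32().
    class WatermarkDemo : public asynPortDriver {
      public:
        explicit WatermarkDemo(const char *portName)
            : asynPortDriver(portName, 1, asynInt32Mask | asynDrvUserMask,
                             asynInt32Mask, 0, 1, 0, 0) {
            // "NORM" must match the drvInfo string in the record's INP link.
            createParam("NORM", asynParamInt32, &P_NormalisedQueueHighWaterMark);
        }

        asynStatus readInt32(asynUser *pasynUser, epicsInt32 *value) override {
            if (pasynUser->reason == P_NormalisedQueueHighWaterMark) {
                *value = 42; // stand-in for the computed queue usage in percent
                return asynSuccess;
            }
            return asynPortDriver::readInt32(pasynUser, value);
        }

      private:
        int P_NormalisedQueueHighWaterMark;
    };
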
@@ -498,6 +511,8 @@ void asynStreamGeneratorDriver::receiveUDP() {
     const std::size_t minDataBufferHeaderLength = 42;
     const std::size_t minCmdBufferHeaderLength = 20;
 
+    // For some reason the data buffers also have bit 15 set....
+    // so they also appear as command buffers
     if (received >= minDataBufferHeaderLength &&
         received == bufferLength * 2) {
 
@@ -607,7 +622,10 @@ void asynStreamGeneratorDriver::normaliseUDP() {
             resultBuffer[i] = ne;
         }
 
-        epicsRingBytesPut(this->normalisedQueue, (char *)resultBuffer,
+        // epicsRingBytesPut(this->normalisedQueue, (char *)resultBuffer,
+        //                   total_events * sizeof(NormalisedEvent));
+
+        epicsRingBytesPut(this->sortedQueue, (char *)resultBuffer,
                           total_events * sizeof(NormalisedEvent));
 
     } else {
@@ -626,47 +644,47 @@ inline int eventsInQueue(epicsRingBytesId id) {
     return epicsRingBytesUsedBytes(id) / sizeof(NormalisedEvent);
 }
 
-void asynStreamGeneratorDriver::partialSortEvents() {
-
-    // const char functionName[]{"partialSortEvents"};
-
-    // x * number of ids * max events in packet
-    int bufferedEvents = 5 * 10 * 243;
-    NormalisedEvent events[bufferedEvents];
-
-    int queuedEvents = 0;
-    epicsTimeStamp lastSort = epicsTime::getCurrent();
-    epicsTimeStamp currentTime = lastSort;
-
-    while (true) {
-
-        queuedEvents =
-            eventsInQueue(this->normalisedQueue); // in case we can't wait
-        lastSort = epicsTime::getCurrent();
-        currentTime = lastSort;
-
-        // wait for mininmum packet frequency or enough packets to ensure we
-        // could potentially have at least 1 packet per mcpdid
-        while (queuedEvents < bufferedEvents &&
-               epicsTimeDiffInNS(&currentTime, &lastSort) < 250'000'000ll) {
-            epicsThreadSleep(0.0001); // seconds
-            currentTime = epicsTime::getCurrent();
-            queuedEvents = eventsInQueue(this->normalisedQueue);
-        }
-
-        queuedEvents = std::min(queuedEvents, bufferedEvents);
-
-        if (queuedEvents) {
-            epicsRingBytesGet(this->normalisedQueue, (char *)events,
-                              queuedEvents * sizeof(NormalisedEvent));
-
-            std::sort(events, events + queuedEvents, oldestEventsFirst);
-
-            epicsRingBytesPut(this->sortedQueue, (char *)events,
-                              queuedEvents * sizeof(NormalisedEvent));
-        }
-    }
-}
+// void asynStreamGeneratorDriver::partialSortEvents() {
+//
+//     // const char functionName[]{"partialSortEvents"};
+//
+//     // x * number of ids * max events in packet
+//     int bufferedEvents = 5 * 10 * 243;
+//     NormalisedEvent events[bufferedEvents];
+//
+//     int queuedEvents = 0;
+//     epicsTimeStamp lastSort = epicsTime::getCurrent();
+//     epicsTimeStamp currentTime = lastSort;
+//
+//     while (true) {
+//
+//         queuedEvents =
+//             eventsInQueue(this->normalisedQueue); // in case we can't wait
+//         lastSort = epicsTime::getCurrent();
+//         currentTime = lastSort;
+//
+//         // wait for mininmum packet frequency or enough packets to ensure we
+//         // could potentially have at least 1 packet per mcpdid
+//         while (queuedEvents < bufferedEvents &&
+//                epicsTimeDiffInNS(&currentTime, &lastSort) < 250'000'000ll) {
+//             epicsThreadSleep(0.0001); // seconds
+//             currentTime = epicsTime::getCurrent();
+//             queuedEvents = eventsInQueue(this->normalisedQueue);
+//         }
+//
+//         queuedEvents = std::min(queuedEvents, bufferedEvents);
+//
+//         if (queuedEvents) {
+//             epicsRingBytesGet(this->normalisedQueue, (char *)events,
+//                               queuedEvents * sizeof(NormalisedEvent));
+//
+//             std::sort(events, events + queuedEvents, oldestEventsFirst);
+//
+//             epicsRingBytesPut(this->sortedQueue, (char *)events,
+//                               queuedEvents * sizeof(NormalisedEvent));
+//         }
+//     }
+// }
 
 inline void asynStreamGeneratorDriver::queueForKafka(NormalisedEvent &ne) {
     if (this->kafkaEnabled) {
@@ -684,9 +702,14 @@ void asynStreamGeneratorDriver::processEvents() {
     const char functionName[]{"processEvents"};
 
     // x * number of ids * max events in packet * event size
-    int bufferedEvents = 5 * 10 * 243;
+    const int bufferedEvents = 20 * 10 * 243;
     // we need a little extra space for merge sorting in
-    int extraBufferedEvents = 1 * 10 * 243;
+    const int extraBufferedEvents = 10 * 10 * 243;
+    // so we have 5 and want to keep 1/5 for the next window
+    // which is why toProcess has 4/5 below
+    // or now
+    // we have 10 and want to keep 4/10 for the next window
+    // so toProcess is 6/10 below
 
     // we have two buffers. We alternate between reading data into one of them,
     // and then merge sorting into the other
@@ -707,11 +730,27 @@ void asynStreamGeneratorDriver::processEvents() {
     double elapsedSeconds = 0;
     uint64_t startTimestamp = std::numeric_limits<uint64_t>::max();
 
+    std::size_t ratePtr = 0;
+    const uint8_t rateWindowSize = 10;
+    uint32_t ratesWindow[this->num_channels * rateWindowSize];
+    uint64_t ratesStartTimes[rateWindowSize];
+    uint64_t ratesStopTimes[rateWindowSize];
+
+    for (std::size_t i = 0; i < this->num_channels * rateWindowSize; ++i) {
+        ratesWindow[i] = 0;
+    }
+
+    for (std::size_t i = 0; i < this->num_channels; ++i) {
+        ratesStartTimes[i] = std::numeric_limits<uint64_t>::max();
+        ratesStopTimes[i] = 0;
+    }
+
     epicsInt32 currStatus = STATUS_IDLE;
     epicsInt32 prevStatus = STATUS_IDLE;
     epicsInt32 countPreset;
     epicsInt32 timePreset;
     epicsInt32 presetChannel;
+    std::size_t b_cnt = 0;
 
     while (true) {
 
@@ -749,8 +788,21 @@ void asynStreamGeneratorDriver::processEvents() {
         epicsRingBytesGet(this->sortedQueue, (char *)newStartPtr,
                           queuedEvents * sizeof(NormalisedEvent));
 
+        std::sort(newStartPtr, newStartPtr + queuedEvents, oldestEventsFirst);
+
+        // std::size_t toProcess =
+        //     eventsBLastEnd - eventsBLastStart + queuedEvents * 4 / 5;
+
         std::size_t toProcess =
-            eventsBLastEnd - eventsBLastStart + queuedEvents * 4 / 5;
+            eventsBLastEnd - eventsBLastStart + queuedEvents * 10 / 20;
+
+        // asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
+        //     "newStartPtr %" PRIu64 " queuedEvents %" PRIu64 " toProcess %" PRIu64 "\n",
+        //     newStartPtr - eventsA, queuedEvents, toProcess);
+
+        // asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
+        //     "eventsBLastStart %" PRIu64 " eventsBLastEnd %" PRIu64"\n",
+        //     eventsBLastStart - eventsB, eventsBLastEnd - eventsB);
+
         // TODO could also consider an in-place merge
         eventsBLastEnd = std::merge(newStartPtr, newStartPtr + queuedEvents,
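
Together with the normaliseUDP() change above, this hunk moves the ordering work into processEvents(): each pass sorts the freshly dequeued batch, merges it with the unprocessed tail kept from the previous pass, and only consumes part of the merged window so that late events can still be slotted in. A minimal sketch of that windowed sort-and-merge pattern, with illustrative data and fractions rather than the driver's buffers:

    #include <algorithm>
    #include <cstddef>
    #include <cstdio>
    #include <vector>

    int main() {
        std::vector<int> carry = {40, 55, 60};         // unprocessed tail from the last pass
        std::vector<int> fresh = {70, 45, 90, 50, 80}; // newly dequeued batch (unsorted)

        std::sort(fresh.begin(), fresh.end());

        // Merge the sorted batch with the carried-over tail (already sorted).
        std::vector<int> merged(carry.size() + fresh.size());
        std::merge(fresh.begin(), fresh.end(), carry.begin(), carry.end(), merged.begin());

        // Process only part of the window (the commit keeps half of the new
        // batch back); the remainder becomes the carry for the next pass.
        std::size_t toProcess = carry.size() + fresh.size() * 10 / 20;
        for (std::size_t i = 0; i < toProcess; ++i)
            printf("process %d\n", merged[i]);

        carry.assign(merged.begin() + toProcess, merged.end());
        printf("carried over %zu events\n", carry.size());
        return 0;
    }
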
@@ -759,6 +811,7 @@ void asynStreamGeneratorDriver::processEvents() {
 
         eventsBLastStart = eventsA + toProcess;
 
 
+        // TODO I haven't really taken care of the case that there are no events
 
         if (prevStatus == STATUS_IDLE && currStatus == STATUS_COUNTING) {
@@ -781,15 +834,51 @@ void asynStreamGeneratorDriver::processEvents() {
                       "%s:%s: starting count: (%d, %d, %d, %" PRIu64 ")\n",
                      driverName, functionName, countPreset, timePreset,
                      presetChannel, startTimestamp);
 
+            // asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
+            //     "%s:%s: starting count: %" PRIu64 " %" PRIu64 " %" PRIu64 "\n",
+            //     driverName, functionName, eventsA[0].timestamp, eventsA[1].timestamp, eventsA[2].timestamp);
         }
 
+        //if (eventsA[std::max((std::size_t) 0, toProcess - 1)].timestamp < eventsA[0].timestamp)
+        //    asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
+        //        "%s:%s: time-span: %" PRIu64 " %" PRIu64 "\n",
+        //        driverName, functionName, eventsA[0].timestamp, eventsA[std::max((std::size_t) 0, toProcess - 1)].timestamp);
+
+        // if (!std::is_sorted( eventsA, eventsA + toProcess, oldestEventsFirst )) {
+        //     // asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
+        //     //     "%s:%s: not sorted: %" PRIu64 " %" PRIu64 "\n",
+        //     //     driverName, functionName, eventsA[0].timestamp, eventsA[std::max((std::size_t) 0, toProcess - 1)].timestamp);
+        //
+        //
+        //
+        //     for (size_t j = 1; j < toProcess; ++j) {
+        //         if (eventsA[j-1].timestamp > eventsA[j].timestamp) {
+        //             asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
+        //                 "%s:%s: not sorted %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 "\n",
+        //                 driverName, functionName, b_cnt, j, eventsA[j-1].timestamp, eventsA[j].timestamp);
+        //         }
+        //     }
+        //     ++b_cnt;
+
+        //     std::sort(eventsA, eventsA + toProcess, oldestEventsFirst);
+        // }
+
+        // if (!std::is_sorted( eventsA, eventsA + toProcess, oldestEventsFirst ))
+        //     asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
+        //         "%s:%s: not sorted: %" PRIu64 " %" PRIu64 "\n",
+        //         driverName, functionName, eventsA[0].timestamp, eventsA[std::max((std::size_t) 0, toProcess - 1)].timestamp);
+
         if (currStatus == STATUS_COUNTING) {
 
             for (std::size_t i = 0; i < toProcess; ++i) {
 
-                counts[eventsA[i].source == 0 ? eventsA[i].pixelId
-                                              : this->num_channels - 1] += 1;
-                elapsedSeconds = (eventsA[i].timestamp - startTimestamp) / 1e9;
+                const uint8_t channelId = eventsA[i].source == 0 ? eventsA[i].pixelId
+                                                                 : this->num_channels - 1;
+
+                counts[channelId] += 1;
+                elapsedSeconds =
+                    ((double)(eventsA[i].timestamp - startTimestamp)) / 1e9;
 
                 const bool reachedCount =
                     countPreset && counts[presetChannel] >= countPreset;
@@ -798,10 +887,13 @@ void asynStreamGeneratorDriver::processEvents() {
 
                 if (reachedCount || reachedTime) {
 
-                    asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
-                              "%s:%s: reached preset: (%lld, %f)\n", driverName,
-                              functionName, counts[presetChannel],
-                              elapsedSeconds);
+                    asynPrint(
+                        pasynUserSelf, ASYN_TRACE_ERROR,
+                        "%s:%s: reached preset: (%d, %d) (%lld, %f, %" PRIu64
+                        ")\n",
+                        driverName, functionName, reachedCount, reachedTime,
+                        counts[presetChannel], elapsedSeconds,
+                        eventsA[i].timestamp);
 
                     // TODO should really check there an no more events with the
                     // same final timestamp
@@ -827,12 +919,73 @@ void asynStreamGeneratorDriver::processEvents() {
                 (timePreset && elapsedSeconds >= (double)timePreset)) {
                 asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
                           "%s:%s: finished count\n", driverName, functionName);
-                setIntegerParam(this->P_Status, STATUS_IDLE);
+                currStatus = STATUS_IDLE;
+                setIntegerParam(this->P_Status, currStatus);
                 setIntegerParam(this->P_CountPreset, 0);
                 setIntegerParam(this->P_TimePreset, 0);
             }
         }
 
 
+        // TODO this was just done quickly and is probably not totally correct
+        // Determing the rates on each channel
+        uint32_t *ratesWindowPtr = ratesWindow + this->num_channels * ratePtr;
+
+        // Clear Bin
+        for (std::size_t i = 0; i < this->num_channels; ++i) {
+            ratesWindowPtr[i] = 0;
+        }
+
+        // Sum Counts in Bin
+        for (std::size_t i = 0; i < toProcess; ++i) {
+            const uint8_t channelId = eventsA[i].source == 0 ? eventsA[i].pixelId
+                                                             : this->num_channels - 1;
+
+            // I think this should be the better way regarding speed
+            // otherwise we could have
+            // channelId * rateWindowSize + ratePtr
+            // to simplify the average
+            ratesWindowPtr[channelId] += 1;
+        }
+
+        if (toProcess) {
+            ratesStartTimes[ratePtr] = eventsA[0].timestamp;
+            ratesStopTimes[ratePtr] = eventsA[toProcess-1].timestamp;
+        }
+        // else {
+        //     ratesStartTimes[ratePtr] = std::numeric_limits<uint64_t>::max();
+        //     ratesStopTimes[ratePtr] = 0;
+        // }
+
+        uint64_t minTime = std::numeric_limits<uint64_t>::max();
+        uint64_t maxTime = 0;
+        for (std::size_t i = 0; i < rateWindowSize; ++i) {
+            minTime = std::min(minTime, ratesStartTimes[i]);
+            maxTime = std::max(maxTime, ratesStopTimes[i]);
+        }
+        const double timeWindow = maxTime <= minTime ? 0 : (maxTime - minTime) * 1e-9;
+
+        for (std::size_t i = 0; i < this->num_channels; ++i) {
+            double rate = 0.;
+            for (std::size_t j = 0; j < rateWindowSize; ++j) {
+                rate += ratesWindow[i + this->num_channels * j];
+            }
+            setDoubleParam(
+                this->P_Rates[i],
+                rate / timeWindow
+            );
+
+            // if (i == 0)
+            //     asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
+            //         "%s:%s: window %f (%d) counts %f\n", driverName, functionName, timeWindow, timeWindow > 0., rate);
+        }
+
+
+        ratePtr = (ratePtr + 1) % rateWindowSize;
+
+
         prevStatus = currStatus;
 
         std::swap(eventsA, eventsB);

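The block added above is the rate calculation announced in the PR title: each processing pass fills one bin of a circular window with per-channel counts plus the first and last timestamps it saw, and the published rate is the windowed count divided by the time the window spans. A minimal stand-alone sketch of that computation; the sizes, the channel mapping, and the guard against an empty window are illustrative, not the driver's code:

    #include <algorithm>
    #include <cstdint>
    #include <cstdio>
    #include <limits>

    int main() {
        const std::size_t numChannels = 3;
        const std::size_t windowSize = 10;

        uint32_t window[numChannels * windowSize] = {}; // counts per (bin, channel)
        uint64_t startTimes[windowSize];
        uint64_t stopTimes[windowSize];
        std::fill(startTimes, startTimes + windowSize, std::numeric_limits<uint64_t>::max());
        std::fill(stopTimes, stopTimes + windowSize, 0);

        // One processing pass: bin 0 saw 5 events on channel 1 over 0.5 s
        // (timestamps in nanoseconds).
        std::size_t bin = 0;
        window[numChannels * bin + 1] = 5;
        startTimes[bin] = 1'000'000'000;
        stopTimes[bin] = 1'500'000'000;

        // Rate = counts summed over the window / seconds spanned by the window.
        uint64_t minTime = std::numeric_limits<uint64_t>::max(), maxTime = 0;
        for (std::size_t i = 0; i < windowSize; ++i) {
            minTime = std::min(minTime, startTimes[i]);
            maxTime = std::max(maxTime, stopTimes[i]);
        }
        const double span = maxTime <= minTime ? 0 : (maxTime - minTime) * 1e-9;

        for (std::size_t ch = 0; ch < numChannels; ++ch) {
            double counts = 0;
            for (std::size_t i = 0; i < windowSize; ++i)
                counts += window[ch + numChannels * i];
            printf("channel %zu: %.1f cts/sec\n", ch, span > 0 ? counts / span : 0.0);
        }
        return 0;
    }
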
@@ -141,6 +141,7 @@ constexpr static char P_ClearCountsString[]{"C_%" PRIu64};
 
 constexpr static char P_UdpDroppedString[]{"DROP"};
 constexpr static char P_UdpQueueHighWaterMarkString[]{"UDP"};
+constexpr static char P_NormalisedQueueHighWaterMarkString[]{"NORM"};
 constexpr static char P_SortedQueueHighWaterMarkString[]{"SORT"};
 
 /*******************************************************************************
@@ -188,6 +189,7 @@ class asynStreamGeneratorDriver : public asynPortDriver {
     // System Status Parameter Identifying IDs
     int P_UdpDropped;
     int P_UdpQueueHighWaterMark;
+    int P_NormalisedQueueHighWaterMark;
     int P_SortedQueueHighWaterMark;
 
   private: