heck
Some checks failed
Test And Build / Lint (push) Failing after 2s
Test And Build / Build (push) Failing after 2s

This commit is contained in:
2025-11-19 18:33:26 +01:00
parent fdbb8f5061
commit f169076f65
2 changed files with 108 additions and 68 deletions

View File

@@ -52,7 +52,7 @@ record(mbbi, "$(INSTR)$(NAME):RAW-STATUS")
record(fanout, "$(INSTR)$(NAME):READALL") record(fanout, "$(INSTR)$(NAME):READALL")
{ {
field(SELM, "All") field(SELM, "All")
field(LNK0, "$(INSTR)$(NAME):ELAPSED-TIME PP") field(LNK0, "$(INSTR)$(NAME):ELAPSED-TIME")
field(LNK1, "$(INSTR)$(NAME):M1") field(LNK1, "$(INSTR)$(NAME):M1")
field(LNK2, "$(INSTR)$(NAME):M2") field(LNK2, "$(INSTR)$(NAME):M2")
field(LNK3, "$(INSTR)$(NAME):M3") field(LNK3, "$(INSTR)$(NAME):M3")

View File

@@ -68,10 +68,10 @@ static void udpNormaliserTask(void *drvPvt) {
pSGD->normaliseUDP(); pSGD->normaliseUDP();
} }
static void sortTask(void *drvPvt) { // static void sortTask(void *drvPvt) {
asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt; // asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
pSGD->partialSortEvents(); // pSGD->partialSortEvents();
} // }
static void daqTask(void *drvPvt) { static void daqTask(void *drvPvt) {
asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt; asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
@@ -269,7 +269,7 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
(asynStatus)(epicsThreadCreate( (asynStatus)(epicsThreadCreate(
"sinqDAQ", "sinqDAQ",
epicsThreadPriorityMedium, // epicsThreadPriorityMax, epicsThreadPriorityMedium, // epicsThreadPriorityMax,
epicsThreadGetStackSize(epicsThreadStackMedium), epicsThreadGetStackSize(epicsThreadStackBig),
(EPICSTHREADFUNC)::daqTask, this) == NULL); (EPICSTHREADFUNC)::daqTask, this) == NULL);
if (status) { if (status) {
epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n", epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
@@ -280,15 +280,15 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
/* Create the thread that orders packets of in preparation for our sinqDAQ /* Create the thread that orders packets of in preparation for our sinqDAQ
* stand-in * stand-in
*/ */
status = (asynStatus)(epicsThreadCreate( // status = (asynStatus)(epicsThreadCreate(
"partialSort", epicsThreadPriorityMedium, // "partialSort", epicsThreadPriorityMedium,
epicsThreadGetStackSize(epicsThreadStackMedium), // epicsThreadGetStackSize(epicsThreadStackMedium),
(EPICSTHREADFUNC)::sortTask, this) == NULL); // (EPICSTHREADFUNC)::sortTask, this) == NULL);
if (status) { // if (status) {
epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n", // epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
driverName, functionName, status); // driverName, functionName, status);
exit(1); // exit(1);
} // }
/* Create the thread normalises the events /* Create the thread normalises the events
*/ */
@@ -620,7 +620,10 @@ void asynStreamGeneratorDriver::normaliseUDP() {
resultBuffer[i] = ne; resultBuffer[i] = ne;
} }
epicsRingBytesPut(this->normalisedQueue, (char *)resultBuffer, // epicsRingBytesPut(this->normalisedQueue, (char *)resultBuffer,
// total_events * sizeof(NormalisedEvent));
epicsRingBytesPut(this->sortedQueue, (char *)resultBuffer,
total_events * sizeof(NormalisedEvent)); total_events * sizeof(NormalisedEvent));
} else { } else {
@@ -639,47 +642,47 @@ inline int eventsInQueue(epicsRingBytesId id) {
return epicsRingBytesUsedBytes(id) / sizeof(NormalisedEvent); return epicsRingBytesUsedBytes(id) / sizeof(NormalisedEvent);
} }
void asynStreamGeneratorDriver::partialSortEvents() { // void asynStreamGeneratorDriver::partialSortEvents() {
//
// const char functionName[]{"partialSortEvents"}; // // const char functionName[]{"partialSortEvents"};
//
// x * number of ids * max events in packet // // x * number of ids * max events in packet
int bufferedEvents = 5 * 10 * 243; // int bufferedEvents = 5 * 10 * 243;
NormalisedEvent events[bufferedEvents]; // NormalisedEvent events[bufferedEvents];
//
int queuedEvents = 0; // int queuedEvents = 0;
epicsTimeStamp lastSort = epicsTime::getCurrent(); // epicsTimeStamp lastSort = epicsTime::getCurrent();
epicsTimeStamp currentTime = lastSort; // epicsTimeStamp currentTime = lastSort;
//
while (true) { // while (true) {
//
queuedEvents = // queuedEvents =
eventsInQueue(this->normalisedQueue); // in case we can't wait // eventsInQueue(this->normalisedQueue); // in case we can't wait
lastSort = epicsTime::getCurrent(); // lastSort = epicsTime::getCurrent();
currentTime = lastSort; // currentTime = lastSort;
//
// wait for mininmum packet frequency or enough packets to ensure we // // wait for mininmum packet frequency or enough packets to ensure we
// could potentially have at least 1 packet per mcpdid // // could potentially have at least 1 packet per mcpdid
while (queuedEvents < bufferedEvents && // while (queuedEvents < bufferedEvents &&
epicsTimeDiffInNS(&currentTime, &lastSort) < 250'000'000ll) { // epicsTimeDiffInNS(&currentTime, &lastSort) < 250'000'000ll) {
epicsThreadSleep(0.0001); // seconds // epicsThreadSleep(0.0001); // seconds
currentTime = epicsTime::getCurrent(); // currentTime = epicsTime::getCurrent();
queuedEvents = eventsInQueue(this->normalisedQueue); // queuedEvents = eventsInQueue(this->normalisedQueue);
} // }
//
queuedEvents = std::min(queuedEvents, bufferedEvents); // queuedEvents = std::min(queuedEvents, bufferedEvents);
//
if (queuedEvents) { // if (queuedEvents) {
epicsRingBytesGet(this->normalisedQueue, (char *)events, // epicsRingBytesGet(this->normalisedQueue, (char *)events,
queuedEvents * sizeof(NormalisedEvent)); // queuedEvents * sizeof(NormalisedEvent));
//
std::sort(events, events + queuedEvents, oldestEventsFirst); // std::sort(events, events + queuedEvents, oldestEventsFirst);
//
epicsRingBytesPut(this->sortedQueue, (char *)events, // epicsRingBytesPut(this->sortedQueue, (char *)events,
queuedEvents * sizeof(NormalisedEvent)); // queuedEvents * sizeof(NormalisedEvent));
} // }
} // }
} // }
inline void asynStreamGeneratorDriver::queueForKafka(NormalisedEvent &ne) { inline void asynStreamGeneratorDriver::queueForKafka(NormalisedEvent &ne) {
if (this->kafkaEnabled) { if (this->kafkaEnabled) {
@@ -697,9 +700,14 @@ void asynStreamGeneratorDriver::processEvents() {
const char functionName[]{"processEvents"}; const char functionName[]{"processEvents"};
// x * number of ids * max events in packet * event size // x * number of ids * max events in packet * event size
int bufferedEvents = 5 * 10 * 243; const int bufferedEvents = 20 * 10 * 243;
// we need a little extra space for merge sorting in // we need a little extra space for merge sorting in
int extraBufferedEvents = 1 * 10 * 243; const int extraBufferedEvents = 10 * 10 * 243;
// so we have 5 and want to keep 1/5 for the next window
// which is why toProcess has 4/5 below
// or now
// we have 10 and want to keep 4/10 for the next window
// so toProcess is 6/10 below
// we have two buffers. We alternate between reading data into one of them, // we have two buffers. We alternate between reading data into one of them,
// and then merge sorting into the other // and then merge sorting into the other
@@ -725,6 +733,7 @@ void asynStreamGeneratorDriver::processEvents() {
epicsInt32 countPreset; epicsInt32 countPreset;
epicsInt32 timePreset; epicsInt32 timePreset;
epicsInt32 presetChannel; epicsInt32 presetChannel;
std::size_t b_cnt = 0;
while (true) { while (true) {
@@ -762,8 +771,19 @@ void asynStreamGeneratorDriver::processEvents() {
epicsRingBytesGet(this->sortedQueue, (char *)newStartPtr, epicsRingBytesGet(this->sortedQueue, (char *)newStartPtr,
queuedEvents * sizeof(NormalisedEvent)); queuedEvents * sizeof(NormalisedEvent));
// std::size_t toProcess =
// eventsBLastEnd - eventsBLastStart + queuedEvents * 4 / 5;
std::size_t toProcess = std::size_t toProcess =
eventsBLastEnd - eventsBLastStart + queuedEvents * 4 / 5; eventsBLastEnd - eventsBLastStart + queuedEvents * 10 / 20;
asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
"newStartPtr %" PRIu64 " queuedEvents %" PRIu64 " toProcess %" PRIu64 "\n",
newStartPtr - eventsA, queuedEvents, toProcess);
asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
"eventsBLastStart %" PRIu64 " eventsBLastEnd %" PRIu64"\n",
eventsBLastStart - eventsB, eventsBLastEnd - eventsB);
// TODO could also consider an in-place merge // TODO could also consider an in-place merge
eventsBLastEnd = std::merge(newStartPtr, newStartPtr + queuedEvents, eventsBLastEnd = std::merge(newStartPtr, newStartPtr + queuedEvents,
@@ -772,6 +792,7 @@ void asynStreamGeneratorDriver::processEvents() {
eventsBLastStart = eventsA + toProcess; eventsBLastStart = eventsA + toProcess;
// TODO I haven't really taken care of the case that there are no events // TODO I haven't really taken care of the case that there are no events
if (prevStatus == STATUS_IDLE && currStatus == STATUS_COUNTING) { if (prevStatus == STATUS_IDLE && currStatus == STATUS_COUNTING) {
@@ -800,15 +821,34 @@ void asynStreamGeneratorDriver::processEvents() {
driverName, functionName, eventsA[0].timestamp, eventsA[1].timestamp, eventsA[2].timestamp); driverName, functionName, eventsA[0].timestamp, eventsA[1].timestamp, eventsA[2].timestamp);
} }
if (eventsA[std::max((std::size_t) 0, toProcess - 1)].timestamp < eventsA[0].timestamp) //if (eventsA[std::max((std::size_t) 0, toProcess - 1)].timestamp < eventsA[0].timestamp)
asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, // asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
"%s:%s: time-span: %" PRIu64 " %" PRIu64 "\n", // "%s:%s: time-span: %" PRIu64 " %" PRIu64 "\n",
driverName, functionName, eventsA[0].timestamp, eventsA[std::max((std::size_t) 0, toProcess - 1)].timestamp); // driverName, functionName, eventsA[0].timestamp, eventsA[std::max((std::size_t) 0, toProcess - 1)].timestamp);
if (!std::is_sorted( eventsA, eventsA + toProcess, oldestEventsFirst )) if (!std::is_sorted( eventsA, eventsA + toProcess, oldestEventsFirst )) {
asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, // asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
"%s:%s: not sorted: %" PRIu64 " %" PRIu64 "\n", // "%s:%s: not sorted: %" PRIu64 " %" PRIu64 "\n",
driverName, functionName, eventsA[0].timestamp, eventsA[std::max((std::size_t) 0, toProcess - 1)].timestamp); // driverName, functionName, eventsA[0].timestamp, eventsA[std::max((std::size_t) 0, toProcess - 1)].timestamp);
for (size_t j = 1; j < toProcess; ++j) {
if (eventsA[j-1].timestamp > eventsA[j].timestamp) {
asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
"%s:%s: not sorted %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 "\n",
driverName, functionName, b_cnt, j, eventsA[j-1].timestamp, eventsA[j].timestamp);
}
}
++b_cnt;
std::sort(eventsA, eventsA + toProcess, oldestEventsFirst);
}
// if (!std::is_sorted( eventsA, eventsA + toProcess, oldestEventsFirst ))
// asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
// "%s:%s: not sorted: %" PRIu64 " %" PRIu64 "\n",
// driverName, functionName, eventsA[0].timestamp, eventsA[std::max((std::size_t) 0, toProcess - 1)].timestamp);
if (currStatus == STATUS_COUNTING) { if (currStatus == STATUS_COUNTING) {