diff --git a/db/daq_common.db b/db/daq_common.db
index 6fcbe9a..aae1767 100644
--- a/db/daq_common.db
+++ b/db/daq_common.db
@@ -52,7 +52,7 @@ record(mbbi, "$(INSTR)$(NAME):RAW-STATUS")
 record(fanout, "$(INSTR)$(NAME):READALL")
 {
     field(SELM, "All")
-    field(LNK0, "$(INSTR)$(NAME):ELAPSED-TIME PP")
+    field(LNK0, "$(INSTR)$(NAME):ELAPSED-TIME")
     field(LNK1, "$(INSTR)$(NAME):M1")
     field(LNK2, "$(INSTR)$(NAME):M2")
     field(LNK3, "$(INSTR)$(NAME):M3")
diff --git a/src/asynStreamGeneratorDriver.cpp b/src/asynStreamGeneratorDriver.cpp
index 25da2ed..cb9a24a 100644
--- a/src/asynStreamGeneratorDriver.cpp
+++ b/src/asynStreamGeneratorDriver.cpp
@@ -68,10 +68,10 @@ static void udpNormaliserTask(void *drvPvt) {
     pSGD->normaliseUDP();
 }
 
-static void sortTask(void *drvPvt) {
-    asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
-    pSGD->partialSortEvents();
-}
+// static void sortTask(void *drvPvt) {
+//     asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
+//     pSGD->partialSortEvents();
+// }
 
 static void daqTask(void *drvPvt) {
     asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
@@ -269,7 +269,7 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
         (asynStatus)(epicsThreadCreate(
             "sinqDAQ", epicsThreadPriorityMedium, // epicsThreadPriorityMax,
-            epicsThreadGetStackSize(epicsThreadStackMedium),
+            epicsThreadGetStackSize(epicsThreadStackBig),
             (EPICSTHREADFUNC)::daqTask, this) == NULL);
     if (status) {
         epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
@@ -280,15 +280,15 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
 
     /* Create the thread that orders packets of in preparation for our sinqDAQ
      * stand-in */
-    status = (asynStatus)(epicsThreadCreate(
-        "partialSort", epicsThreadPriorityMedium,
-        epicsThreadGetStackSize(epicsThreadStackMedium),
-        (EPICSTHREADFUNC)::sortTask, this) == NULL);
-    if (status) {
-        epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
-                          driverName, functionName, status);
-        exit(1);
-    }
+    // status = (asynStatus)(epicsThreadCreate(
+    //     "partialSort", epicsThreadPriorityMedium,
+    //     epicsThreadGetStackSize(epicsThreadStackMedium),
+    //     (EPICSTHREADFUNC)::sortTask, this) == NULL);
+    // if (status) {
+    //     epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
+    //                       driverName, functionName, status);
+    //     exit(1);
+    // }
 
     /* Create the thread normalises the events */
 
@@ -620,7 +620,10 @@ void asynStreamGeneratorDriver::normaliseUDP() {
             resultBuffer[i] = ne;
         }
 
-        epicsRingBytesPut(this->normalisedQueue, (char *)resultBuffer,
+        // epicsRingBytesPut(this->normalisedQueue, (char *)resultBuffer,
+        //                   total_events * sizeof(NormalisedEvent));
+
+        epicsRingBytesPut(this->sortedQueue, (char *)resultBuffer,
                           total_events * sizeof(NormalisedEvent));
 
     } else {
@@ -639,47 +642,47 @@ inline int eventsInQueue(epicsRingBytesId id) {
     return epicsRingBytesUsedBytes(id) / sizeof(NormalisedEvent);
 }
 
-void asynStreamGeneratorDriver::partialSortEvents() {
-
-    // const char functionName[]{"partialSortEvents"};
-
-    // x * number of ids * max events in packet
-    int bufferedEvents = 5 * 10 * 243;
-    NormalisedEvent events[bufferedEvents];
-
-    int queuedEvents = 0;
-    epicsTimeStamp lastSort = epicsTime::getCurrent();
-    epicsTimeStamp currentTime = lastSort;
-
-    while (true) {
-
-        queuedEvents =
-            eventsInQueue(this->normalisedQueue); // in case we can't wait
-        lastSort = epicsTime::getCurrent();
-        currentTime = lastSort;
-
-        // wait for mininmum packet frequency or enough packets to ensure we
-        // could potentially have at least 1 packet per mcpdid
-        while (queuedEvents < bufferedEvents &&
-               epicsTimeDiffInNS(&currentTime, &lastSort) < 250'000'000ll) {
-            epicsThreadSleep(0.0001); // seconds
-            currentTime = epicsTime::getCurrent();
-            queuedEvents = eventsInQueue(this->normalisedQueue);
-        }
-
-        queuedEvents = std::min(queuedEvents, bufferedEvents);
-
-        if (queuedEvents) {
-            epicsRingBytesGet(this->normalisedQueue, (char *)events,
-                              queuedEvents * sizeof(NormalisedEvent));
-
-            std::sort(events, events + queuedEvents, oldestEventsFirst);
-
-            epicsRingBytesPut(this->sortedQueue, (char *)events,
-                              queuedEvents * sizeof(NormalisedEvent));
-        }
-    }
-}
+// void asynStreamGeneratorDriver::partialSortEvents() {
+//
+//     // const char functionName[]{"partialSortEvents"};
+//
+//     // x * number of ids * max events in packet
+//     int bufferedEvents = 5 * 10 * 243;
+//     NormalisedEvent events[bufferedEvents];
+//
+//     int queuedEvents = 0;
+//     epicsTimeStamp lastSort = epicsTime::getCurrent();
+//     epicsTimeStamp currentTime = lastSort;
+//
+//     while (true) {
+//
+//         queuedEvents =
+//             eventsInQueue(this->normalisedQueue); // in case we can't wait
+//         lastSort = epicsTime::getCurrent();
+//         currentTime = lastSort;
+//
+//         // wait for mininmum packet frequency or enough packets to ensure we
+//         // could potentially have at least 1 packet per mcpdid
+//         while (queuedEvents < bufferedEvents &&
+//                epicsTimeDiffInNS(&currentTime, &lastSort) < 250'000'000ll) {
+//             epicsThreadSleep(0.0001); // seconds
+//             currentTime = epicsTime::getCurrent();
+//             queuedEvents = eventsInQueue(this->normalisedQueue);
+//         }
+//
+//         queuedEvents = std::min(queuedEvents, bufferedEvents);
+//
+//         if (queuedEvents) {
+//             epicsRingBytesGet(this->normalisedQueue, (char *)events,
+//                               queuedEvents * sizeof(NormalisedEvent));
+//
+//             std::sort(events, events + queuedEvents, oldestEventsFirst);
+//
+//             epicsRingBytesPut(this->sortedQueue, (char *)events,
+//                               queuedEvents * sizeof(NormalisedEvent));
+//         }
+//     }
+// }
 
 inline void asynStreamGeneratorDriver::queueForKafka(NormalisedEvent &ne) {
     if (this->kafkaEnabled) {
@@ -697,9 +700,14 @@ void asynStreamGeneratorDriver::processEvents() {
     const char functionName[]{"processEvents"};
 
     // x * number of ids * max events in packet * event size
-    int bufferedEvents = 5 * 10 * 243;
+    const int bufferedEvents = 20 * 10 * 243;
     // we need a little extra space for merge sorting in
-    int extraBufferedEvents = 1 * 10 * 243;
+    const int extraBufferedEvents = 10 * 10 * 243;
+    // so we have 5 and want to keep 1/5 for the next window
+    // which is why toProcess has 4/5 below
+    // or now
+    // we have 10 and want to keep 4/10 for the next window
+    // so toProcess is 6/10 below
 
     // we have two buffers. We alternate between reading data into one of them,
     // and then merge sorting into the other
@@ -725,6 +733,7 @@ void asynStreamGeneratorDriver::processEvents() {
     epicsInt32 countPreset;
     epicsInt32 timePreset;
     epicsInt32 presetChannel;
+    std::size_t b_cnt = 0;
 
     while (true) {
 
@@ -762,8 +771,19 @@ void asynStreamGeneratorDriver::processEvents() {
             epicsRingBytesGet(this->sortedQueue, (char *)newStartPtr,
                               queuedEvents * sizeof(NormalisedEvent));
 
+            // std::size_t toProcess =
+            //     eventsBLastEnd - eventsBLastStart + queuedEvents * 4 / 5;
+
             std::size_t toProcess =
-                eventsBLastEnd - eventsBLastStart + queuedEvents * 4 / 5;
+                eventsBLastEnd - eventsBLastStart + queuedEvents * 10 / 20;
+
+            asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
+                      "newStartPtr %" PRIu64 " queuedEvents %" PRIu64 " toProcess %" PRIu64 "\n",
+                      newStartPtr - eventsA, queuedEvents, toProcess);
+
+            asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
+                      "eventsBLastStart %" PRIu64 " eventsBLastEnd %" PRIu64"\n",
+                      eventsBLastStart - eventsB, eventsBLastEnd - eventsB);
 
             // TODO could also consider an in-place merge
             eventsBLastEnd = std::merge(newStartPtr, newStartPtr + queuedEvents,
@@ -772,6 +792,7 @@ void asynStreamGeneratorDriver::processEvents() {
 
             eventsBLastStart = eventsA + toProcess;
 
+            // TODO I haven't really taken care of the case that there are no events
             if (prevStatus == STATUS_IDLE && currStatus == STATUS_COUNTING) {
 
@@ -800,15 +821,34 @@ void asynStreamGeneratorDriver::processEvents() {
                           driverName, functionName, eventsA[0].timestamp,
                           eventsA[1].timestamp, eventsA[2].timestamp);
             }
-            if (eventsA[std::max((std::size_t) 0, toProcess - 1)].timestamp < eventsA[0].timestamp)
-                asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
-                          "%s:%s: time-span: %" PRIu64 " %" PRIu64 "\n",
-                          driverName, functionName, eventsA[0].timestamp, eventsA[std::max((std::size_t) 0, toProcess - 1)].timestamp);
+            //if (eventsA[std::max((std::size_t) 0, toProcess - 1)].timestamp < eventsA[0].timestamp)
+            //    asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
+            //              "%s:%s: time-span: %" PRIu64 " %" PRIu64 "\n",
+            //              driverName, functionName, eventsA[0].timestamp, eventsA[std::max((std::size_t) 0, toProcess - 1)].timestamp);
 
-            if (!std::is_sorted( eventsA, eventsA + toProcess, oldestEventsFirst ))
-                asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
-                          "%s:%s: not sorted: %" PRIu64 " %" PRIu64 "\n",
-                          driverName, functionName, eventsA[0].timestamp, eventsA[std::max((std::size_t) 0, toProcess - 1)].timestamp);
+            if (!std::is_sorted( eventsA, eventsA + toProcess, oldestEventsFirst )) {
+                // asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
+                //           "%s:%s: not sorted: %" PRIu64 " %" PRIu64 "\n",
+                //           driverName, functionName, eventsA[0].timestamp, eventsA[std::max((std::size_t) 0, toProcess - 1)].timestamp);
+
+
+
+                for (size_t j = 1; j < toProcess; ++j) {
+                    if (eventsA[j-1].timestamp > eventsA[j].timestamp) {
+                        asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
+                                  "%s:%s: not sorted %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 "\n",
+                                  driverName, functionName, b_cnt, j, eventsA[j-1].timestamp, eventsA[j].timestamp);
+                    }
+                }
+                ++b_cnt;
+
+                std::sort(eventsA, eventsA + toProcess, oldestEventsFirst);
+            }
+
+            // if (!std::is_sorted( eventsA, eventsA + toProcess, oldestEventsFirst ))
+            //     asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
+            //               "%s:%s: not sorted: %" PRIu64 " %" PRIu64 "\n",
+            //               driverName, functionName, eventsA[0].timestamp, eventsA[std::max((std::size_t) 0, toProcess - 1)].timestamp);
 
             if (currStatus == STATUS_COUNTING) {
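
For orientation, the behavioural change in processEvents() above is that each pass now merges the freshly dequeued batch into the tail held back from the previous window, and only treats the carried tail plus half of the new data (queuedEvents * 10 / 20) as safe to process, carrying the rest forward. A minimal standalone sketch of that carry-over merge follows; it is an illustration only, not the driver code: the Event comparison and the 10/20 ratio are taken from the diff, while the container-based layout and sample timestamps are hypothetical.

// Sketch (assumed names, not the driver's API): merge a newly arrived,
// already-sorted batch into the sorted tail carried over from the last window.
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <vector>

struct Event {
    uint64_t timestamp;
};

static bool oldestEventsFirst(const Event &a, const Event &b) {
    return a.timestamp < b.timestamp;
}

int main() {
    std::vector<Event> carried = {{100}, {140}};       // tail kept from the previous window
    std::vector<Event> batch   = {{90}, {120}, {160}}; // new batch, sorted on arrival

    // Merge the new batch with the carried tail (both inputs already sorted).
    std::vector<Event> window(carried.size() + batch.size());
    std::merge(batch.begin(), batch.end(), carried.begin(), carried.end(),
               window.begin(), oldestEventsFirst);

    // As in the patched driver, only the carried events plus half of the new
    // batch are processed now; the newest events wait for the next pass in
    // case later packets still deliver older timestamps.
    std::size_t toProcess = carried.size() + batch.size() * 10 / 20;
    for (std::size_t i = 0; i < toProcess; ++i)
        std::printf("process %llu\n", (unsigned long long)window[i].timestamp);
    // window[toProcess .. window.size()) would become 'carried' for the next pass.
    return 0;
}

The driver does the same thing in place over the eventsA/eventsB buffers with std::merge and raw pointers rather than vectors; the sketch only shows why unprocessed events are deliberately left at the end of each window.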