From c530de3566d9232c4b730e289a0bb25cb4eb2644 Mon Sep 17 00:00:00 2001 From: Edward Wall Date: Fri, 7 Nov 2025 16:14:05 +0100 Subject: [PATCH] does removing all logic in the udp receive thread help to improve the packet receive frequency? --- src/asynStreamGeneratorDriver.cpp | 186 +++++++++++++++++++----------- src/asynStreamGeneratorDriver.h | 2 + 2 files changed, 121 insertions(+), 67 deletions(-) diff --git a/src/asynStreamGeneratorDriver.cpp b/src/asynStreamGeneratorDriver.cpp index 208584e..11bb120 100644 --- a/src/asynStreamGeneratorDriver.cpp +++ b/src/asynStreamGeneratorDriver.cpp @@ -65,6 +65,11 @@ static void udpPollerTask(void *drvPvt) { pSGD->receiveUDP(); } +static void udpNormaliserTask(void *drvPvt) { + asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt; + pSGD->normaliseUDP(); +} + static void sortTask(void *drvPvt) { asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt; pSGD->partialSortEvents(); @@ -131,6 +136,8 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver( // measured in max packet sizes udpQueue( epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))), + normalisedQueue( + epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))), // TODO configurable sizes sortedQueue( epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))), @@ -267,6 +274,20 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver( exit(1); } + /* Create the thread normalises the events + */ + status = + (asynStatus)(epicsThreadCreate( + "eventNormaliser", + epicsThreadPriorityMedium, // epicsThreadPriorityMax, + epicsThreadGetStackSize(epicsThreadStackMedium), + (EPICSTHREADFUNC)::udpNormaliserTask, this) == NULL); + if (status) { + epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n", + driverName, functionName, status); + exit(1); + } + // UDP Receive Setup status = pasynOctetSyncIO->connect(ipPortName, 0, &pasynUDPUser, NULL); @@ -278,7 +299,7 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver( /* Create the thread that receives UDP traffic in the background */ status = (asynStatus)(epicsThreadCreate( - "udp_receive", epicsThreadPriorityMedium, + "udp_receive", epicsThreadPriorityMax, epicsThreadGetStackSize(epicsThreadStackMedium), (EPICSTHREADFUNC)::udpPollerTask, this) == NULL); if (status) { @@ -424,6 +445,45 @@ asynStatus asynStreamGeneratorDriver::writeInt32(asynUser *pasynUser, void asynStreamGeneratorDriver::receiveUDP() { + const char *functionName = "receiveUDP"; + asynStatus status = asynSuccess; + int isConnected = 1; + std::size_t received; + int eomReason; + + const std::size_t bufferSize = 1500; + char buffer[bufferSize]; + + while (true) { + + // status = pasynManager->isConnected(pasynUDPUser, &isConnected); + + // if (!isConnected) + // asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, + // "%s:%s: isConnected = %d\n", driverName, functionName, + // isConnected); + + status = pasynOctetSyncIO->read(pasynUDPUser, buffer, bufferSize, + 0, // timeout + &received, &eomReason); + + if (received) { + + if ((received - 42) % 6 == 0) { + + epicsRingBytesPut(this->udpQueue, (char *)buffer, bufferSize); + + } else { + asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, + "%s:%s: invalid UDP packet\n", driverName, + functionName); + } + } + } +} + +void asynStreamGeneratorDriver::normaliseUDP() { + // TODO fix time overflows // Regarding time overflow. // * the header time stamp is 3 words, i.e. 48 bits. @@ -433,7 +493,7 @@ void asynStreamGeneratorDriver::receiveUDP() { // * so maybe this isn't necessary to solve, as long as we restart the // electronics at least once a year... - const char *functionName = "receiveUDP"; + const char *functionName = "normaliseUDP"; asynStatus status = asynSuccess; int isConnected = 1; std::size_t received; @@ -445,8 +505,11 @@ void asynStreamGeneratorDriver::receiveUDP() { const std::size_t bufferSize = 1500; char buffer[bufferSize]; + const std::size_t resultBufferSize = 243; + NormalisedEvent resultBuffer[resultBufferSize]; + // We have 10 mcpdids - uint64_t *lastBufferNumber = new uint64_t[10]; + uint64_t lastBufferNumber[10]; for (size_t i = 0; i < 10; ++i) { lastBufferNumber[i] = 0; } @@ -457,69 +520,56 @@ void asynStreamGeneratorDriver::receiveUDP() { while (true) { - status = pasynManager->isConnected(pasynUDPUser, &isConnected); + if (epicsRingBytesUsedBytes(this->udpQueue) > 1500) { - if (!isConnected) - asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, - "%s:%s: isConnected = %d\n", driverName, functionName, - isConnected); - - status = pasynOctetSyncIO->read(pasynUDPUser, buffer, bufferSize, - 0, // timeout - &received, &eomReason); - - if (received) { + epicsRingBytesGet(this->udpQueue, (char *)buffer, bufferSize); UDPHeader *header = (UDPHeader *)buffer; std::size_t total_events = (header->BufferLength - 21) / 3; - if (received == total_events * 6 + 42) { - - if (header->BufferNumber - lastBufferNumber[header->McpdID] > - 1 && - lastBufferNumber[header->McpdID] != - std::numeric_limits< - decltype(header->BufferNumber)>::max()) { - asynPrint( - pasynUserSelf, ASYN_TRACE_ERROR, - "%s:%s: missed packet on id: %d. Received: %" PRIu64 - ", last: %" PRIu64 "\n", - driverName, functionName, header->McpdID, - header->BufferNumber, lastBufferNumber[header->McpdID]); - setIntegerParam(P_UdpDropped, ++droppedMessages); - } - lastBufferNumber[header->McpdID] = header->BufferNumber; - - for (std::size_t i = 0; i < total_events; ++i) { - char *event = (buffer + 21 * 2 + i * 6); - - if (event[5] & 0x80) { // Monitor Event - MonitorEvent *m_event = (MonitorEvent *)event; - - ne.timestamp = - header->nanosecs() + (uint64_t)m_event->nanosecs(); - ne.source = 0; - ne.pixelId = m_event->DataID; - - } else { // Detector Event - DetectorEvent *d_event = (DetectorEvent *)event; - - ne.timestamp = - header->nanosecs() + (uint64_t)d_event->nanosecs(); - ne.source = header->McpdID; - ne.pixelId = d_event->pixelId(header->McpdID); - } - - epicsRingBytesPut(this->udpQueue, (char *)&ne, - sizeof(NormalisedEvent)); - } - - } else { + if (header->BufferNumber - lastBufferNumber[header->McpdID] > 1 && + lastBufferNumber[header->McpdID] != + std::numeric_limits< + decltype(header->BufferNumber)>::max()) { asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, - "%s:%s: invalid UDP packet\n", driverName, - functionName); + "%s:%s: missed packet on id: %d. Received: %" PRIu64 + ", last: %" PRIu64 "\n", + driverName, functionName, header->McpdID, + header->BufferNumber, + lastBufferNumber[header->McpdID]); + setIntegerParam(P_UdpDropped, ++droppedMessages); } + lastBufferNumber[header->McpdID] = header->BufferNumber; + + for (std::size_t i = 0; i < total_events; ++i) { + char *event = (buffer + 21 * 2 + i * 6); + + if (event[5] & 0x80) { // Monitor Event + MonitorEvent *m_event = (MonitorEvent *)event; + + ne.timestamp = + header->nanosecs() + (uint64_t)m_event->nanosecs(); + ne.source = 0; + ne.pixelId = m_event->DataID; + + } else { // Detector Event + DetectorEvent *d_event = (DetectorEvent *)event; + + ne.timestamp = + header->nanosecs() + (uint64_t)d_event->nanosecs(); + ne.source = header->McpdID; + ne.pixelId = d_event->pixelId(header->McpdID); + } + + resultBuffer[i] = ne; + } + + epicsRingBytesPut(this->normalisedQueue, (char *)resultBuffer, + total_events * sizeof(NormalisedEvent)); + + } else { + epicsThreadSleep(0.0001); // seconds } } } @@ -540,7 +590,7 @@ void asynStreamGeneratorDriver::partialSortEvents() { // x * number of ids * max events in packet int bufferedEvents = 5 * 10 * 243; - NormalisedEvent *events = new NormalisedEvent[bufferedEvents]; + NormalisedEvent events[bufferedEvents]; int queuedEvents = 0; epicsTimeStamp lastSort = epicsTime::getCurrent(); @@ -548,7 +598,8 @@ void asynStreamGeneratorDriver::partialSortEvents() { while (true) { - queuedEvents = eventsInQueue(this->udpQueue); // in case we can't wait + queuedEvents = + eventsInQueue(this->normalisedQueue); // in case we can't wait lastSort = epicsTime::getCurrent(); currentTime = lastSort; @@ -558,13 +609,13 @@ void asynStreamGeneratorDriver::partialSortEvents() { epicsTimeDiffInNS(¤tTime, &lastSort) < 250'000'000ull) { epicsThreadSleep(0.0001); // seconds currentTime = epicsTime::getCurrent(); - queuedEvents = eventsInQueue(this->udpQueue); + queuedEvents = eventsInQueue(this->normalisedQueue); } queuedEvents = std::min(queuedEvents, bufferedEvents); if (queuedEvents) { - epicsRingBytesGet(this->udpQueue, (char *)events, + epicsRingBytesGet(this->normalisedQueue, (char *)events, queuedEvents * sizeof(NormalisedEvent)); std::sort(events, events + queuedEvents, oldestEventsFirst); @@ -597,10 +648,11 @@ void asynStreamGeneratorDriver::processEvents() { // we have two buffers. We alternate between reading data into one of them, // and then merge sorting into the other - NormalisedEvent *eventsA = - new NormalisedEvent[(bufferedEvents + extraBufferedEvents)]; - NormalisedEvent *eventsB = - new NormalisedEvent[(bufferedEvents + extraBufferedEvents)]; + NormalisedEvent eventsABuffer[(bufferedEvents + extraBufferedEvents)]; + NormalisedEvent eventsBBuffer[(bufferedEvents + extraBufferedEvents)]; + + NormalisedEvent *eventsA = &eventsABuffer[0]; + NormalisedEvent *eventsB = &eventsBBuffer[0]; NormalisedEvent *eventsBLastStart = eventsB + bufferedEvents; NormalisedEvent *eventsBLastEnd = eventsBLastStart; @@ -609,7 +661,7 @@ void asynStreamGeneratorDriver::processEvents() { epicsTimeStamp lastProcess = epicsTime::getCurrent(); epicsTimeStamp currentTime = lastProcess; - epicsInt32 *counts = new epicsInt32[this->num_channels]; + epicsInt32 counts[this->num_channels]; double elapsedSeconds = 0; uint64_t startTimestamp = std::numeric_limits::max(); uint64_t currTimestamp; diff --git a/src/asynStreamGeneratorDriver.h b/src/asynStreamGeneratorDriver.h index ac8ce5e..1517697 100644 --- a/src/asynStreamGeneratorDriver.h +++ b/src/asynStreamGeneratorDriver.h @@ -118,6 +118,7 @@ class asynStreamGeneratorDriver : public asynPortDriver { virtual asynStatus writeInt32(asynUser *pasynUser, epicsInt32 value); void receiveUDP(); + void normaliseUDP(); void partialSortEvents(); void processEvents(); void produceMonitor(); @@ -155,6 +156,7 @@ class asynStreamGeneratorDriver : public asynPortDriver { const int udpQueueSize; epicsRingBytesId udpQueue; + epicsRingBytesId normalisedQueue; epicsRingBytesId sortedQueue; epicsRingBytesId monitorQueue;