does removing all logic in the udp receive thread help to improve the packet receive frequency?

This commit is contained in:
2025-11-07 16:14:05 +01:00
parent ba07a8af9b
commit c530de3566
2 changed files with 121 additions and 67 deletions

View File

@@ -65,6 +65,11 @@ static void udpPollerTask(void *drvPvt) {
pSGD->receiveUDP();
}
static void udpNormaliserTask(void *drvPvt) {
asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
pSGD->normaliseUDP();
}
static void sortTask(void *drvPvt) {
asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
pSGD->partialSortEvents();
@@ -131,6 +136,8 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
// measured in max packet sizes
udpQueue(
epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))),
normalisedQueue(
epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))),
// TODO configurable sizes
sortedQueue(
epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))),
@@ -267,6 +274,20 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
exit(1);
}
/* Create the thread normalises the events
*/
status =
(asynStatus)(epicsThreadCreate(
"eventNormaliser",
epicsThreadPriorityMedium, // epicsThreadPriorityMax,
epicsThreadGetStackSize(epicsThreadStackMedium),
(EPICSTHREADFUNC)::udpNormaliserTask, this) == NULL);
if (status) {
epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
driverName, functionName, status);
exit(1);
}
// UDP Receive Setup
status = pasynOctetSyncIO->connect(ipPortName, 0, &pasynUDPUser, NULL);
@@ -278,7 +299,7 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
/* Create the thread that receives UDP traffic in the background */
status = (asynStatus)(epicsThreadCreate(
"udp_receive", epicsThreadPriorityMedium,
"udp_receive", epicsThreadPriorityMax,
epicsThreadGetStackSize(epicsThreadStackMedium),
(EPICSTHREADFUNC)::udpPollerTask, this) == NULL);
if (status) {
@@ -424,6 +445,45 @@ asynStatus asynStreamGeneratorDriver::writeInt32(asynUser *pasynUser,
void asynStreamGeneratorDriver::receiveUDP() {
const char *functionName = "receiveUDP";
asynStatus status = asynSuccess;
int isConnected = 1;
std::size_t received;
int eomReason;
const std::size_t bufferSize = 1500;
char buffer[bufferSize];
while (true) {
// status = pasynManager->isConnected(pasynUDPUser, &isConnected);
// if (!isConnected)
// asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
// "%s:%s: isConnected = %d\n", driverName, functionName,
// isConnected);
status = pasynOctetSyncIO->read(pasynUDPUser, buffer, bufferSize,
0, // timeout
&received, &eomReason);
if (received) {
if ((received - 42) % 6 == 0) {
epicsRingBytesPut(this->udpQueue, (char *)buffer, bufferSize);
} else {
asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
"%s:%s: invalid UDP packet\n", driverName,
functionName);
}
}
}
}
void asynStreamGeneratorDriver::normaliseUDP() {
// TODO fix time overflows
// Regarding time overflow.
// * the header time stamp is 3 words, i.e. 48 bits.
@@ -433,7 +493,7 @@ void asynStreamGeneratorDriver::receiveUDP() {
// * so maybe this isn't necessary to solve, as long as we restart the
// electronics at least once a year...
const char *functionName = "receiveUDP";
const char *functionName = "normaliseUDP";
asynStatus status = asynSuccess;
int isConnected = 1;
std::size_t received;
@@ -445,8 +505,11 @@ void asynStreamGeneratorDriver::receiveUDP() {
const std::size_t bufferSize = 1500;
char buffer[bufferSize];
const std::size_t resultBufferSize = 243;
NormalisedEvent resultBuffer[resultBufferSize];
// We have 10 mcpdids
uint64_t *lastBufferNumber = new uint64_t[10];
uint64_t lastBufferNumber[10];
for (size_t i = 0; i < 10; ++i) {
lastBufferNumber[i] = 0;
}
@@ -457,69 +520,56 @@ void asynStreamGeneratorDriver::receiveUDP() {
while (true) {
status = pasynManager->isConnected(pasynUDPUser, &isConnected);
if (epicsRingBytesUsedBytes(this->udpQueue) > 1500) {
if (!isConnected)
asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
"%s:%s: isConnected = %d\n", driverName, functionName,
isConnected);
status = pasynOctetSyncIO->read(pasynUDPUser, buffer, bufferSize,
0, // timeout
&received, &eomReason);
if (received) {
epicsRingBytesGet(this->udpQueue, (char *)buffer, bufferSize);
UDPHeader *header = (UDPHeader *)buffer;
std::size_t total_events = (header->BufferLength - 21) / 3;
if (received == total_events * 6 + 42) {
if (header->BufferNumber - lastBufferNumber[header->McpdID] >
1 &&
lastBufferNumber[header->McpdID] !=
std::numeric_limits<
decltype(header->BufferNumber)>::max()) {
asynPrint(
pasynUserSelf, ASYN_TRACE_ERROR,
"%s:%s: missed packet on id: %d. Received: %" PRIu64
", last: %" PRIu64 "\n",
driverName, functionName, header->McpdID,
header->BufferNumber, lastBufferNumber[header->McpdID]);
setIntegerParam(P_UdpDropped, ++droppedMessages);
}
lastBufferNumber[header->McpdID] = header->BufferNumber;
for (std::size_t i = 0; i < total_events; ++i) {
char *event = (buffer + 21 * 2 + i * 6);
if (event[5] & 0x80) { // Monitor Event
MonitorEvent *m_event = (MonitorEvent *)event;
ne.timestamp =
header->nanosecs() + (uint64_t)m_event->nanosecs();
ne.source = 0;
ne.pixelId = m_event->DataID;
} else { // Detector Event
DetectorEvent *d_event = (DetectorEvent *)event;
ne.timestamp =
header->nanosecs() + (uint64_t)d_event->nanosecs();
ne.source = header->McpdID;
ne.pixelId = d_event->pixelId(header->McpdID);
}
epicsRingBytesPut(this->udpQueue, (char *)&ne,
sizeof(NormalisedEvent));
}
} else {
if (header->BufferNumber - lastBufferNumber[header->McpdID] > 1 &&
lastBufferNumber[header->McpdID] !=
std::numeric_limits<
decltype(header->BufferNumber)>::max()) {
asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
"%s:%s: invalid UDP packet\n", driverName,
functionName);
"%s:%s: missed packet on id: %d. Received: %" PRIu64
", last: %" PRIu64 "\n",
driverName, functionName, header->McpdID,
header->BufferNumber,
lastBufferNumber[header->McpdID]);
setIntegerParam(P_UdpDropped, ++droppedMessages);
}
lastBufferNumber[header->McpdID] = header->BufferNumber;
for (std::size_t i = 0; i < total_events; ++i) {
char *event = (buffer + 21 * 2 + i * 6);
if (event[5] & 0x80) { // Monitor Event
MonitorEvent *m_event = (MonitorEvent *)event;
ne.timestamp =
header->nanosecs() + (uint64_t)m_event->nanosecs();
ne.source = 0;
ne.pixelId = m_event->DataID;
} else { // Detector Event
DetectorEvent *d_event = (DetectorEvent *)event;
ne.timestamp =
header->nanosecs() + (uint64_t)d_event->nanosecs();
ne.source = header->McpdID;
ne.pixelId = d_event->pixelId(header->McpdID);
}
resultBuffer[i] = ne;
}
epicsRingBytesPut(this->normalisedQueue, (char *)resultBuffer,
total_events * sizeof(NormalisedEvent));
} else {
epicsThreadSleep(0.0001); // seconds
}
}
}
@@ -540,7 +590,7 @@ void asynStreamGeneratorDriver::partialSortEvents() {
// x * number of ids * max events in packet
int bufferedEvents = 5 * 10 * 243;
NormalisedEvent *events = new NormalisedEvent[bufferedEvents];
NormalisedEvent events[bufferedEvents];
int queuedEvents = 0;
epicsTimeStamp lastSort = epicsTime::getCurrent();
@@ -548,7 +598,8 @@ void asynStreamGeneratorDriver::partialSortEvents() {
while (true) {
queuedEvents = eventsInQueue(this->udpQueue); // in case we can't wait
queuedEvents =
eventsInQueue(this->normalisedQueue); // in case we can't wait
lastSort = epicsTime::getCurrent();
currentTime = lastSort;
@@ -558,13 +609,13 @@ void asynStreamGeneratorDriver::partialSortEvents() {
epicsTimeDiffInNS(&currentTime, &lastSort) < 250'000'000ull) {
epicsThreadSleep(0.0001); // seconds
currentTime = epicsTime::getCurrent();
queuedEvents = eventsInQueue(this->udpQueue);
queuedEvents = eventsInQueue(this->normalisedQueue);
}
queuedEvents = std::min(queuedEvents, bufferedEvents);
if (queuedEvents) {
epicsRingBytesGet(this->udpQueue, (char *)events,
epicsRingBytesGet(this->normalisedQueue, (char *)events,
queuedEvents * sizeof(NormalisedEvent));
std::sort(events, events + queuedEvents, oldestEventsFirst);
@@ -597,10 +648,11 @@ void asynStreamGeneratorDriver::processEvents() {
// we have two buffers. We alternate between reading data into one of them,
// and then merge sorting into the other
NormalisedEvent *eventsA =
new NormalisedEvent[(bufferedEvents + extraBufferedEvents)];
NormalisedEvent *eventsB =
new NormalisedEvent[(bufferedEvents + extraBufferedEvents)];
NormalisedEvent eventsABuffer[(bufferedEvents + extraBufferedEvents)];
NormalisedEvent eventsBBuffer[(bufferedEvents + extraBufferedEvents)];
NormalisedEvent *eventsA = &eventsABuffer[0];
NormalisedEvent *eventsB = &eventsBBuffer[0];
NormalisedEvent *eventsBLastStart = eventsB + bufferedEvents;
NormalisedEvent *eventsBLastEnd = eventsBLastStart;
@@ -609,7 +661,7 @@ void asynStreamGeneratorDriver::processEvents() {
epicsTimeStamp lastProcess = epicsTime::getCurrent();
epicsTimeStamp currentTime = lastProcess;
epicsInt32 *counts = new epicsInt32[this->num_channels];
epicsInt32 counts[this->num_channels];
double elapsedSeconds = 0;
uint64_t startTimestamp = std::numeric_limits<uint64_t>::max();
uint64_t currTimestamp;