less_logic_in_udp_thread #1

Merged
wall_e merged 35 commits from less_logic_in_udp_thread into main 2025-11-14 14:11:18 +01:00
2 changed files with 29 additions and 6 deletions
Showing only changes of commit 056b0a5f8a - Show all commits

View File

@@ -28,10 +28,11 @@ data = [
start_time = time.time_ns() // 100
buffer_ids = {
i: (0, 0) for i in range(10)
}
while True:
# update buffer number
header[6] = (header[6] + 1) % 0xff
header[7] = (header[7] + (header[6] == 0)) % 0xff
# update timestamp
base_timestamp = time.time_ns() // 100 - start_time
@@ -60,9 +61,16 @@ while True:
# reduce also the number of checks on the parsing side of things...
is_monitor = random.randint(0, 9)
# is_monitor = 4
header[11] = 0 if is_monitor > 3 else random.randint(1,9)
# update buffer number (each mcpdid has its own buffer number count)
header[6], header[7] = buffer_ids[header[11]]
header[6] = (header[6] + 1) % (0xff + 1)
header[7] = (header[7] + (header[6] == 0)) % (0xff + 1)
buffer_ids[header[11]] = header[6], header[7]
tosend = list(header)
if is_monitor > 3:
@@ -71,6 +79,7 @@ while True:
d = list(data)
monitor = random.randint(0,3)
# monitor = 0
d[5] = (1 << 7) | monitor
@@ -106,4 +115,4 @@ while True:
sock.sendto(bytes(tosend), ('127.0.0.1', 54321))
mv = memoryview(bytes(header)).cast('H')
print(f'Sent packet {mv[3]} with {num_events} events {base_timestamp}')
# time.sleep(1)
# time.sleep(.01)

View File

@@ -208,7 +208,7 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
/* Create the thread that orders the events and acts as our sinqDaq stand-in
*/
status = (asynStatus)(epicsThreadCreate(
"sinqDAQ", epicsThreadPriorityMedium,
"sinqDAQ", epicsThreadPriorityMax,
epicsThreadGetStackSize(epicsThreadStackMedium),
(EPICSTHREADFUNC)::daqTask, this) == NULL);
if (status) {
@@ -362,7 +362,7 @@ void asynStreamGeneratorDriver::receiveUDP() {
char buffer[bufferSize];
// We have 10 mcpdids
uint64_t lastBufferNumber* = new uint64_t[10];
uint64_t *lastBufferNumber = new uint64_t[10];
for (size_t i = 0; i < 10; ++i) {
lastBufferNumber[i] = 0;
}
@@ -388,6 +388,20 @@ void asynStreamGeneratorDriver::receiveUDP() {
if (received == total_events * 6 + 42) {
if (header->BufferNumber - lastBufferNumber[header->McpdID] >
1 &&
lastBufferNumber[header->McpdID] !=
std::numeric_limits<
decltype(header->BufferNumber)>::max()) {
asynPrint(
pasynUserSelf, ASYN_TRACE_ERROR,
"%s:%s: missed packet on id: %d. Received: %" PRIu64
", last: %" PRIu64 "\n",
driverName, functionName, header->McpdID,
header->BufferNumber, lastBufferNumber[header->McpdID]);
}
lastBufferNumber[header->McpdID] = header->BufferNumber;
for (std::size_t i = 0; i < total_events; ++i) {
char *event = (buffer + 21 * 2 + i * 6);