diff --git a/scripts/udp_gen.py b/scripts/udp_gen.py index bb62832..3f01887 100644 --- a/scripts/udp_gen.py +++ b/scripts/udp_gen.py @@ -28,10 +28,11 @@ data = [ start_time = time.time_ns() // 100 +buffer_ids = { + i: (0, 0) for i in range(10) +} + while True: - # update buffer number - header[6] = (header[6] + 1) % 0xff - header[7] = (header[7] + (header[6] == 0)) % 0xff # update timestamp base_timestamp = time.time_ns() // 100 - start_time @@ -60,9 +61,16 @@ while True: # reduce also the number of checks on the parsing side of things... is_monitor = random.randint(0, 9) + # is_monitor = 4 header[11] = 0 if is_monitor > 3 else random.randint(1,9) + # update buffer number (each mcpdid has its own buffer number count) + header[6], header[7] = buffer_ids[header[11]] + header[6] = (header[6] + 1) % (0xff + 1) + header[7] = (header[7] + (header[6] == 0)) % (0xff + 1) + buffer_ids[header[11]] = header[6], header[7] + tosend = list(header) if is_monitor > 3: @@ -71,6 +79,7 @@ while True: d = list(data) monitor = random.randint(0,3) + # monitor = 0 d[5] = (1 << 7) | monitor @@ -106,4 +115,4 @@ while True: sock.sendto(bytes(tosend), ('127.0.0.1', 54321)) mv = memoryview(bytes(header)).cast('H') print(f'Sent packet {mv[3]} with {num_events} events {base_timestamp}') - # time.sleep(1) + # time.sleep(.01) diff --git a/src/asynStreamGeneratorDriver.cpp b/src/asynStreamGeneratorDriver.cpp index b7c9880..dc2b3d6 100644 --- a/src/asynStreamGeneratorDriver.cpp +++ b/src/asynStreamGeneratorDriver.cpp @@ -208,7 +208,7 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver( /* Create the thread that orders the events and acts as our sinqDaq stand-in */ status = (asynStatus)(epicsThreadCreate( - "sinqDAQ", epicsThreadPriorityMedium, + "sinqDAQ", epicsThreadPriorityMax, epicsThreadGetStackSize(epicsThreadStackMedium), (EPICSTHREADFUNC)::daqTask, this) == NULL); if (status) { @@ -362,7 +362,7 @@ void asynStreamGeneratorDriver::receiveUDP() { char buffer[bufferSize]; // We have 10 mcpdids - uint64_t lastBufferNumber* = new uint64_t[10]; + uint64_t *lastBufferNumber = new uint64_t[10]; for (size_t i = 0; i < 10; ++i) { lastBufferNumber[i] = 0; } @@ -388,6 +388,20 @@ void asynStreamGeneratorDriver::receiveUDP() { if (received == total_events * 6 + 42) { + if (header->BufferNumber - lastBufferNumber[header->McpdID] > + 1 && + lastBufferNumber[header->McpdID] != + std::numeric_limits< + decltype(header->BufferNumber)>::max()) { + asynPrint( + pasynUserSelf, ASYN_TRACE_ERROR, + "%s:%s: missed packet on id: %d. Received: %" PRIu64 + ", last: %" PRIu64 "\n", + driverName, functionName, header->McpdID, + header->BufferNumber, lastBufferNumber[header->McpdID]); + } + lastBufferNumber[header->McpdID] = header->BufferNumber; + for (std::size_t i = 0; i < total_events; ++i) { char *event = (buffer + 21 * 2 + i * 6);