From 1d7a3dd42a7016b70ab6ddaab58733a63df92d3f Mon Sep 17 00:00:00 2001 From: Edward Wall Date: Wed, 22 Oct 2025 12:24:17 +0200 Subject: [PATCH] proof of concept --- requirements.txt | 4 + udp_rate.py | 222 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 226 insertions(+) create mode 100644 requirements.txt create mode 100644 udp_rate.py diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..9dfa0e4 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +confluent-kafka==2.12.1 +ess-streaming-data-types==0.27.0 +flatbuffers==25.9.23 +numpy==1.26.3 diff --git a/udp_rate.py b/udp_rate.py new file mode 100644 index 0000000..1084c7e --- /dev/null +++ b/udp_rate.py @@ -0,0 +1,222 @@ +import queue +import socket +import time +import threading +from uuid import uuid4 + +from confluent_kafka import Producer +import streaming_data_types + +# receiving directly (can also specify correlation unit ip) +UDP_IP = "" +UDP_PORT = 54321 + +# If redirecting traffic via +# socat -U - udp4-recv:54321 | tee >( socat -u - udp4-datagram:127.0.0.1:54322 ) | socat -u - udp4-datagram:127.0.0.1:54323 +# UDP_IP = "127.0.0.1" +# UDP_PORT = 54323 + +WINDOWSECONDS = 5 +WINDOWSIZE = 20000 * WINDOWSECONDS +MONITORS = 4 # We have max 4 monitors + +time_offset = None # Estimate of clock offset + +time_window = { + i: queue.Queue(maxsize=WINDOWSIZE) + for i in range(MONITORS) +} + +def print_monitor_rates(): + while True: + for i in range(MONITORS): + msg = f"Monitor {i} : {time_window[i].qsize() / WINDOWSECONDS} cts/s" + try: + earliest = time_window[i].queue[0] + newest = max(time_window[i].queue) + t = time.time() + msg += f', buffer range: {round((newest - earliest) * 1e-7, 3)} s, oldest: {round(time.time() - ((time_offset + earliest) * 1e-7), 3)} s, newest: {round(time.time() - ((time_offset + newest) * 1e-7), 3)} s' + except: + pass + + print(msg) + time.sleep(1) + +threading.Thread(target=print_monitor_rates, daemon=True).start() + +def clean_monitor_rates(): + latest = 0 + while True: + for d_id in range(MONITORS): + t_w = time_window[d_id] + if not t_w.empty(): + # TODO probably should switch to a priority queue + # TODO the way this is done, we need trigger events + # in order for the signal to decay back to 0. + # If no events come, the rate remains stuck + latest = max(latest, max(t_w.queue)) + # latest = time_window[1].queue[-1] + try: + while t_w.queue[0] < (latest - WINDOWSECONDS * 1e7): + t_w.get_nowait() + except IndexError: + pass + time.sleep(0.005) + +threading.Thread(target=clean_monitor_rates, daemon=True).start() + +# Event Kafka Producer + +event_queue = queue.Queue() + +def event_producer(): + producer_config = { + 'bootstrap.servers': "linkafka01:9092", + } + prod = Producer(producer_config) + + st = time.time() + + msg_id = 0 + + b_size = 1000 + b_ptr = 0 + pixel_buffer = [0 for _ in range(b_size)] + time_buffer = [0 for _ in range(b_size)] + + while True: + (p_id, timestamp) = event_queue.get() + + pixel_buffer[b_ptr] = p_id + time_buffer[b_ptr] = timestamp + b_ptr += 1 + + nt = time.time() + if b_ptr == b_size or nt - st > 0.1: + st = nt + + if b_ptr > 0: + message = streaming_data_types.serialise_ev42( + message_id = msg_id, + pulse_time = int(time.time() * 1_000_000_000), + time_of_flight = time_buffer[0:b_ptr], + detector_id = pixel_buffer[0:b_ptr], + source_name = '', + ) + + msg_id = (msg_id + 1) % 100000000 + b_ptr = 0 + + prod.produce( + topic = "DMC_detector", + value = message, + partition = 0, + ) + +threading.Thread(target=event_producer, daemon=True).start() + +# Monitor Kafka Producer + +monitor_queue = queue.Queue() + +def monitor_producer(): + producer_config = { + 'bootstrap.servers': "linkafka01:9092", + } + prod = Producer(producer_config) + + monitor_buffer = [0 for i in range(MONITORS)] + monitor_time = [0 for i in range(MONITORS)] + + st = time.time() + + while True: + (d_id, timestamp) = monitor_queue.get() + + monitor_buffer[d_id] += 1 + monitor_time[d_id] = timestamp + + nt = time.time() + if nt - st > 0.1: + st = nt + + for i in range(MONITORS): + if monitor_buffer[d_id]: + message = streaming_data_types.serialise_f142( + source_name = f"monitor{d_id}", + value = monitor_buffer[d_id], + # ns resolution (supposed to be past epoch, not what the detector returns though) + timestamp_unix_ns = monitor_time[d_id] * 100 # send time of last monitor + ) + + prod.produce( + topic = "DMC_neutron_monitor", + value = message, + partition = 0, + ) + + monitor_buffer[d_id] = 0 + +threading.Thread(target=monitor_producer, daemon=True).start() + + +sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) +sock.bind((UDP_IP, UDP_PORT)) + +val = 0 +start_time = time.time() + +module_counts = [0 for i in range(10)] + +while True: + data, addr = sock.recvfrom(2056) # Buffer size is 1024 bytes + raw_header = data[:42] + raw_data = data[42:] + + (buffer_length, buffer_type, header_length, + buffer_number, run_id, mcpd_status, + t_low, t_mid, t_high, *_) = memoryview(raw_header).cast('H') + mcpd_id = ( mcpd_status >> 8 ) & 0xff + mcpd_status = ( mcpd_status ) & 0x3 + running_msg = "running" if (mcpd_status & 0x1) else "stopped" + sync_msg = "in sync" if (mcpd_status & 0x2) else "sync error" + timestamp = ( t_high << 32 ) | ( t_mid << 16 ) | t_low # 100 ns resolution + #print(f'Packet {int(timestamp * 1e-7)}s => buffer: {buffer_number}, length: {int(buffer_length*2/6)} events, status: {mcpd_status} {mcpd_id} {running_msg} with {sync_msg}') + # print(f'Packet => buffer: {mcpd_id}-{buffer_number}, length: {int((buffer_length-21)/3)} events, status: {mcpd_status}') + + if time_offset is None: + time_offset = time.time() * 1e7 - timestamp + + if buffer_number - module_counts[mcpd_id] != 1: + print("Missed Packet!!!") + module_counts[mcpd_id] = buffer_number + + for i in range(0, len(raw_data), 6): + event = memoryview(raw_data)[i:i+6] + event_type = event[5] >> 7 + # print(event_type) + + if event_type: # Trigger Event + t_id = ( event[5] >> 4 ) & 0x7 + d_id = event[5] & 0xf + event_timestamp = timestamp + ( ( event[2] << 16 ) & 0x7 ) | ( event[1] << 8 ) | event[0] + # print(f'Trigger event {event_timestamp * 1e-7}s => TrigID: {t_id}, DataID: {d_id}') + + t_w = time_window[d_id] + t_w.put_nowait(event_timestamp) + + monitor_queue.put_nowait((d_id, event_timestamp)) + + else: # Neutron Event + x_pixels = 128 + y_pixels = 128 + amplitude = ( event[5] << 1 ) | ( event[4] >> 7 ) + x = ( event[3] << 5 | event[2] >> 3 ) & 0x3ff + y = ( event[4] << 3 | event[3] >> 5 ) & 0x3ff + event_timestamp = timestamp + ( ( event[2] << 16 ) & 0x7 ) | ( event[1] << 8 ) | event[0] + # print(f'Neutron event {event_timestamp * 1e-7}s: {amplitude}, x: {x}, y: {y}') + + event_queue.put_nowait(( + (mcpd_id-1) * x_pixels * y_pixels + x_pixels * x + y, + event_timestamp + ))