import queue import socket import time import threading from uuid import uuid4 import math from confluent_kafka import Producer import streaming_data_types # receiving directly (can also specify correlation unit ip) UDP_IP = "" UDP_PORT = 54321 # If redirecting traffic via # socat -U - udp4-recv:54321 | tee >( socat -u - udp4-datagram:127.0.0.1:54322 ) | socat -u - udp4-datagram:127.0.0.1:54323 # UDP_IP = "127.0.0.1" # UDP_PORT = 54323 WINDOWSECONDS = 5 WINDOWSIZE = 20000 * WINDOWSECONDS MONITORS = 4 # We have max 4 monitors time_offset = None # Estimate of clock offset time_window = { i: queue.Queue(maxsize=WINDOWSIZE) for i in range(MONITORS) } # event_time_window = queue.Queue(maxsize=50000 * WINDOWSECONDS) EVENT_WINDOWSIZE = 50000 EVENT_WINDOW_PTR = 0 event_time_window = [0 for i in range(EVENT_WINDOWSIZE)] event_average_rate = 0 event_last_timestamp = None MISSED_PACKETS = -9 # All modules appear to miss the first time due to initialisation as 0 # missed_packets_time_window = queue.Queue(maxsize=100) def print_monitor_rates(): while True: for i in range(MONITORS): msg = f"Monitor {i+1}: {time_window[i].qsize() / WINDOWSECONDS} cts/s" try: earliest = time_window[i].queue[0] newest = max(time_window[i].queue) t = time.time() msg += f', buffer range: {round((newest - earliest) * 1e-7, 3)} s, oldest: {round(time.time() - ((time_offset + earliest) * 1e-7), 3)} s, newest: {round(time.time() - ((time_offset + newest) * 1e-7), 3)} s' except: pass print(msg) # try: # print(f'Events: {1 / event_average_rate} cts/s') # except: # pass try: print(f'Events: {round(1 / (sum(event_time_window) / EVENT_WINDOWSIZE * 1e-7), 2)} cts/s') except: pass print(f'Missed Packets: {MISSED_PACKETS}') # Detector Events # msg = f"Events : {event_time_window.qsize() / WINDOWSECONDS} cts/s" # try: # earliest = event_time_window.queue[0] # newest = max(event_time_window.queue) # t = time.time() # msg += f', buffer range: {round((newest - earliest) * 1e-7, 3)} s, oldest: {round(time.time() - ((time_offset + earliest) * 1e-7), 3)} s, newest: {round(time.time() - ((time_offset + newest) * 1e-7), 3)} s' # except: # pass # print(msg) time.sleep(1) threading.Thread(target=print_monitor_rates, daemon=True).start() def clean_monitor_rates(): latest = 0 while True: for d_id in range(MONITORS): t_w = time_window[d_id] if not t_w.empty(): # TODO probably should switch to a priority queue # as the messages might not be in order # TODO could also just replace with a low-pass filter # would be a lot more efficient # TODO the way this is done, we need trigger events # in order for the signal to decay back to 0. # If no events come, the rate remains stuck latest = max(latest, max(t_w.queue)) # latest = time_window[1].queue[-1] try: while t_w.queue[0] < (latest - WINDOWSECONDS * 1e7): t_w.get_nowait() except IndexError: pass time.sleep(0.01) threading.Thread(target=clean_monitor_rates, daemon=True).start() # def clean_event_rates(): # latest = 0 # while True: # t_w = event_time_window # if not t_w.empty(): # # TODO probably should switch to a priority queue # # as the messages might not be in order # # TODO could also just replace with a low-pass filter # # would be a lot more efficient # # TODO the way this is done, we need trigger events # # in order for the signal to decay back to 0. # # If no events come, the rate remains stuck # #latest = max(latest, max(t_w.queue)) # try: # latest = time_window[1].queue[-1] # while t_w.queue[0] < (latest - WINDOWSECONDS * 1e7): # t_w.get_nowait() # except IndexError: # pass # time.sleep(0.005) # # threading.Thread(target=clean_event_rates, daemon=True).start() # Event Kafka Producer event_queue = queue.Queue() def event_producer(): producer_config = { 'bootstrap.servers': "linkafka01:9092", 'queue.buffering.max.messages': 1e7, } prod = Producer(producer_config) st = time.time() msg_id = 0 b_size = 10000 b_ptr = 0 pixel_buffer = [0 for _ in range(b_size)] time_buffer = [0 for _ in range(b_size)] poll_cnt = 0 while True: (p_id, timestamp) = event_queue.get() pixel_buffer[b_ptr] = p_id time_buffer[b_ptr] = timestamp b_ptr += 1 nt = time.time() if b_ptr == b_size or nt - st > 0.001: st = nt if b_ptr > 0: message = streaming_data_types.serialise_ev42( message_id = msg_id, pulse_time = time_buffer[0] * 100, # int(time.time() * 1_000_000_000), time_of_flight = time_buffer[0:b_ptr], detector_id = pixel_buffer[0:b_ptr], source_name = '', ) msg_id = (msg_id + 1) % 100000000 b_ptr = 0 prod.produce( topic = "DMC_detector", value = message, partition = 0, ) # if poll_cnt % 1000 == 0: prod.poll(0) poll_cnt = (poll_cnt + 1) % 1000 threading.Thread(target=event_producer, daemon=True).start() # Monitor Kafka Producer monitor_queue = queue.Queue() def monitor_producer(): producer_config = { 'bootstrap.servers': "linkafka01:9092", 'queue.buffering.max.messages': 1e7, } prod = Producer(producer_config) monitor_buffer = [0 for i in range(MONITORS)] monitor_time = [0 for i in range(MONITORS)] st = time.time() poll_cnt = 0 while True: (d_id, timestamp) = monitor_queue.get() monitor_buffer[d_id] += 1 monitor_time[d_id] = timestamp nt = time.time() if nt - st > 0.05: st = nt for i in range(MONITORS): if monitor_buffer[d_id]: message = streaming_data_types.serialise_f142( source_name = f"monitor{d_id+1}", value = monitor_buffer[d_id], # ns resolution (supposed to be past epoch, not what the detector returns though) timestamp_unix_ns = monitor_time[d_id] * 100 # send time of last monitor ) prod.produce( topic = "DMC_neutron_monitor", value = message, partition = 0, ) monitor_buffer[d_id] = 0 if poll_cnt % 1000 == 0: prod.poll(0) poll_cnt = (poll_cnt + 1) % 1000 threading.Thread(target=monitor_producer, daemon=True).start() sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.bind((UDP_IP, UDP_PORT)) val = 0 start_time = time.time() module_counts = [0 for i in range(10)] EVENTS = 0 while True: data, addr = sock.recvfrom(2056) # Buffer size is 1024 bytes raw_header = data[:42] raw_data = data[42:] (buffer_length, buffer_type, header_length, buffer_number, run_id, mcpd_status, t_low, t_mid, t_high, *_) = memoryview(raw_header).cast('H') mcpd_id = ( mcpd_status >> 8 ) & 0xff mcpd_status = ( mcpd_status ) & 0x3 running_msg = "running" if (mcpd_status & 0x1) else "stopped" sync_msg = "in sync" if (mcpd_status & 0x2) else "sync error" timestamp = ( t_high << 32 ) | ( t_mid << 16 ) | t_low # 100 ns resolution #print(f'Packet {int(timestamp * 1e-7)}s => buffer: {buffer_number}, length: {int(buffer_length*2/6)} events, status: {mcpd_status} {mcpd_id} {running_msg} with {sync_msg}') # print(f'Packet => buffer: {mcpd_id}-{buffer_number}, length: {int((buffer_length-21)/3)} events, status: {mcpd_status}') if time_offset is None: time_offset = time.time() * 1e7 - timestamp if buffer_number - module_counts[mcpd_id] != 1: MISSED_PACKETS += 1 # if missed_packets_time_window.full(): # missed_packets_time_window.get_nowait() # missed_packets_time_window.put(timestamp) module_counts[mcpd_id] = buffer_number for i in range(0, len(raw_data), 6): event = memoryview(raw_data)[i:i+6] event_type = event[5] >> 7 # print(event_type) if event_type: # Trigger Event t_id = ( event[5] >> 4 ) & 0x7 d_id = event[5] & 0xf event_timestamp = timestamp + ( ( event[2] << 16 ) & 0x7 ) | ( event[1] << 8 ) | event[0] # print(f'Trigger event {event_timestamp * 1e-7}s => TrigID: {t_id}, DataID: {d_id}') t_w = time_window[d_id] t_w.put_nowait(event_timestamp) monitor_queue.put_nowait((d_id, event_timestamp)) else: # Neutron Event x_pixels = 128 y_pixels = 128 amplitude = ( event[5] << 1 ) | ( event[4] >> 7 ) # The DMC StreamHistogrammer setup currently expects each module to # be 128 * 128 pixels but the resolution in the packages is # actually 10bit. We remove the lowest 3 bits. x = (( (event[3] & 0x1f) << 5 | (event[2] & 0xf8) >> 3 ) & 0x3ff) >> 3 y = (( (event[4] & 0x7f) << 3 | (event[3] & 0xe0) >> 5 ) & 0x3ff) >> 3 event_timestamp = timestamp + ( ( event[2] << 16 ) & 0x7 ) | ( event[1] << 8 ) | event[0] # print(f'Neutron event {event_timestamp * 1e-7}s: {amplitude}, x: {x}, y: {y}') if event_last_timestamp is None: event_last_timestamp = event_timestamp # Seems like at higher frequencies these come very much out of order # so this is very approximate event_time_window[EVENT_WINDOW_PTR] = event_timestamp - event_last_timestamp EVENT_WINDOW_PTR = (EVENT_WINDOW_PTR + 1) % EVENT_WINDOWSIZE event_last_timestamp = event_timestamp # I suppose this doesn't work mostly due to the timestamps ordering... # event_timestamp_seconds = event_timestamp * 1e-7 # if event_last_timestamp is None: # event_last_timestamp = event_timestamp_seconds # f_cutoff = 1e6 # Hz # tau = 1 / ( 2 * math.pi * f_cutoff) # dt = event_timestamp_seconds - event_last_timestamp # if dt > 0: # w = math.exp(-dt / tau) # event_average_rate = w * dt + event_average_rate * (1 - w) # event_last_timestamp = event_timestamp_seconds # EVENTS += 1 # a = (mcpd_id - 1) * x_pixels * y_pixels + x_pixels * x + y # print((EVENTS, x, y, a, a < 128 * 128 * 9, mcpd_id)) # if not a < 128 * 128 * 9: # print((event[3], event[3] << 5, event[2], event[2] >> 3)) event_queue.put_nowait(( (mcpd_id - 1) * x_pixels * y_pixels + x_pixels * x + y, event_timestamp ))