diff --git a/udp_rate.py b/udp_rate.py index 1084c7e..8bee264 100644 --- a/udp_rate.py +++ b/udp_rate.py @@ -3,6 +3,7 @@ import socket import time import threading from uuid import uuid4 +import math from confluent_kafka import Producer import streaming_data_types @@ -27,10 +28,22 @@ time_window = { for i in range(MONITORS) } +# event_time_window = queue.Queue(maxsize=50000 * WINDOWSECONDS) +EVENT_WINDOWSIZE = 50000 +EVENT_WINDOW_PTR = 0 +event_time_window = [0 for i in range(EVENT_WINDOWSIZE)] + +event_average_rate = 0 +event_last_timestamp = None + +MISSED_PACKETS = -9 # All modules appear to miss the first time due to initialisation as 0 + +# missed_packets_time_window = queue.Queue(maxsize=100) + def print_monitor_rates(): while True: for i in range(MONITORS): - msg = f"Monitor {i} : {time_window[i].qsize() / WINDOWSECONDS} cts/s" + msg = f"Monitor {i}: {time_window[i].qsize() / WINDOWSECONDS} cts/s" try: earliest = time_window[i].queue[0] newest = max(time_window[i].queue) @@ -40,6 +53,31 @@ def print_monitor_rates(): pass print(msg) + + # try: + # print(f'Events: {1 / event_average_rate} cts/s') + # except: + # pass + + try: + print(f'Events: {round(1 / (sum(event_time_window) / EVENT_WINDOWSIZE * 1e-7), 2)} cts/s') + except: + pass + + print(f'Missed Packets: {MISSED_PACKETS}') + + # Detector Events + # msg = f"Events : {event_time_window.qsize() / WINDOWSECONDS} cts/s" + # try: + # earliest = event_time_window.queue[0] + # newest = max(event_time_window.queue) + # t = time.time() + # msg += f', buffer range: {round((newest - earliest) * 1e-7, 3)} s, oldest: {round(time.time() - ((time_offset + earliest) * 1e-7), 3)} s, newest: {round(time.time() - ((time_offset + newest) * 1e-7), 3)} s' + # except: + # pass + + # print(msg) + time.sleep(1) threading.Thread(target=print_monitor_rates, daemon=True).start() @@ -51,6 +89,9 @@ def clean_monitor_rates(): t_w = time_window[d_id] if not t_w.empty(): # TODO probably should switch to a priority queue + # as the messages might not be in order + # TODO could also just replace with a low-pass filter + # would be a lot more efficient # TODO the way this is done, we need trigger events # in order for the signal to decay back to 0. # If no events come, the rate remains stuck @@ -61,10 +102,37 @@ def clean_monitor_rates(): t_w.get_nowait() except IndexError: pass - time.sleep(0.005) + time.sleep(0.01) threading.Thread(target=clean_monitor_rates, daemon=True).start() + +# def clean_event_rates(): +# latest = 0 +# while True: +# t_w = event_time_window +# if not t_w.empty(): +# # TODO probably should switch to a priority queue +# # as the messages might not be in order +# # TODO could also just replace with a low-pass filter +# # would be a lot more efficient +# # TODO the way this is done, we need trigger events +# # in order for the signal to decay back to 0. +# # If no events come, the rate remains stuck +# #latest = max(latest, max(t_w.queue)) +# try: +# latest = time_window[1].queue[-1] +# while t_w.queue[0] < (latest - WINDOWSECONDS * 1e7): +# t_w.get_nowait() +# except IndexError: +# pass +# time.sleep(0.005) +# +# threading.Thread(target=clean_event_rates, daemon=True).start() + + + + # Event Kafka Producer event_queue = queue.Queue() @@ -72,6 +140,7 @@ event_queue = queue.Queue() def event_producer(): producer_config = { 'bootstrap.servers': "linkafka01:9092", + 'queue.buffering.max.messages': 1e7, } prod = Producer(producer_config) @@ -79,10 +148,11 @@ def event_producer(): msg_id = 0 - b_size = 1000 + b_size = 10000 b_ptr = 0 pixel_buffer = [0 for _ in range(b_size)] time_buffer = [0 for _ in range(b_size)] + poll_cnt = 0 while True: (p_id, timestamp) = event_queue.get() @@ -98,7 +168,7 @@ def event_producer(): if b_ptr > 0: message = streaming_data_types.serialise_ev42( message_id = msg_id, - pulse_time = int(time.time() * 1_000_000_000), + pulse_time = time_buffer[0] * 100, # int(time.time() * 1_000_000_000), time_of_flight = time_buffer[0:b_ptr], detector_id = pixel_buffer[0:b_ptr], source_name = '', @@ -113,6 +183,10 @@ def event_producer(): partition = 0, ) + if poll_cnt % 1000 == 0: + prod.poll(0) + poll_cnt = (poll_cnt + 1) % 1000 + threading.Thread(target=event_producer, daemon=True).start() # Monitor Kafka Producer @@ -122,6 +196,7 @@ monitor_queue = queue.Queue() def monitor_producer(): producer_config = { 'bootstrap.servers': "linkafka01:9092", + 'queue.buffering.max.messages': 1e7, } prod = Producer(producer_config) @@ -130,6 +205,8 @@ def monitor_producer(): st = time.time() + poll_cnt = 0 + while True: (d_id, timestamp) = monitor_queue.get() @@ -157,6 +234,10 @@ def monitor_producer(): monitor_buffer[d_id] = 0 + if poll_cnt % 1000 == 0: + prod.poll(0) + poll_cnt = (poll_cnt + 1) % 1000 + threading.Thread(target=monitor_producer, daemon=True).start() @@ -188,7 +269,11 @@ while True: time_offset = time.time() * 1e7 - timestamp if buffer_number - module_counts[mcpd_id] != 1: - print("Missed Packet!!!") + MISSED_PACKETS += 1 + # if missed_packets_time_window.full(): + # missed_packets_time_window.get_nowait() + # missed_packets_time_window.put(timestamp) + module_counts[mcpd_id] = buffer_number for i in range(0, len(raw_data), 6): @@ -216,6 +301,29 @@ while True: event_timestamp = timestamp + ( ( event[2] << 16 ) & 0x7 ) | ( event[1] << 8 ) | event[0] # print(f'Neutron event {event_timestamp * 1e-7}s: {amplitude}, x: {x}, y: {y}') + + if event_last_timestamp is None: + event_last_timestamp = event_timestamp + + # Seems like at higher frequencies these come very much out of order + # so this is very approximate + event_time_window[EVENT_WINDOW_PTR] = event_timestamp - event_last_timestamp + EVENT_WINDOW_PTR = (EVENT_WINDOW_PTR + 1) % EVENT_WINDOWSIZE + event_last_timestamp = event_timestamp + + # I suppose this doesn't work mostly due to the timestamps ordering... + # event_timestamp_seconds = event_timestamp * 1e-7 + # if event_last_timestamp is None: + # event_last_timestamp = event_timestamp_seconds + + # f_cutoff = 1e6 # Hz + # tau = 1 / ( 2 * math.pi * f_cutoff) + # dt = event_timestamp_seconds - event_last_timestamp + # if dt > 0: + # w = math.exp(-dt / tau) + # event_average_rate = w * dt + event_average_rate * (1 - w) + # event_last_timestamp = event_timestamp_seconds + event_queue.put_nowait(( (mcpd_id-1) * x_pixels * y_pixels + x_pixels * x + y, event_timestamp