diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 9dfa0e4..0000000 --- a/requirements.txt +++ /dev/null @@ -1,4 +0,0 @@ -confluent-kafka==2.12.1 -ess-streaming-data-types==0.27.0 -flatbuffers==25.9.23 -numpy==1.26.3 diff --git a/udp_rate.py b/udp_rate.py deleted file mode 100644 index 9e6471f..0000000 --- a/udp_rate.py +++ /dev/null @@ -1,342 +0,0 @@ -import queue -import socket -import time -import threading -from uuid import uuid4 -import math - -from confluent_kafka import Producer -import streaming_data_types - -# receiving directly (can also specify correlation unit ip) -UDP_IP = "" -UDP_PORT = 54321 - -# If redirecting traffic via -# socat -U - udp4-recv:54321 | tee >( socat -u - udp4-datagram:127.0.0.1:54322 ) | socat -u - udp4-datagram:127.0.0.1:54323 -# UDP_IP = "127.0.0.1" -# UDP_PORT = 54323 - -WINDOWSECONDS = 5 -WINDOWSIZE = 20000 * WINDOWSECONDS -MONITORS = 4 # We have max 4 monitors - -time_offset = None # Estimate of clock offset - -time_window = { - i: queue.Queue(maxsize=WINDOWSIZE) - for i in range(MONITORS) -} - -# event_time_window = queue.Queue(maxsize=50000 * WINDOWSECONDS) -EVENT_WINDOWSIZE = 50000 -EVENT_WINDOW_PTR = 0 -event_time_window = [0 for i in range(EVENT_WINDOWSIZE)] - -event_average_rate = 0 -event_last_timestamp = None - -MISSED_PACKETS = -9 # All modules appear to miss the first time due to initialisation as 0 - -# missed_packets_time_window = queue.Queue(maxsize=100) - -def print_monitor_rates(): - while True: - for i in range(MONITORS): - msg = f"Monitor {i+1}: {time_window[i].qsize() / WINDOWSECONDS} cts/s" - try: - earliest = time_window[i].queue[0] - newest = max(time_window[i].queue) - t = time.time() - msg += f', buffer range: {round((newest - earliest) * 1e-7, 3)} s, oldest: {round(time.time() - ((time_offset + earliest) * 1e-7), 3)} s, newest: {round(time.time() - ((time_offset + newest) * 1e-7), 3)} s' - except: - pass - - print(msg) - - # try: - # print(f'Events: {1 / event_average_rate} cts/s') - # except: - # pass - - try: - print(f'Events: {round(1 / (sum(event_time_window) / EVENT_WINDOWSIZE * 1e-7), 2)} cts/s') - except: - pass - - print(f'Missed Packets: {MISSED_PACKETS}') - - # Detector Events - # msg = f"Events : {event_time_window.qsize() / WINDOWSECONDS} cts/s" - # try: - # earliest = event_time_window.queue[0] - # newest = max(event_time_window.queue) - # t = time.time() - # msg += f', buffer range: {round((newest - earliest) * 1e-7, 3)} s, oldest: {round(time.time() - ((time_offset + earliest) * 1e-7), 3)} s, newest: {round(time.time() - ((time_offset + newest) * 1e-7), 3)} s' - # except: - # pass - - # print(msg) - - time.sleep(1) - -threading.Thread(target=print_monitor_rates, daemon=True).start() - -def clean_monitor_rates(): - latest = 0 - while True: - for d_id in range(MONITORS): - t_w = time_window[d_id] - if not t_w.empty(): - # TODO probably should switch to a priority queue - # as the messages might not be in order - # TODO could also just replace with a low-pass filter - # would be a lot more efficient - # TODO the way this is done, we need trigger events - # in order for the signal to decay back to 0. - # If no events come, the rate remains stuck - latest = max(latest, max(t_w.queue)) - # latest = time_window[1].queue[-1] - try: - while t_w.queue[0] < (latest - WINDOWSECONDS * 1e7): - t_w.get_nowait() - except IndexError: - pass - time.sleep(0.01) - -threading.Thread(target=clean_monitor_rates, daemon=True).start() - - -# def clean_event_rates(): -# latest = 0 -# while True: -# t_w = event_time_window -# if not t_w.empty(): -# # TODO probably should switch to a priority queue -# # as the messages might not be in order -# # TODO could also just replace with a low-pass filter -# # would be a lot more efficient -# # TODO the way this is done, we need trigger events -# # in order for the signal to decay back to 0. -# # If no events come, the rate remains stuck -# #latest = max(latest, max(t_w.queue)) -# try: -# latest = time_window[1].queue[-1] -# while t_w.queue[0] < (latest - WINDOWSECONDS * 1e7): -# t_w.get_nowait() -# except IndexError: -# pass -# time.sleep(0.005) -# -# threading.Thread(target=clean_event_rates, daemon=True).start() - - - - -# Event Kafka Producer - -event_queue = queue.Queue() - -def event_producer(): - producer_config = { - 'bootstrap.servers': "linkafka01:9092", - 'queue.buffering.max.messages': 1e7, - } - prod = Producer(producer_config) - - st = time.time() - - msg_id = 0 - - b_size = 10000 - b_ptr = 0 - pixel_buffer = [0 for _ in range(b_size)] - time_buffer = [0 for _ in range(b_size)] - poll_cnt = 0 - - while True: - (p_id, timestamp) = event_queue.get() - - pixel_buffer[b_ptr] = p_id - time_buffer[b_ptr] = timestamp - b_ptr += 1 - - nt = time.time() - if b_ptr == b_size or nt - st > 0.001: - st = nt - - if b_ptr > 0: - message = streaming_data_types.serialise_ev42( - message_id = msg_id, - pulse_time = time_buffer[0] * 100, # int(time.time() * 1_000_000_000), - time_of_flight = time_buffer[0:b_ptr], - detector_id = pixel_buffer[0:b_ptr], - source_name = '', - ) - - msg_id = (msg_id + 1) % 100000000 - b_ptr = 0 - - prod.produce( - topic = "DMC_detector", - value = message, - partition = 0, - ) - - # if poll_cnt % 1000 == 0: - prod.poll(0) - poll_cnt = (poll_cnt + 1) % 1000 - -threading.Thread(target=event_producer, daemon=True).start() - -# Monitor Kafka Producer - -monitor_queue = queue.Queue() - -def monitor_producer(): - producer_config = { - 'bootstrap.servers': "linkafka01:9092", - 'queue.buffering.max.messages': 1e7, - } - prod = Producer(producer_config) - - monitor_buffer = [0 for i in range(MONITORS)] - monitor_time = [0 for i in range(MONITORS)] - - st = time.time() - - poll_cnt = 0 - - while True: - (d_id, timestamp) = monitor_queue.get() - - monitor_buffer[d_id] += 1 - monitor_time[d_id] = timestamp - - nt = time.time() - if nt - st > 0.05: - st = nt - - for i in range(MONITORS): - if monitor_buffer[d_id]: - message = streaming_data_types.serialise_f142( - source_name = f"monitor{d_id+1}", - value = monitor_buffer[d_id], - # ns resolution (supposed to be past epoch, not what the detector returns though) - timestamp_unix_ns = monitor_time[d_id] * 100 # send time of last monitor - ) - - prod.produce( - topic = "DMC_neutron_monitor", - value = message, - partition = 0, - ) - - monitor_buffer[d_id] = 0 - - if poll_cnt % 1000 == 0: - prod.poll(0) - poll_cnt = (poll_cnt + 1) % 1000 - -threading.Thread(target=monitor_producer, daemon=True).start() - - -sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) -sock.bind((UDP_IP, UDP_PORT)) - -val = 0 -start_time = time.time() - -module_counts = [0 for i in range(10)] - -EVENTS = 0 - -while True: - data, addr = sock.recvfrom(2056) # Buffer size is 1024 bytes - raw_header = data[:42] - raw_data = data[42:] - - (buffer_length, buffer_type, header_length, - buffer_number, run_id, mcpd_status, - t_low, t_mid, t_high, *_) = memoryview(raw_header).cast('H') - mcpd_id = ( mcpd_status >> 8 ) & 0xff - mcpd_status = ( mcpd_status ) & 0x3 - running_msg = "running" if (mcpd_status & 0x1) else "stopped" - sync_msg = "in sync" if (mcpd_status & 0x2) else "sync error" - timestamp = ( t_high << 32 ) | ( t_mid << 16 ) | t_low # 100 ns resolution - #print(f'Packet {int(timestamp * 1e-7)}s => buffer: {buffer_number}, length: {int(buffer_length*2/6)} events, status: {mcpd_status} {mcpd_id} {running_msg} with {sync_msg}') - # print(f'Packet => buffer: {mcpd_id}-{buffer_number}, length: {int((buffer_length-21)/3)} events, status: {mcpd_status}') - - if time_offset is None: - time_offset = time.time() * 1e7 - timestamp - - if buffer_number - module_counts[mcpd_id] != 1: - MISSED_PACKETS += 1 - # if missed_packets_time_window.full(): - # missed_packets_time_window.get_nowait() - # missed_packets_time_window.put(timestamp) - - module_counts[mcpd_id] = buffer_number - - for i in range(0, len(raw_data), 6): - event = memoryview(raw_data)[i:i+6] - event_type = event[5] >> 7 - # print(event_type) - - if event_type: # Trigger Event - t_id = ( event[5] >> 4 ) & 0x7 - d_id = event[5] & 0xf - event_timestamp = timestamp + ( ( event[2] << 16 ) & 0x7 ) | ( event[1] << 8 ) | event[0] - # print(f'Trigger event {event_timestamp * 1e-7}s => TrigID: {t_id}, DataID: {d_id}') - - t_w = time_window[d_id] - t_w.put_nowait(event_timestamp) - - monitor_queue.put_nowait((d_id, event_timestamp)) - - else: # Neutron Event - x_pixels = 128 - y_pixels = 128 - amplitude = ( event[5] << 1 ) | ( event[4] >> 7 ) - - # The DMC StreamHistogrammer setup currently expects each module to - # be 128 * 128 pixels but the resolution in the packages is - # actually 10bit. We remove the lowest 3 bits. - x = (( (event[3] & 0x1f) << 5 | (event[2] & 0xf8) >> 3 ) & 0x3ff) >> 3 - y = (( (event[4] & 0x7f) << 3 | (event[3] & 0xe0) >> 5 ) & 0x3ff) >> 3 - event_timestamp = timestamp + ( ( event[2] << 16 ) & 0x7 ) | ( event[1] << 8 ) | event[0] - # print(f'Neutron event {event_timestamp * 1e-7}s: {amplitude}, x: {x}, y: {y}') - - - if event_last_timestamp is None: - event_last_timestamp = event_timestamp - - # Seems like at higher frequencies these come very much out of order - # so this is very approximate - event_time_window[EVENT_WINDOW_PTR] = event_timestamp - event_last_timestamp - EVENT_WINDOW_PTR = (EVENT_WINDOW_PTR + 1) % EVENT_WINDOWSIZE - event_last_timestamp = event_timestamp - - # I suppose this doesn't work mostly due to the timestamps ordering... - # event_timestamp_seconds = event_timestamp * 1e-7 - # if event_last_timestamp is None: - # event_last_timestamp = event_timestamp_seconds - - # f_cutoff = 1e6 # Hz - # tau = 1 / ( 2 * math.pi * f_cutoff) - # dt = event_timestamp_seconds - event_last_timestamp - # if dt > 0: - # w = math.exp(-dt / tau) - # event_average_rate = w * dt + event_average_rate * (1 - w) - # event_last_timestamp = event_timestamp_seconds - - # EVENTS += 1 - # a = (mcpd_id - 1) * x_pixels * y_pixels + x_pixels * x + y - # print((EVENTS, x, y, a, a < 128 * 128 * 9, mcpd_id)) - # if not a < 128 * 128 * 9: - # print((event[3], event[3] << 5, event[2], event[2] >> 3)) - - event_queue.put_nowait(( - (mcpd_id - 1) * x_pixels * y_pixels + x_pixels * x + y, - event_timestamp - ))