proof of concept

commit 1d7a3dd42a
2025-10-22 12:24:17 +02:00
2 changed files with 226 additions and 0 deletions

requirements.txt (new file)

@@ -0,0 +1,4 @@
confluent-kafka==2.12.1
ess-streaming-data-types==0.27.0
flatbuffers==25.9.23
numpy==1.26.3

udp_rate.py (new file)

@@ -0,0 +1,222 @@
import queue
import socket
import time
import threading
from uuid import uuid4
from confluent_kafka import Producer
import streaming_data_types
# Receive directly (a specific correlation-unit IP can also be given)
UDP_IP = ""
UDP_PORT = 54321
# If instead the traffic is duplicated via socat:
# socat -U - udp4-recv:54321 | tee >( socat -u - udp4-datagram:127.0.0.1:54322 ) | socat -u - udp4-datagram:127.0.0.1:54323
# UDP_IP = "127.0.0.1"
# UDP_PORT = 54323
WINDOWSECONDS = 5
WINDOWSIZE = 20000 * WINDOWSECONDS  # assumes at most 20 kcts/s per monitor
MONITORS = 4  # we have at most 4 monitors
time_offset = None # Estimate of clock offset
time_window = {
i: queue.Queue(maxsize=WINDOWSIZE)
for i in range(MONITORS)
}
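# Each queue is a sliding window over recent trigger events: qsize() / WINDOWSECONDS
# approximates the rate in cts/s, and clean_monitor_rates() below evicts
# entries older than the window.
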
def print_monitor_rates():
while True:
for i in range(MONITORS):
msg = f"Monitor {i} : {time_window[i].qsize() / WINDOWSECONDS} cts/s"
            try:
                earliest = time_window[i].queue[0]
                newest = max(time_window[i].queue)
                now = time.time()
                msg += (
                    f', buffer range: {round((newest - earliest) * 1e-7, 3)} s'
                    f', oldest: {round(now - ((time_offset + earliest) * 1e-7), 3)} s'
                    f', newest: {round(now - ((time_offset + newest) * 1e-7), 3)} s'
                )
            except (IndexError, ValueError, TypeError):
                # empty window, or time_offset not known yet
                pass
print(msg)
time.sleep(1)
threading.Thread(target=print_monitor_rates, daemon=True).start()
def clean_monitor_rates():
latest = 0
while True:
for d_id in range(MONITORS):
t_w = time_window[d_id]
if not t_w.empty():
# TODO probably should switch to a priority queue
# TODO the way this is done, we need trigger events
# in order for the signal to decay back to 0.
# If no events come, the rate remains stuck
latest = max(latest, max(t_w.queue))
# latest = time_window[1].queue[-1]
try:
while t_w.queue[0] < (latest - WINDOWSECONDS * 1e7):
t_w.get_nowait()
except IndexError:
pass
time.sleep(0.005)
threading.Thread(target=clean_monitor_rates, daemon=True).start()
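
# A rough sketch of the priority-queue variant suggested in the TODO above.
# Hypothetical helper, not wired into the pipeline: with a heap the oldest
# timestamp is always heap[0], so eviction no longer depends on events
# arriving in FIFO order.
import heapq
def evict_older_than(heap, cutoff):
    # Pop timestamps older than `cutoff` (same 100 ns ticks as the window).
    while heap and heap[0] < cutoff:
        heapq.heappop(heap)
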
# Event Kafka Producer
event_queue = queue.Queue()
def event_producer():
producer_config = {
'bootstrap.servers': "linkafka01:9092",
}
prod = Producer(producer_config)
st = time.time()
msg_id = 0
b_size = 1000
b_ptr = 0
pixel_buffer = [0 for _ in range(b_size)]
time_buffer = [0 for _ in range(b_size)]
while True:
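        # NOTE: get() blocks while the stream is idle, so the 0.1 s flush
        # below only fires once the next event arrives.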
(p_id, timestamp) = event_queue.get()
pixel_buffer[b_ptr] = p_id
time_buffer[b_ptr] = timestamp
b_ptr += 1
nt = time.time()
if b_ptr == b_size or nt - st > 0.1:
st = nt
if b_ptr > 0:
message = streaming_data_types.serialise_ev42(
message_id = msg_id,
pulse_time = int(time.time() * 1_000_000_000),
time_of_flight = time_buffer[0:b_ptr],
detector_id = pixel_buffer[0:b_ptr],
source_name = '',
)
msg_id = (msg_id + 1) % 100000000
b_ptr = 0
                prod.produce(
                    topic = "DMC_detector",
                    value = message,
                    partition = 0,
                )
                prod.poll(0)  # serve delivery callbacks so the producer queue drains
threading.Thread(target=event_producer, daemon=True).start()
# Monitor Kafka Producer
monitor_queue = queue.Queue()
def monitor_producer():
producer_config = {
'bootstrap.servers': "linkafka01:9092",
}
prod = Producer(producer_config)
monitor_buffer = [0 for i in range(MONITORS)]
monitor_time = [0 for i in range(MONITORS)]
st = time.time()
while True:
(d_id, timestamp) = monitor_queue.get()
monitor_buffer[d_id] += 1
monitor_time[d_id] = timestamp
nt = time.time()
if nt - st > 0.1:
st = nt
            for i in range(MONITORS):
                if monitor_buffer[i]:
                    message = streaming_data_types.serialise_f142(
                        source_name = f"monitor{i}",
                        value = monitor_buffer[i],
                        # ns since the epoch; the detector returns its own 100 ns
                        # ticks, so shift by the estimated clock offset
                        timestamp_unix_ns = int((time_offset + monitor_time[i]) * 100),  # time of last monitor event
                    )
                    prod.produce(
                        topic = "DMC_neutron_monitor",
                        value = message,
                        partition = 0,
                    )
                    monitor_buffer[i] = 0
            prod.poll(0)  # serve delivery callbacks
threading.Thread(target=monitor_producer, daemon=True).start()
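
# Main receive loop: each UDP datagram is one MCPD buffer, a 21-word (42-byte)
# header followed by 6-byte events, which are decoded and fanned out to the
# queues above.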
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((UDP_IP, UDP_PORT))
module_counts = [0] * 256  # last buffer_number seen per MCPD id (8-bit)
while True:
    data, addr = sock.recvfrom(2056)  # buffer comfortably larger than one MCPD datagram
raw_header = data[:42]
raw_data = data[42:]
(buffer_length, buffer_type, header_length,
buffer_number, run_id, mcpd_status,
t_low, t_mid, t_high, *_) = memoryview(raw_header).cast('H')
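    # Equivalent decode with an explicit little-endian layout (a sketch, not
    # used below; cast('H') above takes native byte order, which matches on
    # x86). Requires `import struct`:
    #   (buffer_length, buffer_type, header_length, buffer_number,
    #    run_id, mcpd_status, t_low, t_mid, t_high) = struct.unpack_from('<9H', raw_header)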
mcpd_id = ( mcpd_status >> 8 ) & 0xff
mcpd_status = ( mcpd_status ) & 0x3
running_msg = "running" if (mcpd_status & 0x1) else "stopped"
sync_msg = "in sync" if (mcpd_status & 0x2) else "sync error"
timestamp = ( t_high << 32 ) | ( t_mid << 16 ) | t_low # 100 ns resolution
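    # 2**48 ticks at 100 ns each wrap after roughly 325 days of uptime.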
#print(f'Packet {int(timestamp * 1e-7)}s => buffer: {buffer_number}, length: {int(buffer_length*2/6)} events, status: {mcpd_status} {mcpd_id} {running_msg} with {sync_msg}')
# print(f'Packet => buffer: {mcpd_id}-{buffer_number}, length: {int((buffer_length-21)/3)} events, status: {mcpd_status}')
if time_offset is None:
time_offset = time.time() * 1e7 - timestamp
    # buffer_number is a 16-bit sequence counter, so mask the difference to
    # survive wraparound (the first packet per module may warn spuriously)
    if ((buffer_number - module_counts[mcpd_id]) & 0xffff) != 1:
        print("Missed Packet!!!")
    module_counts[mcpd_id] = buffer_number
for i in range(0, len(raw_data), 6):
event = memoryview(raw_data)[i:i+6]
event_type = event[5] >> 7
# print(event_type)
if event_type: # Trigger Event
            t_id = ( event[5] >> 4 ) & 0x7
            d_id = event[5] & 0xf
            # the low 19 bits of the event carry the time within the buffer;
            # bits 16-18 live in event[2], so mask before shifting
            event_timestamp = timestamp + (
                ((event[2] & 0x7) << 16) | (event[1] << 8) | event[0]
            )
            # print(f'Trigger event {event_timestamp * 1e-7}s => TrigID: {t_id}, DataID: {d_id}')
            if d_id < MONITORS:  # only DataIDs 0-3 map to monitor channels
                t_w = time_window[d_id]
                try:
                    t_w.put_nowait(event_timestamp)
                except queue.Full:
                    pass  # window saturated; drop rather than crash the loop
                monitor_queue.put_nowait((d_id, event_timestamp))
else: # Neutron Event
x_pixels = 128
y_pixels = 128
amplitude = ( event[5] << 1 ) | ( event[4] >> 7 )
x = ( event[3] << 5 | event[2] >> 3 ) & 0x3ff
y = ( event[4] << 3 | event[3] >> 5 ) & 0x3ff
            event_timestamp = timestamp + (
                ((event[2] & 0x7) << 16) | (event[1] << 8) | event[0]
            )
# print(f'Neutron event {event_timestamp * 1e-7}s: {amplitude}, x: {x}, y: {y}')
event_queue.put_nowait((
(mcpd_id-1) * x_pixels * y_pixels + x_pixels * x + y,
event_timestamp
))
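
# NOTE: the worker threads are daemonic and the producers are never flushed,
# so anything still buffered in librdkafka is dropped when the process exits.
# Fine for a proof of concept; a long-lived service would catch SIGINT and
# call prod.flush() before shutting down.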