import queue
import socket
import time
import threading
from uuid import uuid4
import math

from confluent_kafka import Producer
import streaming_data_types

# Receive directly (can also specify the correlation unit's IP)
UDP_IP = ""
UDP_PORT = 54321

# If redirecting traffic via
# socat -U - udp4-recv:54321 | tee >( socat -u - udp4-datagram:127.0.0.1:54322 ) | socat -u - udp4-datagram:127.0.0.1:54323
# UDP_IP = "127.0.0.1"
# UDP_PORT = 54323
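
# Data flow overview: the main loop below receives MCPD UDP packets, parses
# the header and the 6-byte events, and fans them out through queues. Trigger
# (monitor) events go to time_window / monitor_queue, neutron events go to
# event_queue; two daemon threads batch them and publish to Kafka.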

WINDOWSECONDS = 5
WINDOWSIZE = 20000 * WINDOWSECONDS
MONITORS = 4  # We have at most 4 monitors

time_offset = None  # Estimate of the clock offset between this host and the detector

# One queue of recent trigger timestamps per monitor channel
time_window = {
    i: queue.Queue(maxsize=WINDOWSIZE)
    for i in range(MONITORS)
}

# event_time_window = queue.Queue(maxsize=50000 * WINDOWSECONDS)
EVENT_WINDOWSIZE = 50000
EVENT_WINDOW_PTR = 0
# Ring buffer of inter-event time deltas (100 ns ticks)
event_time_window = [0 for i in range(EVENT_WINDOWSIZE)]

event_average_rate = 0
event_last_timestamp = None

MISSED_PACKETS = -9  # All modules appear to "miss" their first packet because module_counts is initialised to 0

# missed_packets_time_window = queue.Queue(maxsize=100)
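
# The monitor rate estimate below is a sliding window: the cleaner thread
# keeps only timestamps from the last WINDOWSECONDS, so qsize() divided by
# WINDOWSECONDS approximates the rate. E.g. at 1000 cts/s the window holds
# about 5000 timestamps, giving 5000 / 5 = 1000 cts/s.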

def print_monitor_rates():
    while True:
        for i in range(MONITORS):
            msg = f"Monitor {i+1}: {time_window[i].qsize() / WINDOWSECONDS} cts/s"
            try:
                earliest = time_window[i].queue[0]
                newest = max(time_window[i].queue)
                msg += f', buffer range: {round((newest - earliest) * 1e-7, 3)} s, oldest: {round(time.time() - ((time_offset + earliest) * 1e-7), 3)} s, newest: {round(time.time() - ((time_offset + newest) * 1e-7), 3)} s'
            except (IndexError, TypeError):  # empty window or time_offset not yet set
                pass

            print(msg)

        # try:
        #     print(f'Events: {1 / event_average_rate} cts/s')
        # except:
        #     pass

        # Mean event rate over the delta ring buffer (deltas are 100 ns ticks)
        try:
            print(f'Events: {round(1 / (sum(event_time_window) / EVENT_WINDOWSIZE * 1e-7), 2)} cts/s')
        except ZeroDivisionError:  # no events recorded yet
            pass

        print(f'Missed Packets: {MISSED_PACKETS}')

        # Detector Events
        # msg = f"Events : {event_time_window.qsize() / WINDOWSECONDS} cts/s"
        # try:
        #     earliest = event_time_window.queue[0]
        #     newest = max(event_time_window.queue)
        #     t = time.time()
        #     msg += f', buffer range: {round((newest - earliest) * 1e-7, 3)} s, oldest: {round(time.time() - ((time_offset + earliest) * 1e-7), 3)} s, newest: {round(time.time() - ((time_offset + newest) * 1e-7), 3)} s'
        # except:
        #     pass

        # print(msg)

        time.sleep(1)

threading.Thread(target=print_monitor_rates, daemon=True).start()

def clean_monitor_rates():
    latest = 0
    while True:
        for d_id in range(MONITORS):
            t_w = time_window[d_id]
            if not t_w.empty():
                # TODO probably should switch to a priority queue
                # as the messages might not be in order
                # TODO could also just replace with a low-pass filter,
                # which would be a lot more efficient
                # TODO the way this is done, we need trigger events
                # in order for the signal to decay back to 0.
                # If no events come, the rate remains stuck
                latest = max(latest, max(t_w.queue))
                # latest = time_window[1].queue[-1]
            try:
                # Drop timestamps that have fallen out of the window
                while t_w.queue[0] < (latest - WINDOWSECONDS * 1e7):
                    t_w.get_nowait()
            except IndexError:
                pass
        time.sleep(0.01)

threading.Thread(target=clean_monitor_rates, daemon=True).start()

# def clean_event_rates():
#     latest = 0
#     while True:
#         t_w = event_time_window
#         if not t_w.empty():
#             # TODO probably should switch to a priority queue
#             # as the messages might not be in order
#             # TODO could also just replace with a low-pass filter
#             # would be a lot more efficient
#             # TODO the way this is done, we need trigger events
#             # in order for the signal to decay back to 0.
#             # If no events come, the rate remains stuck
#             #latest = max(latest, max(t_w.queue))
#             try:
#                 latest = time_window[1].queue[-1]
#                 while t_w.queue[0] < (latest - WINDOWSECONDS * 1e7):
#                     t_w.get_nowait()
#             except IndexError:
#                 pass
#         time.sleep(0.005)
#
# threading.Thread(target=clean_event_rates, daemon=True).start()
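
# A minimal sketch of the low-pass filter mentioned in the TODOs above
# (hypothetical helper, not wired in anywhere), in the spirit of the
# commented-out filter further down in the main loop:
def ema_rate(prev_rate, dt_seconds, tau=1.0):
    # Blend the previous estimate with the instantaneous rate 1/dt,
    # weighting the old value by exp(-dt/tau)
    if dt_seconds <= 0:
        return prev_rate
    w = math.exp(-dt_seconds / tau)
    return w * prev_rate + (1 - w) * (1 / dt_seconds)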

# Event Kafka Producer

event_queue = queue.Queue()

def event_producer():
    producer_config = {
        'bootstrap.servers': "linkafka01:9092",
        # librdkafka expects an integer here, so avoid the float 1e7
        'queue.buffering.max.messages': 10_000_000,
    }
    prod = Producer(producer_config)

    st = time.time()

    msg_id = 0

    # Batch events locally so we don't produce one Kafka message per neutron
    b_size = 10000
    b_ptr = 0
    pixel_buffer = [0 for _ in range(b_size)]
    time_buffer = [0 for _ in range(b_size)]
    poll_cnt = 0

    while True:
        (p_id, timestamp) = event_queue.get()

        pixel_buffer[b_ptr] = p_id
        time_buffer[b_ptr] = timestamp
        b_ptr += 1

        nt = time.time()
        # Flush when the buffer is full or at least 1 ms has passed
        if b_ptr == b_size or nt - st > 0.001:
            st = nt

            if b_ptr > 0:
                message = streaming_data_types.serialise_ev42(
                    message_id = msg_id,
                    pulse_time = time_buffer[0] * 100,  # 100 ns ticks -> ns; alternatively int(time.time() * 1_000_000_000)
                    time_of_flight = time_buffer[0:b_ptr],
                    detector_id = pixel_buffer[0:b_ptr],
                    source_name = '',
                )

                msg_id = (msg_id + 1) % 100000000
                b_ptr = 0

                prod.produce(
                    topic = "DMC_detector",
                    value = message,
                    partition = 0,
                )

            # if poll_cnt % 1000 == 0:
            prod.poll(0)
            poll_cnt = (poll_cnt + 1) % 1000

threading.Thread(target=event_producer, daemon=True).start()
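
# A minimal sketch (not wired in) of how delivery failures could be surfaced;
# confluent_kafka's produce() accepts an on_delivery callback that is invoked
# from poll() with err set on failure:
#
# def on_delivery(err, msg):
#     if err is not None:
#         print(f'Delivery failed: {err}')
#
# prod.produce(topic="DMC_detector", value=message, partition=0, on_delivery=on_delivery)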

# Monitor Kafka Producer

monitor_queue = queue.Queue()

def monitor_producer():
    producer_config = {
        'bootstrap.servers': "linkafka01:9092",
        'queue.buffering.max.messages': 10_000_000,
    }
    prod = Producer(producer_config)

    # Counts accumulated per monitor since the last flush, and the timestamp
    # of the last event seen on each monitor
    monitor_buffer = [0 for i in range(MONITORS)]
    monitor_time = [0 for i in range(MONITORS)]

    st = time.time()

    poll_cnt = 0

    while True:
        (d_id, timestamp) = monitor_queue.get()

        monitor_buffer[d_id] += 1
        monitor_time[d_id] = timestamp

        nt = time.time()
        # Flush the accumulated counts roughly every 50 ms
        if nt - st > 0.05:
            st = nt

            for i in range(MONITORS):
                if monitor_buffer[i]:
                    message = streaming_data_types.serialise_f142(
                        source_name = f"monitor{i+1}",
                        value = monitor_buffer[i],
                        # ns resolution (supposed to be since the epoch, which is not what the detector returns though)
                        timestamp_unix_ns = monitor_time[i] * 100,  # time of the last monitor event
                    )

                    prod.produce(
                        topic = "DMC_neutron_monitor",
                        value = message,
                        partition = 0,
                    )

                    monitor_buffer[i] = 0

            if poll_cnt % 1000 == 0:
                prod.poll(0)
            poll_cnt = (poll_cnt + 1) % 1000

threading.Thread(target=monitor_producer, daemon=True).start()
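
# The two producers use different schemas: ev42 carries lists of detector
# events (time of flight + detector id), while f142 carries a single logged
# value, here the number of monitor counts accumulated in each flush interval.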

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((UDP_IP, UDP_PORT))

val = 0
start_time = time.time()

# Last seen buffer_number per MCPD module id, used to detect missed packets
module_counts = [0 for i in range(10)]

EVENTS = 0
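
# Missed packets are detected from gaps in the per-module buffer_number:
# e.g. if module 2 last sent buffer 41 and the next packet is buffer 43,
# the difference is 2 (not 1) and MISSED_PACKETS is incremented once,
# regardless of how many packets the gap actually spans.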

while True:
    data, addr = sock.recvfrom(2056)  # Buffer size in bytes
    raw_header = data[:42]
    raw_data = data[42:]

    # Header: 21 16-bit words; only the first nine are used here
    (buffer_length, buffer_type, header_length,
     buffer_number, run_id, mcpd_status,
     t_low, t_mid, t_high, *_) = memoryview(raw_header).cast('H')
    mcpd_id = ( mcpd_status >> 8 ) & 0xff
    mcpd_status = mcpd_status & 0x3
    running_msg = "running" if (mcpd_status & 0x1) else "stopped"
    sync_msg = "in sync" if (mcpd_status & 0x2) else "sync error"
    timestamp = ( t_high << 32 ) | ( t_mid << 16 ) | t_low  # 100 ns resolution
    # print(f'Packet {int(timestamp * 1e-7)}s => buffer: {buffer_number}, length: {int(buffer_length*2/6)} events, status: {mcpd_status} {mcpd_id} {running_msg} with {sync_msg}')
    # print(f'Packet => buffer: {mcpd_id}-{buffer_number}, length: {int((buffer_length-21)/3)} events, status: {mcpd_status}')

    if time_offset is None:
        time_offset = time.time() * 1e7 - timestamp

    if buffer_number - module_counts[mcpd_id] != 1:
        MISSED_PACKETS += 1
        # if missed_packets_time_window.full():
        #     missed_packets_time_window.get_nowait()
        # missed_packets_time_window.put(timestamp)

    module_counts[mcpd_id] = buffer_number
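
    # Each 6-byte event carries a 19-bit timestamp in 100 ns ticks, relative
    # to the 48-bit header timestamp assembled above. E.g. (hypothetical
    # values) t_high=0x0001, t_mid=0x0002, t_low=0x0003 gives
    # (0x0001 << 32) | (0x0002 << 16) | 0x0003 = 0x000100020003 ticks,
    # and an event offset of 0x7ffff ticks adds about 52 ms.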
    for i in range(0, len(raw_data), 6):
        event = memoryview(raw_data)[i:i+6]
        event_type = event[5] >> 7
        # print(event_type)

        if event_type:  # Trigger Event
            t_id = ( event[5] >> 4 ) & 0x7
            d_id = event[5] & 0xf
            # 19-bit event timestamp (100 ns ticks) added to the header timestamp
            event_timestamp = timestamp + ( ( ( event[2] & 0x7 ) << 16 ) | ( event[1] << 8 ) | event[0] )
            # print(f'Trigger event {event_timestamp * 1e-7}s => TrigID: {t_id}, DataID: {d_id}')

            t_w = time_window[d_id]
            t_w.put_nowait(event_timestamp)

            monitor_queue.put_nowait((d_id, event_timestamp))

        else:  # Neutron Event
            x_pixels = 128
            y_pixels = 128
            amplitude = ( event[5] << 1 ) | ( event[4] >> 7 )

            # The DMC StreamHistogrammer setup currently expects each module to
            # be 128 * 128 pixels but the resolution in the packages is
            # actually 10 bit. We remove the lowest 3 bits.
            x = (( (event[3] & 0x1f) << 5 | (event[2] & 0xf8) >> 3 ) & 0x3ff) >> 3
            y = (( (event[4] & 0x7f) << 3 | (event[3] & 0xe0) >> 5 ) & 0x3ff) >> 3
            event_timestamp = timestamp + ( ( ( event[2] & 0x7 ) << 16 ) | ( event[1] << 8 ) | event[0] )
            # print(f'Neutron event {event_timestamp * 1e-7}s: {amplitude}, x: {x}, y: {y}')

            if event_last_timestamp is None:
                event_last_timestamp = event_timestamp

            # At higher rates these seem to arrive very much out of order,
            # so this is only approximate
            event_time_window[EVENT_WINDOW_PTR] = event_timestamp - event_last_timestamp
            EVENT_WINDOW_PTR = (EVENT_WINDOW_PTR + 1) % EVENT_WINDOWSIZE
            event_last_timestamp = event_timestamp

            # I suppose this doesn't work, mostly due to the timestamp ordering...
            # event_timestamp_seconds = event_timestamp * 1e-7
            # if event_last_timestamp is None:
            #     event_last_timestamp = event_timestamp_seconds

            # f_cutoff = 1e6  # Hz
            # tau = 1 / (2 * math.pi * f_cutoff)
            # dt = event_timestamp_seconds - event_last_timestamp
            # if dt > 0:
            #     w = math.exp(-dt / tau)
            #     event_average_rate = w * dt + event_average_rate * (1 - w)
            # event_last_timestamp = event_timestamp_seconds

            # EVENTS += 1
            # a = (mcpd_id - 1) * x_pixels * y_pixels + x_pixels * x + y
            # print((EVENTS, x, y, a, a < 128 * 128 * 9, mcpd_id))
            # if not a < 128 * 128 * 9:
            #     print((event[3], event[3] << 5, event[2], event[2] >> 3))

            event_queue.put_nowait((
                (mcpd_id - 1) * x_pixels * y_pixels + x_pixels * x + y,
                event_timestamp
            ))
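
# Pixel ids are laid out module-major, then x-major: e.g. (hypothetical
# values) mcpd_id=1, x=5, y=7 maps to (1 - 1) * 128 * 128 + 128 * 5 + 7 = 647.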