some more rate calculation
This commit is contained in:
118
udp_rate.py
118
udp_rate.py
@@ -3,6 +3,7 @@ import socket
|
||||
import time
|
||||
import threading
|
||||
from uuid import uuid4
|
||||
import math
|
||||
|
||||
from confluent_kafka import Producer
|
||||
import streaming_data_types
|
||||
@@ -27,10 +28,22 @@ time_window = {
|
||||
for i in range(MONITORS)
|
||||
}
|
||||
|
||||
# event_time_window = queue.Queue(maxsize=50000 * WINDOWSECONDS)
|
||||
EVENT_WINDOWSIZE = 50000
|
||||
EVENT_WINDOW_PTR = 0
|
||||
event_time_window = [0 for i in range(EVENT_WINDOWSIZE)]
|
||||
|
||||
event_average_rate = 0
|
||||
event_last_timestamp = None
|
||||
|
||||
MISSED_PACKETS = -9 # All modules appear to miss the first time due to initialisation as 0
|
||||
|
||||
# missed_packets_time_window = queue.Queue(maxsize=100)
|
||||
|
||||
def print_monitor_rates():
|
||||
while True:
|
||||
for i in range(MONITORS):
|
||||
msg = f"Monitor {i} : {time_window[i].qsize() / WINDOWSECONDS} cts/s"
|
||||
msg = f"Monitor {i}: {time_window[i].qsize() / WINDOWSECONDS} cts/s"
|
||||
try:
|
||||
earliest = time_window[i].queue[0]
|
||||
newest = max(time_window[i].queue)
|
||||
@@ -40,6 +53,31 @@ def print_monitor_rates():
|
||||
pass
|
||||
|
||||
print(msg)
|
||||
|
||||
# try:
|
||||
# print(f'Events: {1 / event_average_rate} cts/s')
|
||||
# except:
|
||||
# pass
|
||||
|
||||
try:
|
||||
print(f'Events: {round(1 / (sum(event_time_window) / EVENT_WINDOWSIZE * 1e-7), 2)} cts/s')
|
||||
except:
|
||||
pass
|
||||
|
||||
print(f'Missed Packets: {MISSED_PACKETS}')
|
||||
|
||||
# Detector Events
|
||||
# msg = f"Events : {event_time_window.qsize() / WINDOWSECONDS} cts/s"
|
||||
# try:
|
||||
# earliest = event_time_window.queue[0]
|
||||
# newest = max(event_time_window.queue)
|
||||
# t = time.time()
|
||||
# msg += f', buffer range: {round((newest - earliest) * 1e-7, 3)} s, oldest: {round(time.time() - ((time_offset + earliest) * 1e-7), 3)} s, newest: {round(time.time() - ((time_offset + newest) * 1e-7), 3)} s'
|
||||
# except:
|
||||
# pass
|
||||
|
||||
# print(msg)
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
threading.Thread(target=print_monitor_rates, daemon=True).start()
|
||||
@@ -51,6 +89,9 @@ def clean_monitor_rates():
|
||||
t_w = time_window[d_id]
|
||||
if not t_w.empty():
|
||||
# TODO probably should switch to a priority queue
|
||||
# as the messages might not be in order
|
||||
# TODO could also just replace with a low-pass filter
|
||||
# would be a lot more efficient
|
||||
# TODO the way this is done, we need trigger events
|
||||
# in order for the signal to decay back to 0.
|
||||
# If no events come, the rate remains stuck
|
||||
@@ -61,10 +102,37 @@ def clean_monitor_rates():
|
||||
t_w.get_nowait()
|
||||
except IndexError:
|
||||
pass
|
||||
time.sleep(0.005)
|
||||
time.sleep(0.01)
|
||||
|
||||
threading.Thread(target=clean_monitor_rates, daemon=True).start()
|
||||
|
||||
|
||||
# def clean_event_rates():
|
||||
# latest = 0
|
||||
# while True:
|
||||
# t_w = event_time_window
|
||||
# if not t_w.empty():
|
||||
# # TODO probably should switch to a priority queue
|
||||
# # as the messages might not be in order
|
||||
# # TODO could also just replace with a low-pass filter
|
||||
# # would be a lot more efficient
|
||||
# # TODO the way this is done, we need trigger events
|
||||
# # in order for the signal to decay back to 0.
|
||||
# # If no events come, the rate remains stuck
|
||||
# #latest = max(latest, max(t_w.queue))
|
||||
# try:
|
||||
# latest = time_window[1].queue[-1]
|
||||
# while t_w.queue[0] < (latest - WINDOWSECONDS * 1e7):
|
||||
# t_w.get_nowait()
|
||||
# except IndexError:
|
||||
# pass
|
||||
# time.sleep(0.005)
|
||||
#
|
||||
# threading.Thread(target=clean_event_rates, daemon=True).start()
|
||||
|
||||
|
||||
|
||||
|
||||
# Event Kafka Producer
|
||||
|
||||
event_queue = queue.Queue()
|
||||
@@ -72,6 +140,7 @@ event_queue = queue.Queue()
|
||||
def event_producer():
|
||||
producer_config = {
|
||||
'bootstrap.servers': "linkafka01:9092",
|
||||
'queue.buffering.max.messages': 1e7,
|
||||
}
|
||||
prod = Producer(producer_config)
|
||||
|
||||
@@ -79,10 +148,11 @@ def event_producer():
|
||||
|
||||
msg_id = 0
|
||||
|
||||
b_size = 1000
|
||||
b_size = 10000
|
||||
b_ptr = 0
|
||||
pixel_buffer = [0 for _ in range(b_size)]
|
||||
time_buffer = [0 for _ in range(b_size)]
|
||||
poll_cnt = 0
|
||||
|
||||
while True:
|
||||
(p_id, timestamp) = event_queue.get()
|
||||
@@ -98,7 +168,7 @@ def event_producer():
|
||||
if b_ptr > 0:
|
||||
message = streaming_data_types.serialise_ev42(
|
||||
message_id = msg_id,
|
||||
pulse_time = int(time.time() * 1_000_000_000),
|
||||
pulse_time = time_buffer[0] * 100, # int(time.time() * 1_000_000_000),
|
||||
time_of_flight = time_buffer[0:b_ptr],
|
||||
detector_id = pixel_buffer[0:b_ptr],
|
||||
source_name = '',
|
||||
@@ -113,6 +183,10 @@ def event_producer():
|
||||
partition = 0,
|
||||
)
|
||||
|
||||
if poll_cnt % 1000 == 0:
|
||||
prod.poll(0)
|
||||
poll_cnt = (poll_cnt + 1) % 1000
|
||||
|
||||
threading.Thread(target=event_producer, daemon=True).start()
|
||||
|
||||
# Monitor Kafka Producer
|
||||
@@ -122,6 +196,7 @@ monitor_queue = queue.Queue()
|
||||
def monitor_producer():
|
||||
producer_config = {
|
||||
'bootstrap.servers': "linkafka01:9092",
|
||||
'queue.buffering.max.messages': 1e7,
|
||||
}
|
||||
prod = Producer(producer_config)
|
||||
|
||||
@@ -130,6 +205,8 @@ def monitor_producer():
|
||||
|
||||
st = time.time()
|
||||
|
||||
poll_cnt = 0
|
||||
|
||||
while True:
|
||||
(d_id, timestamp) = monitor_queue.get()
|
||||
|
||||
@@ -157,6 +234,10 @@ def monitor_producer():
|
||||
|
||||
monitor_buffer[d_id] = 0
|
||||
|
||||
if poll_cnt % 1000 == 0:
|
||||
prod.poll(0)
|
||||
poll_cnt = (poll_cnt + 1) % 1000
|
||||
|
||||
threading.Thread(target=monitor_producer, daemon=True).start()
|
||||
|
||||
|
||||
@@ -188,7 +269,11 @@ while True:
|
||||
time_offset = time.time() * 1e7 - timestamp
|
||||
|
||||
if buffer_number - module_counts[mcpd_id] != 1:
|
||||
print("Missed Packet!!!")
|
||||
MISSED_PACKETS += 1
|
||||
# if missed_packets_time_window.full():
|
||||
# missed_packets_time_window.get_nowait()
|
||||
# missed_packets_time_window.put(timestamp)
|
||||
|
||||
module_counts[mcpd_id] = buffer_number
|
||||
|
||||
for i in range(0, len(raw_data), 6):
|
||||
@@ -216,6 +301,29 @@ while True:
|
||||
event_timestamp = timestamp + ( ( event[2] << 16 ) & 0x7 ) | ( event[1] << 8 ) | event[0]
|
||||
# print(f'Neutron event {event_timestamp * 1e-7}s: {amplitude}, x: {x}, y: {y}')
|
||||
|
||||
|
||||
if event_last_timestamp is None:
|
||||
event_last_timestamp = event_timestamp
|
||||
|
||||
# Seems like at higher frequencies these come very much out of order
|
||||
# so this is very approximate
|
||||
event_time_window[EVENT_WINDOW_PTR] = event_timestamp - event_last_timestamp
|
||||
EVENT_WINDOW_PTR = (EVENT_WINDOW_PTR + 1) % EVENT_WINDOWSIZE
|
||||
event_last_timestamp = event_timestamp
|
||||
|
||||
# I suppose this doesn't work mostly due to the timestamps ordering...
|
||||
# event_timestamp_seconds = event_timestamp * 1e-7
|
||||
# if event_last_timestamp is None:
|
||||
# event_last_timestamp = event_timestamp_seconds
|
||||
|
||||
# f_cutoff = 1e6 # Hz
|
||||
# tau = 1 / ( 2 * math.pi * f_cutoff)
|
||||
# dt = event_timestamp_seconds - event_last_timestamp
|
||||
# if dt > 0:
|
||||
# w = math.exp(-dt / tau)
|
||||
# event_average_rate = w * dt + event_average_rate * (1 - w)
|
||||
# event_last_timestamp = event_timestamp_seconds
|
||||
|
||||
event_queue.put_nowait((
|
||||
(mcpd_id-1) * x_pixels * y_pixels + x_pixels * x + y,
|
||||
event_timestamp
|
||||
|
||||
Reference in New Issue
Block a user