Implement live Kafke event processing with Nicos start/stop

This commit is contained in:
2025-10-23 15:40:50 +02:00
parent c429429dd2
commit d722228079
4 changed files with 263 additions and 53 deletions
+2 -2
View File
@@ -41,7 +41,7 @@ class AmorHeader:
def __init__(self, fileName:Union[str, h5py.File, BinaryIO]):
if type(fileName) is str:
logging.warning(f' {fileName.split('/')[-1]}')
logging.warning(f' {fileName.split("/")[-1]}')
self.hdf = h5py.File(fileName, 'r', swmr=True)
elif type(fileName) is h5py.File:
self.hdf = fileName
@@ -223,7 +223,7 @@ class AmorEventData(AmorHeader):
def __init__(self, fileName:Union[str, h5py.File, BinaryIO], first_index:int=0, max_events:int=100_000_000):
if type(fileName) is str:
logging.warning(f' {fileName.split('/')[-1]}')
logging.warning(f' {fileName.split("/")[-1]}')
self.file_list = [fileName]
hdf = h5py.File(fileName, 'r', swmr=True)
elif type(fileName) is h5py.File:
+165
View File
@@ -0,0 +1,165 @@
"""
Collect AMOR detector events send via Kafka.
"""
import logging
import numpy as np
from threading import Thread, Event
from time import time
from .event_data_types import AmorGeometry, AmorTiming, AmorEventStream, PACKET_TYPE, EVENT_TYPE, PULSE_TYPE, PC_TYPE
from uuid import uuid4
from streaming_data_types.eventdata_ev44 import EventData
from streaming_data_types.logdata_f144 import ExtractedLogData
from streaming_data_types import deserialise_f144, deserialise_ev44
from confluent_kafka import Consumer
from .header import Header
try:
from streaming_data_types.utils import get_schema
except ImportError:
from streaming_data_types.utils import _get_schema as get_schema
KAFKA_BROKER = 'linkafka01.psi.ch:9092'
AMOR_EVENTS = 'amor_ev44'
AMOR_NICOS = 'AMOR_nicosForwarder'
class KafkaFrozenData:
"""
Represents event stream data from Kafka at a given time.
Will be returned by KafkaEventData to be use in conjunction
with data processing and projections.
Implements EventDatasetProtocol
"""
geometry: AmorGeometry
timing: AmorTiming
data: AmorEventStream
def __init__(self, geometry, timing, data, monitor=1.):
self.geometry = geometry
self.timing = timing
self.data = data
self.monitor = monitor
def append(self, other):
raise NotImplementedError("can't append live datastream to other event data")
def update_header(self, header:Header):
# maybe makes sense later, but for now just used for live vizualization
...
class KafkaEventData(Thread):
"""
Read Nicos information and events from Kafka. Creates a background
thread that listens to Kafka events and converts them to eos compatible information.
"""
geometry: AmorGeometry
timing: AmorTiming
events: np.recarray
def __init__(self):
self.stop_event = Event()
self.stop_counting = Event()
self.new_events = Event()
self.last_read = 0
self.last_read_time = 0.
self.start_time = time()
self.consumer = Consumer(
{'bootstrap.servers': 'linkafka01.psi.ch:9092',
'group.id': uuid4()})
self.consumer.subscribe([AMOR_EVENTS, AMOR_NICOS])
self.geometry = AmorGeometry(1.0, 2.0, 0., 0., 1.5, 10.0, 4.0, 10.0)
self.timing = AmorTiming(0., 0., 500., 0., 30./500.)
# create empty dataset
self.events = np.recarray(0, dtype=EVENT_TYPE)
super().__init__()
def run(self):
while not self.stop_event.is_set():
messages = self.consumer.consume(10, timeout=1)
for message in messages:
self.process_message(message)
def process_message(self, message):
if message.error():
logging.info(f" received Kafka message with error: {message.error()}")
return
schema = get_schema(message.value())
if message.topic()==AMOR_EVENTS and schema=='ev44':
events:EventData = deserialise_ev44(message.value())
self.add_events(events)
self.new_events.set()
logging.debug(f' new events {events}')
elif message.topic()==AMOR_NICOS and schema=='f144':
nicos_data:ExtractedLogData = deserialise_f144(message.value())
if nicos_data.source_name in self.nicos_mapping.keys():
logging.debug(f' {nicos_data.source_name} = {nicos_data.value}')
self.update_instrument(nicos_data)
def add_events(self, events:EventData):
"""
Add new events to the Dataset. The object keeps raw events
and only copies the latest set to the self.data object,
this allows to run the event processing to be performed on a "clean"
evnet stream each time.
"""
if self.stop_counting.is_set():
return
prev_size = self.events.shape[0]
new_events = events.pixel_id.shape[0]
self.events.resize(prev_size+new_events, refcheck=False)
self.events.pixelID[prev_size:] = events.pixel_id
self.events.mask[prev_size:] = 0
self.events.tof[prev_size:] = events.time_of_flight
nicos_mapping = {
'mu': ('geometry', 'mu'),
'nu': ('geometry', 'nu'),
'kappa': ('geometry', 'kap'),
'kappa_offset': ('geometry', 'kad'),
'ch1_trigger_phase': ('timing', 'ch1TriggerPhase'),
'ch2_trigger_phase': ('timing', 'ch2TriggerPhase'),
'ch2_speed': ('timing', 'chopperSpeed'),
'chopper_phase': ('timing', 'chopperPhase'),
}
def update_instrument(self, nicos_data:ExtractedLogData):
if nicos_data.source_name in self.nicos_mapping:
attr, subattr = self.nicos_mapping[nicos_data.source_name]
setattr(getattr(self, attr), subattr, nicos_data.value)
if nicos_data.source_name=='ch2_speed':
self.timing.tau = 30./self.timing.chopperSpeed
def monitor(self):
return time()-self.start_time
def restart(self):
# empty event buffer
self.events = np.recarray(0, dtype=EVENT_TYPE)
self.stop_counting.clear()
self.last_read = 0
self.start_time = time()
self.new_events.clear()
def get_events(self, total_counts=False):
packets = np.recarray(0, dtype=PACKET_TYPE)
pulses = np.recarray(0, dtype=PULSE_TYPE)
pc = np.recarray(0, dtype=PC_TYPE)
if total_counts:
last_read = 0
else:
last_read = self.last_read
if last_read>=self.events.shape[0]:
raise EOFError("No new events arrived")
data = AmorEventStream(self.events[last_read:].copy(), packets, pulses, pc)
self.last_read = self.events.shape[0]
self.new_events.clear()
t_now = time()
monitor = t_now-self.last_read_time
self.last_read_time = t_now
return KafkaFrozenData(self.geometry, self.timing, data, monitor=monitor)
+70 -7
View File
@@ -29,13 +29,14 @@ hist = {
"""
import logging
from typing import List, Tuple, Union
from threading import Thread, Event
import numpy as np
import json
from time import time
from dataclasses import dataclass, asdict
from streaming_data_types import histogram_hs01
from confluent_kafka import Producer, Consumer, TopicPartition
from confluent_kafka import Producer, Consumer
from uuid import uuid4
@@ -134,6 +135,8 @@ class ESSSerializer:
self._active_histogram_tofz = None
self._last_message_yz = None
self._last_message_tofz = None
self.new_count_started = Event()
self.count_stopped = Event()
self.consumer.subscribe([KAFKA_TOPICS['command']])
@@ -147,7 +150,7 @@ class ESSSerializer:
except Exception:
logging.error(f'Could not interpret message: \n{command}', exc_info=True)
return
logging.warning(command)
logging.info(command)
resp = json.dumps({
"msg_id": getattr(command, "id", None) or command.msg_id,
"response": "ACK",
@@ -170,6 +173,7 @@ class ESSSerializer:
"num events": message.data.sum()
})
self._last_message_yz = None
self.count_stopped.set()
elif command.hist_id == self._active_histogram_tofz:
suffix = 'TofZ'
self._active_histogram_tofz = None
@@ -192,9 +196,12 @@ class ESSSerializer:
if hist.topic == KAFKA_TOPICS['histogram']+'_YZ':
self._active_histogram_yz = hist.id
logging.debug(f" histogram data_topic: {hist.data_topics}")
self._start = command.start
self.count_stopped.clear()
self.new_count_started.set()
self.set_empty_messages()
if hist.topic == KAFKA_TOPICS['histogram']+'_TofZ':
self._active_histogram_tofz = hist.id
self._start = command.start
def receive(self, timeout=5):
rec = self.consumer.poll(timeout)
@@ -205,20 +212,19 @@ class ESSSerializer:
return False
def receive_loop(self):
while self._keep_receiving:
while not self._stop_receiving.is_set():
try:
self.receive()
except Exception:
logging.error("Exception while receiving", exc_info=True)
def start_command_thread(self):
from threading import Thread
self._keep_receiving = True
self._stop_receiving = Event()
self._command_thread = Thread(target=self.receive_loop)
self._command_thread.start()
def end_command_thread(self, event=None):
self._keep_receiving = False
self._stop_receiving.set()
self._command_thread.join()
def acked(self, err, msg):
@@ -261,6 +267,7 @@ class ESSSerializer:
})
)
self._last_message_yz = message
logging.info(f" Sending {proj.data.cts.sum()} events to Nicos")
elif isinstance(proj, TofZProjection):
if self._active_histogram_tofz is None:
return
@@ -300,3 +307,59 @@ class ESSSerializer:
topic=KAFKA_TOPICS['histogram']+'_'+suffix,
callback=self.acked)
self.producer.flush()
def set_empty_messages(self):
self._last_message_yz = HistogramMessage(
source='amor-eos',
timestamp=ktime(),
current_shape=(64, 448),
dim_metadata=(
DimMetadata(
length=64,
unit="pixel",
label="Y",
bin_boundaries=np.arange(64),
),
DimMetadata(
length=448,
unit="pixel",
label="Z",
bin_boundaries=np.arange(448),
)
),
last_metadata_timestamp=0,
data=np.zeros((64, 448)),
errors=np.zeros((64, 448)),
info=json.dumps({
"start": self._start,
"state": 'COUNTING',
"num events": 0
})
)
self._last_message_tofz = message = HistogramMessage(
source='amor-eos',
timestamp=ktime(),
current_shape=(56, 448),
dim_metadata=(
DimMetadata(
length=56,
unit="ms",
label="ToF",
bin_boundaries=np.arange(56),
),
DimMetadata(
length=448,
unit="pixel",
label="Z",
bin_boundaries=np.arange(448),
),
),
last_metadata_timestamp=0,
data=np.zeros((56, 448)),
errors=np.zeros((56, 448)),
info=json.dumps({
"start": self._start,
"state": 'COUNTING',
"num events": 0
})
)
+26 -44
View File
@@ -7,11 +7,10 @@ import logging
import os
from time import sleep
from .file_reader import AmorEventData, AmorHeader
from .kafka_events import KafkaEventData
from .header import Header
from .options import E2HConfig
from . import event_handling as eh, event_analysis as ea
from .path_handling import PathResolver
from .projection import TofZProjection, YZProjection
from .kafka_serializer import ESSSerializer
@@ -29,6 +28,8 @@ class KafkaReduction:
self.config = config
self.header = Header()
self.event_data = KafkaEventData()
self.event_data.start()
self.prepare_actions()
@@ -36,9 +37,6 @@ class KafkaReduction:
"""
Does not do any actual reduction.
"""
self.path_resolver = PathResolver(self.config.reader.year, self.config.reader.rawPath)
self.current_file = self.path_resolver.resolve('0')[0]
# Actions on datasets not used for normalization
self.event_actions = eh.ApplyPhaseOffset(self.config.experiment.chopperPhaseOffset)
self.event_actions |= eh.CorrectChopperPhase()
@@ -56,70 +54,54 @@ class KafkaReduction:
self.loop()
def create_projections(self):
file_header = AmorHeader(self.current_file)
self.proj_yz = YZProjection()
self.proj_tofz = TofZProjection(file_header.timing.tau, foldback=True, combine=2)
self.proj_tofz = TofZProjection(self.event_data.timing.tau, foldback=True, combine=2)
def read_data(self):
self.dataset = AmorEventData(self.current_file, max_events=self.config.reduction.max_events)
# make sure the first events have arrived before starting analysis
self.event_data.new_events.wait()
self.dataset = self.event_data.get_events()
self.event_actions(self.dataset)
def add_data(self):
self.monitor = self.dataset.data.pulses.monitor.sum()
self.monitor = self.dataset.monitor
self.proj_yz.project(self.dataset, monitor=self.monitor)
self.proj_tofz.project(self.dataset, monitor=self.monitor)
def replace_dataset(self, latest):
new_file = self.path_resolver.resolve('0')[0]
if not os.path.exists(new_file):
return
try:
# check that events exist in the new file
AmorEventData(new_file, 0, max_events=1000)
except Exception:
logging.debug("Problem when trying to load new dataset", exc_info=True)
return
logging.warning(f"Preceding to next file {latest}")
self.current_file = new_file
self.create_projections() # need to recreate projections, in case tau changed
self.read_data()
self.add_data()
def loop(self):
self.wait_for = self.serializer.new_count_started
while True:
try:
self.update()
sleep(1.0)
self.wait_for.wait(1.0)
except KeyboardInterrupt:
self.event_data.stop_event.set()
self.event_data.join()
self.serializer.end_command_thread()
return
def update(self):
logging.debug(" check for update")
if self.config.reduction.fileIdentifier=='0':
# if latest file was choosen, check if new one available and switch to it
current = int(os.path.basename(self.current_file)[9:15])
latest = self.path_resolver.search_latest(0)
if latest>current:
self.replace_dataset(latest)
return
# if all events were read last time, only load more if file was modified
if self.dataset.EOF and os.path.getmtime(self.current_file)<=self._last_mtime:
return
self._last_mtime = os.path.getmtime(self.current_file)
if self.serializer.new_count_started.is_set():
logging.warning('Start new count, clearing event data')
self.wait_for = self.serializer.count_stopped
self.event_data.restart()
self.serializer.new_count_started.clear()
self.proj_yz.clear()
self.proj_tofz.clear()
elif self.serializer.count_stopped.is_set() and not self.event_data.stop_counting.is_set():
logging.warning(f' stop counting, total events {int(self.proj_tofz.data.cts.sum())}')
self.wait_for = self.serializer.new_count_started
self.event_data.stop_counting.set()
try:
update_data = AmorEventData(self.current_file, self.dataset.last_index+1,
max_events=self.config.reduction.max_events)
update_data = self.event_data.get_events()
except EOFError:
return
logging.info(" updating with new data")
self.event_actions(update_data)
self.dataset=update_data
self.monitor = self.dataset.data.pulses.monitor.sum()
self.monitor = self.dataset.monitor
self.proj_yz.project(update_data, self.monitor)
self.proj_tofz.project(update_data, self.monitor)