From d722228079351b796f01e3497a2226c664ae10e9 Mon Sep 17 00:00:00 2001 From: Artur Glavic Date: Thu, 23 Oct 2025 15:40:50 +0200 Subject: [PATCH] Implement live Kafke event processing with Nicos start/stop --- eos/file_reader.py | 4 +- eos/kafka_events.py | 165 ++++++++++++++++++++++++++++++++++++++++ eos/kafka_serializer.py | 77 +++++++++++++++++-- eos/reduction_kafka.py | 70 +++++++---------- 4 files changed, 263 insertions(+), 53 deletions(-) create mode 100644 eos/kafka_events.py diff --git a/eos/file_reader.py b/eos/file_reader.py index c115b78..53b57ec 100644 --- a/eos/file_reader.py +++ b/eos/file_reader.py @@ -41,7 +41,7 @@ class AmorHeader: def __init__(self, fileName:Union[str, h5py.File, BinaryIO]): if type(fileName) is str: - logging.warning(f' {fileName.split('/')[-1]}') + logging.warning(f' {fileName.split("/")[-1]}') self.hdf = h5py.File(fileName, 'r', swmr=True) elif type(fileName) is h5py.File: self.hdf = fileName @@ -223,7 +223,7 @@ class AmorEventData(AmorHeader): def __init__(self, fileName:Union[str, h5py.File, BinaryIO], first_index:int=0, max_events:int=100_000_000): if type(fileName) is str: - logging.warning(f' {fileName.split('/')[-1]}') + logging.warning(f' {fileName.split("/")[-1]}') self.file_list = [fileName] hdf = h5py.File(fileName, 'r', swmr=True) elif type(fileName) is h5py.File: diff --git a/eos/kafka_events.py b/eos/kafka_events.py new file mode 100644 index 0000000..824deb8 --- /dev/null +++ b/eos/kafka_events.py @@ -0,0 +1,165 @@ +""" +Collect AMOR detector events send via Kafka. +""" + +import logging +import numpy as np +from threading import Thread, Event +from time import time + +from .event_data_types import AmorGeometry, AmorTiming, AmorEventStream, PACKET_TYPE, EVENT_TYPE, PULSE_TYPE, PC_TYPE + +from uuid import uuid4 +from streaming_data_types.eventdata_ev44 import EventData +from streaming_data_types.logdata_f144 import ExtractedLogData +from streaming_data_types import deserialise_f144, deserialise_ev44 +from confluent_kafka import Consumer + +from .header import Header + + +try: + from streaming_data_types.utils import get_schema +except ImportError: + from streaming_data_types.utils import _get_schema as get_schema + + +KAFKA_BROKER = 'linkafka01.psi.ch:9092' +AMOR_EVENTS = 'amor_ev44' +AMOR_NICOS = 'AMOR_nicosForwarder' + +class KafkaFrozenData: + """ + Represents event stream data from Kafka at a given time. + Will be returned by KafkaEventData to be use in conjunction + with data processing and projections. + + Implements EventDatasetProtocol + """ + geometry: AmorGeometry + timing: AmorTiming + data: AmorEventStream + + def __init__(self, geometry, timing, data, monitor=1.): + self.geometry = geometry + self.timing = timing + self.data = data + self.monitor = monitor + + def append(self, other): + raise NotImplementedError("can't append live datastream to other event data") + + def update_header(self, header:Header): + # maybe makes sense later, but for now just used for live vizualization + ... + +class KafkaEventData(Thread): + """ + Read Nicos information and events from Kafka. Creates a background + thread that listens to Kafka events and converts them to eos compatible information. + """ + geometry: AmorGeometry + timing: AmorTiming + events: np.recarray + + def __init__(self): + self.stop_event = Event() + self.stop_counting = Event() + self.new_events = Event() + self.last_read = 0 + self.last_read_time = 0. + self.start_time = time() + self.consumer = Consumer( + {'bootstrap.servers': 'linkafka01.psi.ch:9092', + 'group.id': uuid4()}) + self.consumer.subscribe([AMOR_EVENTS, AMOR_NICOS]) + self.geometry = AmorGeometry(1.0, 2.0, 0., 0., 1.5, 10.0, 4.0, 10.0) + self.timing = AmorTiming(0., 0., 500., 0., 30./500.) + # create empty dataset + self.events = np.recarray(0, dtype=EVENT_TYPE) + super().__init__() + + def run(self): + while not self.stop_event.is_set(): + messages = self.consumer.consume(10, timeout=1) + for message in messages: + self.process_message(message) + + def process_message(self, message): + if message.error(): + logging.info(f" received Kafka message with error: {message.error()}") + return + schema = get_schema(message.value()) + if message.topic()==AMOR_EVENTS and schema=='ev44': + events:EventData = deserialise_ev44(message.value()) + self.add_events(events) + self.new_events.set() + logging.debug(f' new events {events}') + elif message.topic()==AMOR_NICOS and schema=='f144': + nicos_data:ExtractedLogData = deserialise_f144(message.value()) + if nicos_data.source_name in self.nicos_mapping.keys(): + logging.debug(f' {nicos_data.source_name} = {nicos_data.value}') + self.update_instrument(nicos_data) + + def add_events(self, events:EventData): + """ + Add new events to the Dataset. The object keeps raw events + and only copies the latest set to the self.data object, + this allows to run the event processing to be performed on a "clean" + evnet stream each time. + """ + if self.stop_counting.is_set(): + return + prev_size = self.events.shape[0] + new_events = events.pixel_id.shape[0] + self.events.resize(prev_size+new_events, refcheck=False) + self.events.pixelID[prev_size:] = events.pixel_id + self.events.mask[prev_size:] = 0 + self.events.tof[prev_size:] = events.time_of_flight + + nicos_mapping = { + 'mu': ('geometry', 'mu'), + 'nu': ('geometry', 'nu'), + 'kappa': ('geometry', 'kap'), + 'kappa_offset': ('geometry', 'kad'), + 'ch1_trigger_phase': ('timing', 'ch1TriggerPhase'), + 'ch2_trigger_phase': ('timing', 'ch2TriggerPhase'), + 'ch2_speed': ('timing', 'chopperSpeed'), + 'chopper_phase': ('timing', 'chopperPhase'), + } + + def update_instrument(self, nicos_data:ExtractedLogData): + if nicos_data.source_name in self.nicos_mapping: + attr, subattr = self.nicos_mapping[nicos_data.source_name] + setattr(getattr(self, attr), subattr, nicos_data.value) + if nicos_data.source_name=='ch2_speed': + self.timing.tau = 30./self.timing.chopperSpeed + + def monitor(self): + return time()-self.start_time + + def restart(self): + # empty event buffer + self.events = np.recarray(0, dtype=EVENT_TYPE) + self.stop_counting.clear() + self.last_read = 0 + self.start_time = time() + self.new_events.clear() + + def get_events(self, total_counts=False): + packets = np.recarray(0, dtype=PACKET_TYPE) + pulses = np.recarray(0, dtype=PULSE_TYPE) + pc = np.recarray(0, dtype=PC_TYPE) + if total_counts: + last_read = 0 + else: + last_read = self.last_read + if last_read>=self.events.shape[0]: + raise EOFError("No new events arrived") + data = AmorEventStream(self.events[last_read:].copy(), packets, pulses, pc) + self.last_read = self.events.shape[0] + self.new_events.clear() + t_now = time() + monitor = t_now-self.last_read_time + self.last_read_time = t_now + return KafkaFrozenData(self.geometry, self.timing, data, monitor=monitor) diff --git a/eos/kafka_serializer.py b/eos/kafka_serializer.py index d76a5a6..a8c7c2c 100644 --- a/eos/kafka_serializer.py +++ b/eos/kafka_serializer.py @@ -29,13 +29,14 @@ hist = { """ import logging from typing import List, Tuple, Union +from threading import Thread, Event import numpy as np import json from time import time from dataclasses import dataclass, asdict from streaming_data_types import histogram_hs01 -from confluent_kafka import Producer, Consumer, TopicPartition +from confluent_kafka import Producer, Consumer from uuid import uuid4 @@ -134,6 +135,8 @@ class ESSSerializer: self._active_histogram_tofz = None self._last_message_yz = None self._last_message_tofz = None + self.new_count_started = Event() + self.count_stopped = Event() self.consumer.subscribe([KAFKA_TOPICS['command']]) @@ -147,7 +150,7 @@ class ESSSerializer: except Exception: logging.error(f'Could not interpret message: \n{command}', exc_info=True) return - logging.warning(command) + logging.info(command) resp = json.dumps({ "msg_id": getattr(command, "id", None) or command.msg_id, "response": "ACK", @@ -170,6 +173,7 @@ class ESSSerializer: "num events": message.data.sum() }) self._last_message_yz = None + self.count_stopped.set() elif command.hist_id == self._active_histogram_tofz: suffix = 'TofZ' self._active_histogram_tofz = None @@ -192,9 +196,12 @@ class ESSSerializer: if hist.topic == KAFKA_TOPICS['histogram']+'_YZ': self._active_histogram_yz = hist.id logging.debug(f" histogram data_topic: {hist.data_topics}") + self._start = command.start + self.count_stopped.clear() + self.new_count_started.set() + self.set_empty_messages() if hist.topic == KAFKA_TOPICS['histogram']+'_TofZ': self._active_histogram_tofz = hist.id - self._start = command.start def receive(self, timeout=5): rec = self.consumer.poll(timeout) @@ -205,20 +212,19 @@ class ESSSerializer: return False def receive_loop(self): - while self._keep_receiving: + while not self._stop_receiving.is_set(): try: self.receive() except Exception: logging.error("Exception while receiving", exc_info=True) def start_command_thread(self): - from threading import Thread - self._keep_receiving = True + self._stop_receiving = Event() self._command_thread = Thread(target=self.receive_loop) self._command_thread.start() def end_command_thread(self, event=None): - self._keep_receiving = False + self._stop_receiving.set() self._command_thread.join() def acked(self, err, msg): @@ -261,6 +267,7 @@ class ESSSerializer: }) ) self._last_message_yz = message + logging.info(f" Sending {proj.data.cts.sum()} events to Nicos") elif isinstance(proj, TofZProjection): if self._active_histogram_tofz is None: return @@ -300,3 +307,59 @@ class ESSSerializer: topic=KAFKA_TOPICS['histogram']+'_'+suffix, callback=self.acked) self.producer.flush() + + def set_empty_messages(self): + self._last_message_yz = HistogramMessage( + source='amor-eos', + timestamp=ktime(), + current_shape=(64, 448), + dim_metadata=( + DimMetadata( + length=64, + unit="pixel", + label="Y", + bin_boundaries=np.arange(64), + ), + DimMetadata( + length=448, + unit="pixel", + label="Z", + bin_boundaries=np.arange(448), + ) + ), + last_metadata_timestamp=0, + data=np.zeros((64, 448)), + errors=np.zeros((64, 448)), + info=json.dumps({ + "start": self._start, + "state": 'COUNTING', + "num events": 0 + }) + ) + self._last_message_tofz = message = HistogramMessage( + source='amor-eos', + timestamp=ktime(), + current_shape=(56, 448), + dim_metadata=( + DimMetadata( + length=56, + unit="ms", + label="ToF", + bin_boundaries=np.arange(56), + ), + DimMetadata( + length=448, + unit="pixel", + label="Z", + bin_boundaries=np.arange(448), + ), + ), + last_metadata_timestamp=0, + data=np.zeros((56, 448)), + errors=np.zeros((56, 448)), + info=json.dumps({ + "start": self._start, + "state": 'COUNTING', + "num events": 0 + }) + ) diff --git a/eos/reduction_kafka.py b/eos/reduction_kafka.py index 3fd035e..34589c3 100644 --- a/eos/reduction_kafka.py +++ b/eos/reduction_kafka.py @@ -7,11 +7,10 @@ import logging import os from time import sleep -from .file_reader import AmorEventData, AmorHeader +from .kafka_events import KafkaEventData from .header import Header from .options import E2HConfig from . import event_handling as eh, event_analysis as ea -from .path_handling import PathResolver from .projection import TofZProjection, YZProjection from .kafka_serializer import ESSSerializer @@ -29,6 +28,8 @@ class KafkaReduction: self.config = config self.header = Header() + self.event_data = KafkaEventData() + self.event_data.start() self.prepare_actions() @@ -36,9 +37,6 @@ class KafkaReduction: """ Does not do any actual reduction. """ - self.path_resolver = PathResolver(self.config.reader.year, self.config.reader.rawPath) - self.current_file = self.path_resolver.resolve('0')[0] - # Actions on datasets not used for normalization self.event_actions = eh.ApplyPhaseOffset(self.config.experiment.chopperPhaseOffset) self.event_actions |= eh.CorrectChopperPhase() @@ -56,70 +54,54 @@ class KafkaReduction: self.loop() def create_projections(self): - file_header = AmorHeader(self.current_file) self.proj_yz = YZProjection() - self.proj_tofz = TofZProjection(file_header.timing.tau, foldback=True, combine=2) - + self.proj_tofz = TofZProjection(self.event_data.timing.tau, foldback=True, combine=2) def read_data(self): - self.dataset = AmorEventData(self.current_file, max_events=self.config.reduction.max_events) + # make sure the first events have arrived before starting analysis + self.event_data.new_events.wait() + self.dataset = self.event_data.get_events() self.event_actions(self.dataset) + def add_data(self): - self.monitor = self.dataset.data.pulses.monitor.sum() + self.monitor = self.dataset.monitor self.proj_yz.project(self.dataset, monitor=self.monitor) self.proj_tofz.project(self.dataset, monitor=self.monitor) - def replace_dataset(self, latest): - new_file = self.path_resolver.resolve('0')[0] - if not os.path.exists(new_file): - return - try: - # check that events exist in the new file - AmorEventData(new_file, 0, max_events=1000) - except Exception: - logging.debug("Problem when trying to load new dataset", exc_info=True) - return - - logging.warning(f"Preceding to next file {latest}") - self.current_file = new_file - self.create_projections() # need to recreate projections, in case tau changed - self.read_data() - self.add_data() - def loop(self): + self.wait_for = self.serializer.new_count_started while True: try: self.update() - sleep(1.0) + self.wait_for.wait(1.0) except KeyboardInterrupt: + self.event_data.stop_event.set() + self.event_data.join() self.serializer.end_command_thread() return def update(self): - logging.debug(" check for update") - if self.config.reduction.fileIdentifier=='0': - # if latest file was choosen, check if new one available and switch to it - current = int(os.path.basename(self.current_file)[9:15]) - latest = self.path_resolver.search_latest(0) - if latest>current: - self.replace_dataset(latest) - return - # if all events were read last time, only load more if file was modified - if self.dataset.EOF and os.path.getmtime(self.current_file)<=self._last_mtime: - return - - self._last_mtime = os.path.getmtime(self.current_file) + if self.serializer.new_count_started.is_set(): + logging.warning('Start new count, clearing event data') + self.wait_for = self.serializer.count_stopped + self.event_data.restart() + self.serializer.new_count_started.clear() + self.proj_yz.clear() + self.proj_tofz.clear() + elif self.serializer.count_stopped.is_set() and not self.event_data.stop_counting.is_set(): + logging.warning(f' stop counting, total events {int(self.proj_tofz.data.cts.sum())}') + self.wait_for = self.serializer.new_count_started + self.event_data.stop_counting.set() try: - update_data = AmorEventData(self.current_file, self.dataset.last_index+1, - max_events=self.config.reduction.max_events) + update_data = self.event_data.get_events() except EOFError: return logging.info(" updating with new data") self.event_actions(update_data) self.dataset=update_data - self.monitor = self.dataset.data.pulses.monitor.sum() + self.monitor = self.dataset.monitor self.proj_yz.project(update_data, self.monitor) self.proj_tofz.project(update_data, self.monitor)