From fc1bd66c3d6ebffd05aeb8020dd3d9702144bafd Mon Sep 17 00:00:00 2001 From: Artur Glavic Date: Mon, 13 Oct 2025 17:44:52 +0200 Subject: [PATCH 01/10] Start implementing kafka serializer to send image to Nicos, works without control messages for YZ graph --- eos/kafka_serializer.py | 176 ++++++++++++++++++++++++++++++++++++++++ eos/options.py | 8 ++ eos/reduction_e2h.py | 10 ++- 3 files changed, 193 insertions(+), 1 deletion(-) create mode 100644 eos/kafka_serializer.py diff --git a/eos/kafka_serializer.py b/eos/kafka_serializer.py new file mode 100644 index 0000000..8e165be --- /dev/null +++ b/eos/kafka_serializer.py @@ -0,0 +1,176 @@ +""" +Allows to send eos projections to Kafka using ESS histogram serialization. + +For histogram_h01 the message is build using: + +hist = { + "source": "some_source", + "timestamp": 123456, + "current_shape": [2, 5], + "dim_metadata": [ + { + "length": 2, + "unit": "a", + "label": "x", + "bin_boundaries": np.array([10, 11, 12]), + }, + { + "length": 5, + "unit": "b", + "label": "y", + "bin_boundaries": np.array([0, 1, 2, 3, 4, 5]), + }, + ], + "last_metadata_timestamp": 123456, + "data": np.array([[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]), + "errors": np.array([[5, 4, 3, 2, 1], [10, 9, 8, 7, 6]]), + "info": "info_string", +} +""" +from typing import Tuple, Union + +import numpy as np +import json +from time import time +from dataclasses import dataclass, asdict +from streaming_data_types import histogram_hs00 +from confluent_kafka import Producer, Consumer, TopicPartition + +from uuid import uuid4 + +from .projection import LZProjection, YZProjection + +@dataclass +class DimMetadata: + length: int + unit: str + label: str + bin_boundaries: np.ndarray + +@dataclass +class HistogramMessage: + source: str + timestamp: int + current_shape: Tuple[int, int] + dim_metadata: Tuple[DimMetadata, DimMetadata] + last_metadata_timestamp: int + data: np.ndarray + errors: np.ndarray + info: str + + def serialize(self): + return 
histogram_hs00.serialise_hs00(asdict(self)) + +class ESSSerializer: + + def __init__(self): + self.producer = Producer({ + 'bootstrap.servers': 'linkafka01.psi.ch:9092', + }) + self.consumer = Consumer({ + 'bootstrap.servers': 'linkafka01.psi.ch:9092', + "group.id": uuid4(), + "default.topic.config": {"auto.offset.reset": "latest"}, + }) + + #tp = [TopicPartition( "SANSLLB_histCommands",0)] + #self.consumer.assign(tp) + self.consumer.subscribe(["SANSLLB_histCommands"]) + + def process_message(self, message): + if message.error(): + print("Command Consumer Error: %s", message.error()) + else: + command = json.loads(message.value().decode()) + print(command) + resp = json.dumps({ + "msg_id": command.get("id") or command.get("msg_id"), + "response": "ACK", + "message": "" + }) + self.producer.produce( + topic="SANSLLB_histResponse", + value=resp + ) + self.producer.flush() + + def receive(self, timeout=5): + rec = self.consumer.poll(5) + if rec is not None: + self.process_message(rec) + return True + else: + return False + + + def acked(self, err, msg): + # We need to have callback to produce-method to catch server errors + if err is not None: + print("Failed to deliver message: %s: %s" % (str(msg), str(err))) + else: + print("Message produced: %s" % (str(msg))) + + def send(self, proj: Union[YZProjection, LZProjection]): + if isinstance(proj, YZProjection): + message = HistogramMessage( + source='just-bin-it', + timestamp=int(time()), + current_shape=(proj.y.shape[0]-1, proj.z.shape[0]-1), + dim_metadata=( + DimMetadata( + length=proj.y.shape[0]-1, + unit="pixel", + label="Y", + bin_boundaries=proj.y, + ), + DimMetadata( + length=proj.z.shape[0]-1, + unit="pixel", + label="Z", + bin_boundaries=proj.z, + ) + ), + last_metadata_timestamp=0, + data=proj.data.I, + errors=proj.data.err, + info=json.dumps({ + "start": int(time()), + "state": 'COUNTING', + "num events": proj.data.cts.sum() + }) + ) + elif isinstance(proj, LZProjection): + message = HistogramMessage( + 
source='just-bin-it', + timestamp=int(time()), + current_shape=(proj.lamda.shape[0]-1, proj.alphaF.shape[0]-1), + dim_metadata=( + DimMetadata( + length=proj.lamda.shape[0]-1, + unit="Angstrom", + label="Lambda", + bin_boundaries=proj.lamda, + ), + DimMetadata( + length=proj.alphaF.shape[0]-1, + unit="degrees", + label="Theta", + bin_boundaries=proj.alphaF, + ) + ), + last_metadata_timestamp=0, + data=proj.data.ref, + errors=proj.data.err, + info=json.dumps({ + "start": int(time()), + "state": 'COUNTING', + "num events": proj.data.I.sum() + }) + ) + else: + raise NotImplementedError(f"Histogram for {proj.__class__.__name__} not implemented") + + self.producer.produce(value=message.serialize(), + topic='SANSLLB_histograms', + callback=self.acked) + self.producer.flush() diff --git a/eos/options.py b/eos/options.py index 43c6d05..b133066 100644 --- a/eos/options.py +++ b/eos/options.py @@ -655,6 +655,14 @@ class E2HReductionConfig(ArgParsable): }, ) + kafka: bool = field( + default=False, + metadata={ + 'group': 'output', + 'help': 'send result to kafka for Nicos', + }, + ) + plotArgs: E2HPlotArguments = field( default=E2HPlotArguments.Default, metadata={ diff --git a/eos/reduction_e2h.py b/eos/reduction_e2h.py index 2561bf9..9269d3d 100644 --- a/eos/reduction_e2h.py +++ b/eos/reduction_e2h.py @@ -124,6 +124,10 @@ class E2HReduction: if self.config.reduction.plotArgs==E2HPlotArguments.Default and not self.config.reduction.update: # safe to image file if not auto-updating graph plt.savefig(f'e2h_{self.config.reduction.plot}.png', dpi=300) + if self.config.reduction.kafka: + from .kafka_serializer import ESSSerializer + self.serializer = ESSSerializer() + self.serializer.send(self.projection) if self.config.reduction.update: self.timer = self.fig.canvas.new_timer(1000) self.timer.add_callback(self.update) @@ -131,6 +135,7 @@ class E2HReduction: if self.config.reduction.show_plot: plt.show() + def register_colormap(self): cmap = plt.colormaps['turbo'](np.arange(256)) 
cmap[:1, :] = np.array([256/256, 255/256, 236/256, 1]) @@ -298,4 +303,7 @@ class E2HReduction: self.projection.update_plot() plt.suptitle(self.create_title()) - plt.draw() \ No newline at end of file + plt.draw() + + if self.config.reduction.kafka: + self.serializer.send(self.projection) From 0b7a56628ff859b35f2473b270a5bb02efe7597e Mon Sep 17 00:00:00 2001 From: Artur Glavic Date: Tue, 14 Oct 2025 14:48:49 +0200 Subject: [PATCH 02/10] Transfer to NICOS working for YZ and TofZ projections --- eos/kafka_serializer.py | 174 +++++++++++++++++++++++++++++++--------- eos/reduction_e2h.py | 5 +- 2 files changed, 141 insertions(+), 38 deletions(-) diff --git a/eos/kafka_serializer.py b/eos/kafka_serializer.py index 8e165be..42ef8b0 100644 --- a/eos/kafka_serializer.py +++ b/eos/kafka_serializer.py @@ -27,7 +27,8 @@ hist = { "info": "info_string", } """ -from typing import Tuple, Union +import logging +from typing import List, Tuple, Union import numpy as np import json @@ -38,7 +39,17 @@ from confluent_kafka import Producer, Consumer, TopicPartition from uuid import uuid4 -from .projection import LZProjection, YZProjection +from .projection import TofZProjection, YZProjection + +KAFKA_BROKER = 'linkafka01.psi.ch:9092' +KAFKA_TOPICS = { + 'histogram': 'AMOR_histograms', + 'response': 'AMOR_histResponse', + 'command': 'AMOR_histCommands' + } + +def ktime(): + return int(time()*1_000) @dataclass class DimMetadata: @@ -61,60 +72,148 @@ class HistogramMessage: def serialize(self): return histogram_hs00.serialise_hs00(asdict(self)) +@dataclass +class CommandMessage: + msg_id: str + + cmd=None + + @classmethod + def get_message(cls, data): + """ + Uses the sub-class cmd attribute to select which message to retugn + """ + msg = dict([(ci.cmd, ci) for ci in cls.__subclasses__()]) + return msg[data['cmd']](**data) + + +@dataclass +class Stop(CommandMessage): + hist_id: str + id: str + cmd:str = 'stop' + +@dataclass +class HistogramConfig: + id: str + type: str + data_brokers: 
List[str] + topic: str + data_topics: List[str] + tof_range: Tuple[float, float] + det_range: Tuple[int, int] + num_bins: int + width: int + height: int + left_edges: list + source: str + +@dataclass +class ConfigureHistogram(CommandMessage): + histograms: List[HistogramConfig] + start: int + cmd:str = 'config' + + def __post_init__(self): + self.histograms = [HistogramConfig(**cfg) for cfg in self.histograms] + + class ESSSerializer: def __init__(self): self.producer = Producer({ - 'bootstrap.servers': 'linkafka01.psi.ch:9092', + 'bootstrap.servers': KAFKA_BROKER, + 'message.max.bytes': 4_000_000, }) self.consumer = Consumer({ - 'bootstrap.servers': 'linkafka01.psi.ch:9092', + 'bootstrap.servers': KAFKA_BROKER, "group.id": uuid4(), "default.topic.config": {"auto.offset.reset": "latest"}, }) + self._active_histogram = None + self._last_message = None - #tp = [TopicPartition( "SANSLLB_histCommands",0)] - #self.consumer.assign(tp) - self.consumer.subscribe(["SANSLLB_histCommands"]) + self.consumer.subscribe([KAFKA_TOPICS['command']]) def process_message(self, message): if message.error(): - print("Command Consumer Error: %s", message.error()) + logging.error("Command Consumer Error: %s", message.error()) else: command = json.loads(message.value().decode()) - print(command) + try: + command = CommandMessage.get_message(command) + except Exception: + logging.error(f'Could not interpret message: \n{command}', exc_info=True) + return + logging.warning(command) resp = json.dumps({ - "msg_id": command.get("id") or command.get("msg_id"), + "msg_id": getattr(command, "id", None) or command.msg_id, "response": "ACK", "message": "" }) self.producer.produce( - topic="SANSLLB_histResponse", + topic=KAFKA_TOPICS['response'], value=resp ) self.producer.flush() + if isinstance(command, Stop) and command.hist_id == self._active_histogram: + self._active_histogram = None + message = self._last_message + message.timestamp = ktime() + message.info=json.dumps({ + "start": self._start, + 
"state": 'FINISHED', + "num events": message.data.sum() + }) + self._last_message = None + self.producer.produce(value=message.serialize(), + topic=KAFKA_TOPICS['histogram'], + callback=self.acked) + self.producer.flush() + elif isinstance(command, ConfigureHistogram): + self._active_histogram = command.histograms[0].id + self._start = command.start def receive(self, timeout=5): - rec = self.consumer.poll(5) + rec = self.consumer.poll(timeout) if rec is not None: self.process_message(rec) return True else: return False + def receive_loop(self): + while self._keep_receiving: + try: + self.receive() + except Exception: + logging.error("Exception while receiving", exc_info=True) + + def start_command_thread(self): + from threading import Thread + self._keep_receiving = True + self._command_thread = Thread(target=self.receive_loop) + self._command_thread.start() + + def end_command_thread(self, event=None): + self._keep_receiving = False + self._command_thread.join() def acked(self, err, msg): # We need to have callback to produce-method to catch server errors if err is not None: - print("Failed to deliver message: %s: %s" % (str(msg), str(err))) + logging.warning("Failed to deliver message: %s: %s" % (str(msg), str(err))) else: - print("Message produced: %s" % (str(msg))) + logging.debug("Message produced: %s" % (str(msg))) - def send(self, proj: Union[YZProjection, LZProjection]): + def send(self, proj: Union[YZProjection, TofZProjection]): + if self._active_histogram is None: + proj.clear() + return if isinstance(proj, YZProjection): message = HistogramMessage( - source='just-bin-it', - timestamp=int(time()), + source='amor-eos', + timestamp=ktime(), current_shape=(proj.y.shape[0]-1, proj.z.shape[0]-1), dim_metadata=( DimMetadata( @@ -131,38 +230,38 @@ class ESSSerializer: ) ), last_metadata_timestamp=0, - data=proj.data.I, - errors=proj.data.err, + data=proj.data.cts, + errors=np.sqrt(proj.data.cts), info=json.dumps({ - "start": int(time()), + "start": self._start, 
"state": 'COUNTING', "num events": proj.data.cts.sum() }) ) - elif isinstance(proj, LZProjection): + elif isinstance(proj, TofZProjection): message = HistogramMessage( - source='just-bin-it', - timestamp=int(time()), - current_shape=(proj.lamda.shape[0]-1, proj.alphaF.shape[0]-1), + source='amor-eos', + timestamp=ktime(), + current_shape=(proj.tof.shape[0]-1, proj.z.shape[0]-1), dim_metadata=( DimMetadata( - length=proj.lamda.shape[0]-1, - unit="Angstrom", - label="Lambda", - bin_boundaries=proj.lamda, + length=proj.tof.shape[0]-1, + unit="ms", + label="ToF", + bin_boundaries=proj.tof, ), DimMetadata( - length=proj.alphaF.shape[0]-1, - unit="degrees", - label="Theta", - bin_boundaries=proj.alphaF, - ) + length=proj.z.shape[0]-1, + unit="pixel", + label="Z", + bin_boundaries=proj.z, + ), ), last_metadata_timestamp=0, - data=proj.data.ref, - errors=proj.data.err, + data=proj.data.cts, + errors=np.sqrt(proj.data.cts), info=json.dumps({ - "start": int(time()), + "start": self._start, "state": 'COUNTING', "num events": proj.data.I.sum() }) @@ -170,7 +269,8 @@ class ESSSerializer: else: raise NotImplementedError(f"Histogram for {proj.__class__.__name__} not implemented") + self._last_message = message self.producer.produce(value=message.serialize(), - topic='SANSLLB_histograms', + topic=KAFKA_TOPICS['histogram'], callback=self.acked) self.producer.flush() diff --git a/eos/reduction_e2h.py b/eos/reduction_e2h.py index 9269d3d..63a329f 100644 --- a/eos/reduction_e2h.py +++ b/eos/reduction_e2h.py @@ -93,7 +93,8 @@ class E2HReduction: # plot dependant options if self.config.reduction.plot in [E2HPlotSelection.All, E2HPlotSelection.LT, E2HPlotSelection.Q]: - self.grid = LZGrid(0.01, [0.0, 0.25]) + self.grid = LZGrid(0.05, [0.0, 0.25]) + self.grid.dldl = 0.05 if self.config.reduction.plot in [E2HPlotSelection.All, E2HPlotSelection.Raw, E2HPlotSelection.LT, E2HPlotSelection.YT, @@ -127,6 +128,8 @@ class E2HReduction: if self.config.reduction.kafka: from .kafka_serializer import 
ESSSerializer self.serializer = ESSSerializer() + self.fig.canvas.mpl_connect('close_event', self.serializer.end_command_thread) + self.serializer.start_command_thread() self.serializer.send(self.projection) if self.config.reduction.update: self.timer = self.fig.canvas.new_timer(1000) From 322c00ca5430e73f2fe3f864f38782eb41117112 Mon Sep 17 00:00:00 2001 From: Artur Glavic Date: Tue, 14 Oct 2025 15:28:32 +0200 Subject: [PATCH 03/10] add nicos daemon for YZ + TofZ histogramming of latest dataset --- eos/kafka_serializer.py | 65 +++++++++++++++------ eos/nicos.py | 42 ++++++++++++++ eos/reduction_kafka.py | 125 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 213 insertions(+), 19 deletions(-) create mode 100644 eos/nicos.py create mode 100644 eos/reduction_kafka.py diff --git a/eos/kafka_serializer.py b/eos/kafka_serializer.py index 42ef8b0..140e6fb 100644 --- a/eos/kafka_serializer.py +++ b/eos/kafka_serializer.py @@ -130,8 +130,10 @@ class ESSSerializer: "group.id": uuid4(), "default.topic.config": {"auto.offset.reset": "latest"}, }) - self._active_histogram = None - self._last_message = None + self._active_histogram_yz = None + self._active_histogram_tofz = None + self._last_message_yz = None + self._last_message_tofz = None self.consumer.subscribe([KAFKA_TOPICS['command']]) @@ -156,22 +158,41 @@ class ESSSerializer: value=resp ) self.producer.flush() - if isinstance(command, Stop) and command.hist_id == self._active_histogram: - self._active_histogram = None - message = self._last_message - message.timestamp = ktime() - message.info=json.dumps({ - "start": self._start, - "state": 'FINISHED', - "num events": message.data.sum() - }) - self._last_message = None + if isinstance(command, Stop): + if command.hist_id == self._active_histogram_yz: + suffix = 'YZ' + self._active_histogram_yz = None + message = self._last_message_yz + message.timestamp = ktime() + message.info=json.dumps({ + "start": self._start, + "state": 'FINISHED', + "num events": 
message.data.sum() + }) + self._last_message_yz = None + elif command.hist_id == self._active_histogram_tofz: + suffix = 'TofZ' + self._active_histogram_tofz = None + message = self._last_message_tofz + message.timestamp = ktime() + message.info=json.dumps({ + "start": self._start, + "state": 'FINISHED', + "num events": message.data.sum() + }) + self._last_message_tofz = None + else: + return self.producer.produce(value=message.serialize(), - topic=KAFKA_TOPICS['histogram'], + topic=KAFKA_TOPICS['histogram']+'_'+suffix, callback=self.acked) self.producer.flush() elif isinstance(command, ConfigureHistogram): - self._active_histogram = command.histograms[0].id + for hist in command.histograms: + if hist.topic == KAFKA_TOPICS['histogram']+'_YZ': + self._active_histogram_yz = hist.id + if hist.topic == KAFKA_TOPICS['histogram']+'_TofZ': + self._active_histogram_tofz = hist.id self._start = command.start def receive(self, timeout=5): @@ -207,10 +228,11 @@ class ESSSerializer: logging.debug("Message produced: %s" % (str(msg))) def send(self, proj: Union[YZProjection, TofZProjection]): - if self._active_histogram is None: - proj.clear() - return if isinstance(proj, YZProjection): + if self._active_histogram_yz is None: + proj.clear() + return + suffix = 'YZ' message = HistogramMessage( source='amor-eos', timestamp=ktime(), @@ -238,7 +260,12 @@ class ESSSerializer: "num events": proj.data.cts.sum() }) ) + self._last_message_yz = message elif isinstance(proj, TofZProjection): + if self._active_histogram_tofz is None: + proj.clear() + return + suffix = 'TofZ' message = HistogramMessage( source='amor-eos', timestamp=ktime(), @@ -266,11 +293,11 @@ class ESSSerializer: "num events": proj.data.I.sum() }) ) + self._last_message_tofz = message else: raise NotImplementedError(f"Histogram for {proj.__class__.__name__} not implemented") - self._last_message = message self.producer.produce(value=message.serialize(), - topic=KAFKA_TOPICS['histogram'], + 
topic=KAFKA_TOPICS['histogram']+'_'+suffix, callback=self.acked) self.producer.flush() diff --git a/eos/nicos.py b/eos/nicos.py new file mode 100644 index 0000000..37ea714 --- /dev/null +++ b/eos/nicos.py @@ -0,0 +1,42 @@ +""" +events2histogram vizualising data from Amor@SINQ, PSI + +Author: Jochen Stahn (algorithms, python draft), + Artur Glavic (structuring and optimisation of code) +""" +import logging + +# need to do absolute import here as pyinstaller requires it +from eos.options import E2HConfig, ReaderConfig, ExperimentConfig, E2HReductionConfig +from eos.command_line import commandLineArgs +from eos.logconfig import setup_logging, update_loglevel + + +def main(): + setup_logging() + logging.getLogger('matplotlib').setLevel(logging.WARNING) + + # read command line arguments and generate classes holding configuration parameters + clas = commandLineArgs([ReaderConfig, ExperimentConfig, E2HReductionConfig], + 'events2histogram') + update_loglevel(clas.verbose) + + reader_config = ReaderConfig.from_args(clas) + experiment_config = ExperimentConfig.from_args(clas) + reduction_config = E2HReductionConfig.from_args(clas) + config = E2HConfig(reader_config, experiment_config, reduction_config) + + logging.warning('######## events2histogram - data vizualization for Amor ########') + from eos.reduction_kafka import KafkaReduction + + # only import heavy module if sufficient command line parameters were provided + from eos.reduction_reflectivity import ReflectivityReduction + # Create reducer with these arguments + reducer = KafkaReduction(config) + # Perform actual reduction + reducer.reduce() + + logging.info('######## events2histogram - finished ########') + +if __name__ == '__main__': + main() diff --git a/eos/reduction_kafka.py b/eos/reduction_kafka.py new file mode 100644 index 0000000..dd5d0ae --- /dev/null +++ b/eos/reduction_kafka.py @@ -0,0 +1,125 @@ +""" +Events 2 histogram, quick reduction of single file to display during experiment. 
+Can be used as a live preview with automatic update when files are modified. +""" + +import logging +import os + +from time import sleep +from .file_reader import AmorEventData, AmorHeader +from .header import Header +from .options import E2HConfig +from . import event_handling as eh, event_analysis as ea +from .path_handling import PathResolver +from .projection import TofZProjection, YZProjection +from .kafka_serializer import ESSSerializer + + +class KafkaReduction: + config: E2HConfig + header: Header + event_actions: eh.EventDataAction + + _last_mtime = 0. + proj_yz: YZProjection + proj_tofz = TofZProjection + + def __init__(self, config: E2HConfig): + self.config = config + + self.header = Header() + + self.prepare_actions() + + def prepare_actions(self): + """ + Does not do any actual reduction. + """ + self.path_resolver = PathResolver(self.config.reader.year, self.config.reader.rawPath) + self.current_file = self.path_resolver.resolve('0')[0] + + # Actions on datasets not used for normalization + self.event_actions = eh.ApplyPhaseOffset(self.config.experiment.chopperPhaseOffset) + self.event_actions |= eh.CorrectChopperPhase() + self.event_actions |= ea.MergeFrames() + self.event_actions |= eh.ApplyMask() + + def reduce(self): + last_file_header = AmorHeader(self.current_file) + + self.proj_yz = YZProjection() + self.proj_tofz = TofZProjection(last_file_header.timing.tau, foldback=True) + + self.read_data() + self.add_data() + + self.serializer = ESSSerializer() + self.serializer.start_command_thread() + + self.loop() + + + def read_data(self): + self.dataset = AmorEventData(self.current_file, max_events=self.config.reduction.max_events) + self.event_actions(self.dataset) + + def add_data(self): + self.monitor = self.dataset.data.pulses.monitor.sum() + self.proj_yz.project(self.dataset, monitor=self.monitor) + self.proj_tofz.project(self.dataset, monitor=self.monitor) + + def replace_dataset(self, latest): + new_file = self.path_resolver.resolve('0')[0] + 
if not os.path.exists(new_file): + return + try: + # check that events exist in the new file + AmorEventData(new_file, 0, max_events=1000) + except Exception: + logging.debug("Problem when trying to load new dataset", exc_info=True) + return + + logging.warning(f"Preceding to next file {latest}") + self.current_file = new_file + self.read_data() + self.add_data() + + def loop(self): + while True: + try: + self.update() + sleep(1.0) + except KeyboardInterrupt: + self.serializer.end_command_thread() + return + + def update(self): + logging.debug(" check for update") + if self.config.reduction.fileIdentifier=='0': + # if latest file was choosen, check if new one available and switch to it + current = int(os.path.basename(self.current_file)[9:15]) + latest = self.path_resolver.search_latest(0) + if latest>current: + self.replace_dataset(latest) + return + # if all events were read last time, only load more if file was modified + if self.dataset.EOF and os.path.getmtime(self.current_file)<=self._last_mtime: + return + + self._last_mtime = os.path.getmtime(self.current_file) + try: + update_data = AmorEventData(self.current_file, self.dataset.last_index+1, + max_events=self.config.reduction.max_events) + except EOFError: + return + logging.info(" updating with new data") + + self.event_actions(update_data) + self.dataset=update_data + self.monitor = self.dataset.data.pulses.monitor.sum() + self.proj_yz.project(update_data, self.monitor) + self.proj_tofz.project(update_data, self.monitor) + + self.serializer.send(self.proj_yz) + self.serializer.send(self.proj_tofz) From 14ea89e243597fa6db63e38b93ec368a72852a57 Mon Sep 17 00:00:00 2001 From: Artur Glavic Date: Tue, 14 Oct 2025 15:49:06 +0200 Subject: [PATCH 04/10] Do only clear projections when switching to new file --- eos/file_reader.py | 2 +- eos/kafka_serializer.py | 2 -- eos/nicos.py | 6 +++--- eos/reduction_kafka.py | 2 ++ 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/eos/file_reader.py 
b/eos/file_reader.py index 1f67ddf..0017b2c 100644 --- a/eos/file_reader.py +++ b/eos/file_reader.py @@ -41,7 +41,7 @@ class AmorHeader: def __init__(self, fileName:Union[str, h5py.File, BinaryIO]): if type(fileName) is str: - logging.debug(f' opening file {fileName}') + logging.info(f' opening file {fileName}') self.hdf = h5py.File(fileName, 'r', swmr=True) elif type(fileName) is h5py.File: self.hdf = fileName diff --git a/eos/kafka_serializer.py b/eos/kafka_serializer.py index 140e6fb..d09773a 100644 --- a/eos/kafka_serializer.py +++ b/eos/kafka_serializer.py @@ -230,7 +230,6 @@ class ESSSerializer: def send(self, proj: Union[YZProjection, TofZProjection]): if isinstance(proj, YZProjection): if self._active_histogram_yz is None: - proj.clear() return suffix = 'YZ' message = HistogramMessage( @@ -263,7 +262,6 @@ class ESSSerializer: self._last_message_yz = message elif isinstance(proj, TofZProjection): if self._active_histogram_tofz is None: - proj.clear() return suffix = 'TofZ' message = HistogramMessage( diff --git a/eos/nicos.py b/eos/nicos.py index 37ea714..147d935 100644 --- a/eos/nicos.py +++ b/eos/nicos.py @@ -18,7 +18,7 @@ def main(): # read command line arguments and generate classes holding configuration parameters clas = commandLineArgs([ReaderConfig, ExperimentConfig, E2HReductionConfig], - 'events2histogram') + 'amor-nicos') update_loglevel(clas.verbose) reader_config = ReaderConfig.from_args(clas) @@ -26,7 +26,7 @@ def main(): reduction_config = E2HReductionConfig.from_args(clas) config = E2HConfig(reader_config, experiment_config, reduction_config) - logging.warning('######## events2histogram - data vizualization for Amor ########') + logging.warning('######## amor-nicos - Nicos histogram for Amor ########') from eos.reduction_kafka import KafkaReduction # only import heavy module if sufficient command line parameters were provided @@ -36,7 +36,7 @@ def main(): # Perform actual reduction reducer.reduce() - logging.info('######## events2histogram - 
finished ########') + logging.info('######## amor-nicos - finished ########') if __name__ == '__main__': main() diff --git a/eos/reduction_kafka.py b/eos/reduction_kafka.py index dd5d0ae..83f33fd 100644 --- a/eos/reduction_kafka.py +++ b/eos/reduction_kafka.py @@ -82,6 +82,8 @@ class KafkaReduction: logging.warning(f"Preceding to next file {latest}") self.current_file = new_file + self.proj_yz.clear() + self.proj_tofz.clear() self.read_data() self.add_data() From 4e5d085ad7df8aabf981dbb0732f2bec355e9ef2 Mon Sep 17 00:00:00 2001 From: Artur Glavic Date: Wed, 15 Oct 2025 10:39:59 +0200 Subject: [PATCH 05/10] Add option to bin tof values, rebuild projections on file change, switch to hs01 serialization and give nicos config help --- eos/kafka_serializer.py | 4 ++-- eos/projection.py | 6 +++--- eos/reduction_kafka.py | 14 +++++++------- nicos_config.md | 42 +++++++++++++++++++++++++++++++++++++++++ setup.cfg | 1 + 5 files changed, 55 insertions(+), 12 deletions(-) create mode 100644 nicos_config.md diff --git a/eos/kafka_serializer.py b/eos/kafka_serializer.py index d09773a..ebc9865 100644 --- a/eos/kafka_serializer.py +++ b/eos/kafka_serializer.py @@ -34,7 +34,7 @@ import numpy as np import json from time import time from dataclasses import dataclass, asdict -from streaming_data_types import histogram_hs00 +from streaming_data_types import histogram_hs01 from confluent_kafka import Producer, Consumer, TopicPartition from uuid import uuid4 @@ -70,7 +70,7 @@ class HistogramMessage: info: str def serialize(self): - return histogram_hs00.serialise_hs00(asdict(self)) + return histogram_hs01.serialise_hs01(asdict(self)) @dataclass class CommandMessage: diff --git a/eos/projection.py b/eos/projection.py index 36b8e1e..d78ee5f 100644 --- a/eos/projection.py +++ b/eos/projection.py @@ -544,12 +544,12 @@ class TofZProjection(ProjectionInterface): ('err', np.float64), ]) - def __init__(self, tau, foldback=False): + def __init__(self, tau, foldback=False, combine=1): self.z = 
np.arange(Detector.nBlades*Detector.nWires+1)-0.5 if foldback: - self.tof = np.arange(0, tau, 0.0005) + self.tof = np.arange(0, tau, 0.0005*combine) else: - self.tof = np.arange(0, 2*tau, 0.0005) + self.tof = np.arange(0, 2*tau, 0.0005*combine) self.data = np.zeros((self.tof.shape[0]-1, self.z.shape[0]-1), dtype=self._dtype).view(np.recarray) self.monitor = 0. diff --git a/eos/reduction_kafka.py b/eos/reduction_kafka.py index 83f33fd..3fd035e 100644 --- a/eos/reduction_kafka.py +++ b/eos/reduction_kafka.py @@ -46,11 +46,7 @@ class KafkaReduction: self.event_actions |= eh.ApplyMask() def reduce(self): - last_file_header = AmorHeader(self.current_file) - - self.proj_yz = YZProjection() - self.proj_tofz = TofZProjection(last_file_header.timing.tau, foldback=True) - + self.create_projections() self.read_data() self.add_data() @@ -59,6 +55,11 @@ class KafkaReduction: self.loop() + def create_projections(self): + file_header = AmorHeader(self.current_file) + self.proj_yz = YZProjection() + self.proj_tofz = TofZProjection(file_header.timing.tau, foldback=True, combine=2) + def read_data(self): self.dataset = AmorEventData(self.current_file, max_events=self.config.reduction.max_events) @@ -82,8 +83,7 @@ class KafkaReduction: logging.warning(f"Preceding to next file {latest}") self.current_file = new_file - self.proj_yz.clear() - self.proj_tofz.clear() + self.create_projections() # need to recreate projections, in case tau changed self.read_data() self.add_data() diff --git a/nicos_config.md b/nicos_config.md new file mode 100644 index 0000000..72e85de --- /dev/null +++ b/nicos_config.md @@ -0,0 +1,42 @@ +EOS-Service +=========== + +EOS can be used as histogram service to send images to the Nicos instrument control software. 
+For that you need to run it on the amor instrument computer: + +```bash +amor-nicos {-vv} +``` + +The instrument config in Nicos needs to configure a Kafka JustBinItImage instance +for each histogram that should be used: + +```python +hist_yz = device('nicos_sinq.devices.just_bin_it.JustBinItImage', + description = 'Detector pixel histogram over all times', + hist_topic = 'AMOR_histograms_YZ', + data_topic = 'AMOR_detector', + command_topic = 'AMOR_histCommands', + brokers = ['linkafka01.psi.ch:9092'], + unit = 'evts', + hist_type = '2-D SANSLLB', + det_width = 446, + det_height = 64, + ), +hist_tofz = device('nicos_sinq.devices.just_bin_it.JustBinItImage', + description = 'Detector time of flight vs. z-pixel histogram over all y-values', + hist_topic = 'AMOR_histograms_TofZ', + data_topic = 'AMOR_detector', + command_topic = 'AMOR_histCommands', + brokers = ['linkafka01.psi.ch:9092'], + unit = 'evts', + hist_type = '2-D SANSLLB', + det_width = 118, + det_height = 446, + ), +``` + +These images have then to be set in the detector configuration as _images_ items: +``` +images=['hist_yz', 'hist_tofz'], +``` \ No newline at end of file diff --git a/setup.cfg b/setup.cfg index 7a88dc9..2c9fa4a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -35,3 +35,4 @@ Homepage = "https://github.com/jochenstahn/amor" console_scripts = eos = eos.__main__:main events2histogram = eos.e2h:main + amor-nicos = eos.nicos:main From c429429dd2dede65c35a0908d150b6c59231072f Mon Sep 17 00:00:00 2001 From: Artur Glavic Date: Thu, 23 Oct 2025 08:27:33 +0200 Subject: [PATCH 06/10] minor --- eos/kafka_serializer.py | 1 + 1 file changed, 1 insertion(+) diff --git a/eos/kafka_serializer.py b/eos/kafka_serializer.py index ebc9865..d76a5a6 100644 --- a/eos/kafka_serializer.py +++ b/eos/kafka_serializer.py @@ -191,6 +191,7 @@ class ESSSerializer: for hist in command.histograms: if hist.topic == KAFKA_TOPICS['histogram']+'_YZ': self._active_histogram_yz = hist.id + logging.debug(f" histogram data_topic: 
{hist.data_topics}") if hist.topic == KAFKA_TOPICS['histogram']+'_TofZ': self._active_histogram_tofz = hist.id self._start = command.start From d722228079351b796f01e3497a2226c664ae10e9 Mon Sep 17 00:00:00 2001 From: Artur Glavic Date: Thu, 23 Oct 2025 15:40:50 +0200 Subject: [PATCH 07/10] Implement live Kafke event processing with Nicos start/stop --- eos/file_reader.py | 4 +- eos/kafka_events.py | 165 ++++++++++++++++++++++++++++++++++++++++ eos/kafka_serializer.py | 77 +++++++++++++++++-- eos/reduction_kafka.py | 70 +++++++---------- 4 files changed, 263 insertions(+), 53 deletions(-) create mode 100644 eos/kafka_events.py diff --git a/eos/file_reader.py b/eos/file_reader.py index c115b78..53b57ec 100644 --- a/eos/file_reader.py +++ b/eos/file_reader.py @@ -41,7 +41,7 @@ class AmorHeader: def __init__(self, fileName:Union[str, h5py.File, BinaryIO]): if type(fileName) is str: - logging.warning(f' {fileName.split('/')[-1]}') + logging.warning(f' {fileName.split("/")[-1]}') self.hdf = h5py.File(fileName, 'r', swmr=True) elif type(fileName) is h5py.File: self.hdf = fileName @@ -223,7 +223,7 @@ class AmorEventData(AmorHeader): def __init__(self, fileName:Union[str, h5py.File, BinaryIO], first_index:int=0, max_events:int=100_000_000): if type(fileName) is str: - logging.warning(f' {fileName.split('/')[-1]}') + logging.warning(f' {fileName.split("/")[-1]}') self.file_list = [fileName] hdf = h5py.File(fileName, 'r', swmr=True) elif type(fileName) is h5py.File: diff --git a/eos/kafka_events.py b/eos/kafka_events.py new file mode 100644 index 0000000..824deb8 --- /dev/null +++ b/eos/kafka_events.py @@ -0,0 +1,165 @@ +""" +Collect AMOR detector events send via Kafka. 
+"""
+
+import logging
+import numpy as np
+from threading import Thread, Event
+from time import time
+
+from .event_data_types import AmorGeometry, AmorTiming, AmorEventStream, PACKET_TYPE, EVENT_TYPE, PULSE_TYPE, PC_TYPE
+
+from uuid import uuid4
+from streaming_data_types.eventdata_ev44 import EventData
+from streaming_data_types.logdata_f144 import ExtractedLogData
+from streaming_data_types import deserialise_f144, deserialise_ev44
+from confluent_kafka import Consumer
+
+from .header import Header
+
+
+try:
+    from streaming_data_types.utils import get_schema
+except ImportError:
+    from streaming_data_types.utils import _get_schema as get_schema
+
+
+KAFKA_BROKER = 'linkafka01.psi.ch:9092'
+AMOR_EVENTS = 'amor_ev44'
+AMOR_NICOS = 'AMOR_nicosForwarder'
+
+class KafkaFrozenData:
+    """
+    Represents event stream data from Kafka at a given time.
+    Will be returned by KafkaEventData to be used in conjunction
+    with data processing and projections.
+
+    Implements EventDatasetProtocol
+    """
+    geometry: AmorGeometry
+    timing: AmorTiming
+    data: AmorEventStream
+
+    def __init__(self, geometry, timing, data, monitor=1.):
+        self.geometry = geometry
+        self.timing = timing
+        self.data = data
+        self.monitor = monitor
+
+    def append(self, other):
+        raise NotImplementedError("can't append live datastream to other event data")
+
+    def update_header(self, header:Header):
+        # maybe makes sense later, but for now just used for live visualization
+        ...
+
+class KafkaEventData(Thread):
+    """
+    Read Nicos information and events from Kafka. Creates a background
+    thread that listens to Kafka events and converts them to eos compatible information.
+    """
+    geometry: AmorGeometry
+    timing: AmorTiming
+    events: np.recarray
+
+    def __init__(self):
+        self.stop_event = Event()
+        self.stop_counting = Event()
+        self.new_events = Event()
+        self.last_read = 0
+        self.last_read_time = 0.
+        self.start_time = time()
+        self.consumer = Consumer(
+                {'bootstrap.servers': 'linkafka01.psi.ch:9092',
+                 'group.id': uuid4()})
+        self.consumer.subscribe([AMOR_EVENTS, AMOR_NICOS])
+        self.geometry = AmorGeometry(1.0, 2.0, 0., 0., 1.5, 10.0, 4.0, 10.0)
+        self.timing = AmorTiming(0., 0., 500., 0., 30./500.)
+        # create empty dataset
+        self.events = np.recarray(0, dtype=EVENT_TYPE)
+        super().__init__()
+
+    def run(self):
+        while not self.stop_event.is_set():
+            messages = self.consumer.consume(10, timeout=1)
+            for message in messages:
+                self.process_message(message)
+
+    def process_message(self, message):
+        if message.error():
+            logging.info(f" received Kafka message with error: {message.error()}")
+            return
+        schema = get_schema(message.value())
+        if message.topic()==AMOR_EVENTS and schema=='ev44':
+            events:EventData = deserialise_ev44(message.value())
+            self.add_events(events)
+            self.new_events.set()
+            logging.debug(f' new events {events}')
+        elif message.topic()==AMOR_NICOS and schema=='f144':
+            nicos_data:ExtractedLogData = deserialise_f144(message.value())
+            if nicos_data.source_name in self.nicos_mapping.keys():
+                logging.debug(f' {nicos_data.source_name} = {nicos_data.value}')
+                self.update_instrument(nicos_data)
+
+    def add_events(self, events:EventData):
+        """
+        Add new events to the Dataset. The object keeps raw events
+        and only copies the latest set to the self.data object,
+        this allows the event processing to be performed on a "clean"
+        event stream each time.
+ """ + if self.stop_counting.is_set(): + return + prev_size = self.events.shape[0] + new_events = events.pixel_id.shape[0] + self.events.resize(prev_size+new_events, refcheck=False) + self.events.pixelID[prev_size:] = events.pixel_id + self.events.mask[prev_size:] = 0 + self.events.tof[prev_size:] = events.time_of_flight + + nicos_mapping = { + 'mu': ('geometry', 'mu'), + 'nu': ('geometry', 'nu'), + 'kappa': ('geometry', 'kap'), + 'kappa_offset': ('geometry', 'kad'), + 'ch1_trigger_phase': ('timing', 'ch1TriggerPhase'), + 'ch2_trigger_phase': ('timing', 'ch2TriggerPhase'), + 'ch2_speed': ('timing', 'chopperSpeed'), + 'chopper_phase': ('timing', 'chopperPhase'), + } + + def update_instrument(self, nicos_data:ExtractedLogData): + if nicos_data.source_name in self.nicos_mapping: + attr, subattr = self.nicos_mapping[nicos_data.source_name] + setattr(getattr(self, attr), subattr, nicos_data.value) + if nicos_data.source_name=='ch2_speed': + self.timing.tau = 30./self.timing.chopperSpeed + + def monitor(self): + return time()-self.start_time + + def restart(self): + # empty event buffer + self.events = np.recarray(0, dtype=EVENT_TYPE) + self.stop_counting.clear() + self.last_read = 0 + self.start_time = time() + self.new_events.clear() + + def get_events(self, total_counts=False): + packets = np.recarray(0, dtype=PACKET_TYPE) + pulses = np.recarray(0, dtype=PULSE_TYPE) + pc = np.recarray(0, dtype=PC_TYPE) + if total_counts: + last_read = 0 + else: + last_read = self.last_read + if last_read>=self.events.shape[0]: + raise EOFError("No new events arrived") + data = AmorEventStream(self.events[last_read:].copy(), packets, pulses, pc) + self.last_read = self.events.shape[0] + self.new_events.clear() + t_now = time() + monitor = t_now-self.last_read_time + self.last_read_time = t_now + return KafkaFrozenData(self.geometry, self.timing, data, monitor=monitor) diff --git a/eos/kafka_serializer.py b/eos/kafka_serializer.py index d76a5a6..a8c7c2c 100644 --- 
a/eos/kafka_serializer.py +++ b/eos/kafka_serializer.py @@ -29,13 +29,14 @@ hist = { """ import logging from typing import List, Tuple, Union +from threading import Thread, Event import numpy as np import json from time import time from dataclasses import dataclass, asdict from streaming_data_types import histogram_hs01 -from confluent_kafka import Producer, Consumer, TopicPartition +from confluent_kafka import Producer, Consumer from uuid import uuid4 @@ -134,6 +135,8 @@ class ESSSerializer: self._active_histogram_tofz = None self._last_message_yz = None self._last_message_tofz = None + self.new_count_started = Event() + self.count_stopped = Event() self.consumer.subscribe([KAFKA_TOPICS['command']]) @@ -147,7 +150,7 @@ class ESSSerializer: except Exception: logging.error(f'Could not interpret message: \n{command}', exc_info=True) return - logging.warning(command) + logging.info(command) resp = json.dumps({ "msg_id": getattr(command, "id", None) or command.msg_id, "response": "ACK", @@ -170,6 +173,7 @@ class ESSSerializer: "num events": message.data.sum() }) self._last_message_yz = None + self.count_stopped.set() elif command.hist_id == self._active_histogram_tofz: suffix = 'TofZ' self._active_histogram_tofz = None @@ -192,9 +196,12 @@ class ESSSerializer: if hist.topic == KAFKA_TOPICS['histogram']+'_YZ': self._active_histogram_yz = hist.id logging.debug(f" histogram data_topic: {hist.data_topics}") + self._start = command.start + self.count_stopped.clear() + self.new_count_started.set() + self.set_empty_messages() if hist.topic == KAFKA_TOPICS['histogram']+'_TofZ': self._active_histogram_tofz = hist.id - self._start = command.start def receive(self, timeout=5): rec = self.consumer.poll(timeout) @@ -205,20 +212,19 @@ class ESSSerializer: return False def receive_loop(self): - while self._keep_receiving: + while not self._stop_receiving.is_set(): try: self.receive() except Exception: logging.error("Exception while receiving", exc_info=True) def 
start_command_thread(self): - from threading import Thread - self._keep_receiving = True + self._stop_receiving = Event() self._command_thread = Thread(target=self.receive_loop) self._command_thread.start() def end_command_thread(self, event=None): - self._keep_receiving = False + self._stop_receiving.set() self._command_thread.join() def acked(self, err, msg): @@ -261,6 +267,7 @@ class ESSSerializer: }) ) self._last_message_yz = message + logging.info(f" Sending {proj.data.cts.sum()} events to Nicos") elif isinstance(proj, TofZProjection): if self._active_histogram_tofz is None: return @@ -300,3 +307,59 @@ class ESSSerializer: topic=KAFKA_TOPICS['histogram']+'_'+suffix, callback=self.acked) self.producer.flush() + + def set_empty_messages(self): + self._last_message_yz = HistogramMessage( + source='amor-eos', + timestamp=ktime(), + current_shape=(64, 448), + dim_metadata=( + DimMetadata( + length=64, + unit="pixel", + label="Y", + bin_boundaries=np.arange(64), + ), + DimMetadata( + length=448, + unit="pixel", + label="Z", + bin_boundaries=np.arange(448), + ) + ), + last_metadata_timestamp=0, + data=np.zeros((64, 448)), + errors=np.zeros((64, 448)), + info=json.dumps({ + "start": self._start, + "state": 'COUNTING', + "num events": 0 + }) + ) + self._last_message_tofz = message = HistogramMessage( + source='amor-eos', + timestamp=ktime(), + current_shape=(56, 448), + dim_metadata=( + DimMetadata( + length=56, + unit="ms", + label="ToF", + bin_boundaries=np.arange(56), + ), + DimMetadata( + length=448, + unit="pixel", + label="Z", + bin_boundaries=np.arange(448), + ), + ), + last_metadata_timestamp=0, + data=np.zeros((56, 448)), + errors=np.zeros((56, 448)), + info=json.dumps({ + "start": self._start, + "state": 'COUNTING', + "num events": 0 + }) + ) diff --git a/eos/reduction_kafka.py b/eos/reduction_kafka.py index 3fd035e..34589c3 100644 --- a/eos/reduction_kafka.py +++ b/eos/reduction_kafka.py @@ -7,11 +7,10 @@ import logging import os from time import sleep -from 
.file_reader import AmorEventData, AmorHeader +from .kafka_events import KafkaEventData from .header import Header from .options import E2HConfig from . import event_handling as eh, event_analysis as ea -from .path_handling import PathResolver from .projection import TofZProjection, YZProjection from .kafka_serializer import ESSSerializer @@ -29,6 +28,8 @@ class KafkaReduction: self.config = config self.header = Header() + self.event_data = KafkaEventData() + self.event_data.start() self.prepare_actions() @@ -36,9 +37,6 @@ class KafkaReduction: """ Does not do any actual reduction. """ - self.path_resolver = PathResolver(self.config.reader.year, self.config.reader.rawPath) - self.current_file = self.path_resolver.resolve('0')[0] - # Actions on datasets not used for normalization self.event_actions = eh.ApplyPhaseOffset(self.config.experiment.chopperPhaseOffset) self.event_actions |= eh.CorrectChopperPhase() @@ -56,70 +54,54 @@ class KafkaReduction: self.loop() def create_projections(self): - file_header = AmorHeader(self.current_file) self.proj_yz = YZProjection() - self.proj_tofz = TofZProjection(file_header.timing.tau, foldback=True, combine=2) - + self.proj_tofz = TofZProjection(self.event_data.timing.tau, foldback=True, combine=2) def read_data(self): - self.dataset = AmorEventData(self.current_file, max_events=self.config.reduction.max_events) + # make sure the first events have arrived before starting analysis + self.event_data.new_events.wait() + self.dataset = self.event_data.get_events() self.event_actions(self.dataset) + def add_data(self): - self.monitor = self.dataset.data.pulses.monitor.sum() + self.monitor = self.dataset.monitor self.proj_yz.project(self.dataset, monitor=self.monitor) self.proj_tofz.project(self.dataset, monitor=self.monitor) - def replace_dataset(self, latest): - new_file = self.path_resolver.resolve('0')[0] - if not os.path.exists(new_file): - return - try: - # check that events exist in the new file - AmorEventData(new_file, 0, 
max_events=1000) - except Exception: - logging.debug("Problem when trying to load new dataset", exc_info=True) - return - - logging.warning(f"Preceding to next file {latest}") - self.current_file = new_file - self.create_projections() # need to recreate projections, in case tau changed - self.read_data() - self.add_data() - def loop(self): + self.wait_for = self.serializer.new_count_started while True: try: self.update() - sleep(1.0) + self.wait_for.wait(1.0) except KeyboardInterrupt: + self.event_data.stop_event.set() + self.event_data.join() self.serializer.end_command_thread() return def update(self): - logging.debug(" check for update") - if self.config.reduction.fileIdentifier=='0': - # if latest file was choosen, check if new one available and switch to it - current = int(os.path.basename(self.current_file)[9:15]) - latest = self.path_resolver.search_latest(0) - if latest>current: - self.replace_dataset(latest) - return - # if all events were read last time, only load more if file was modified - if self.dataset.EOF and os.path.getmtime(self.current_file)<=self._last_mtime: - return - - self._last_mtime = os.path.getmtime(self.current_file) + if self.serializer.new_count_started.is_set(): + logging.warning('Start new count, clearing event data') + self.wait_for = self.serializer.count_stopped + self.event_data.restart() + self.serializer.new_count_started.clear() + self.proj_yz.clear() + self.proj_tofz.clear() + elif self.serializer.count_stopped.is_set() and not self.event_data.stop_counting.is_set(): + logging.warning(f' stop counting, total events {int(self.proj_tofz.data.cts.sum())}') + self.wait_for = self.serializer.new_count_started + self.event_data.stop_counting.set() try: - update_data = AmorEventData(self.current_file, self.dataset.last_index+1, - max_events=self.config.reduction.max_events) + update_data = self.event_data.get_events() except EOFError: return logging.info(" updating with new data") self.event_actions(update_data) 
self.dataset=update_data
-        self.monitor = self.dataset.data.pulses.monitor.sum()
+        self.monitor = self.dataset.monitor
         self.proj_yz.project(update_data, self.monitor)
         self.proj_tofz.project(update_data, self.monitor)

From b38eeb8eb3afe5cba95b6fca8f88623f5d702225 Mon Sep 17 00:00:00 2001
From: Artur Glavic
Date: Thu, 23 Oct 2025 17:22:48 +0200
Subject: [PATCH 08/10] Fix kafka event ToF value and send actual final counts after stop signal

---
 eos/kafka_events.py | 2 +-
 eos/kafka_serializer.py | 98 ++++------------------------------
 eos/reduction_kafka.py | 26 +++++++++--
 3 files changed, 32 insertions(+), 94 deletions(-)

diff --git a/eos/kafka_events.py b/eos/kafka_events.py
index 824deb8..7f638e9 100644
--- a/eos/kafka_events.py
+++ b/eos/kafka_events.py
@@ -115,7 +115,7 @@ class KafkaEventData(Thread):
         self.events.resize(prev_size+new_events, refcheck=False)
         self.events.pixelID[prev_size:] = events.pixel_id
         self.events.mask[prev_size:] = 0
-        self.events.tof[prev_size:] = events.time_of_flight
+        self.events.tof[prev_size:] = events.time_of_flight/1.e9
 
     nicos_mapping = {
         'mu': ('geometry', 'mu'),
diff --git a/eos/kafka_serializer.py b/eos/kafka_serializer.py
index a8c7c2c..a1bf32b 100644
--- a/eos/kafka_serializer.py
+++ b/eos/kafka_serializer.py
@@ -133,8 +133,6 @@ class ESSSerializer:
         })
         self._active_histogram_yz = None
         self._active_histogram_tofz = None
-        self._last_message_yz = None
-        self._last_message_tofz = None
         self.new_count_started = Event()
         self.count_stopped = Event()
 
@@ -163,34 +161,9 @@ class ESSSerializer:
             self.producer.flush()
         if isinstance(command, Stop):
             if command.hist_id == self._active_histogram_yz:
-                suffix = 'YZ'
-                self._active_histogram_yz = None
-                message = self._last_message_yz
-                message.timestamp = ktime()
-                message.info=json.dumps({
-                    "start": self._start,
-                    "state": 'FINISHED',
-                    "num events": message.data.sum()
-                })
-                self._last_message_yz = None
                 self.count_stopped.set()
-            elif command.hist_id == self._active_histogram_tofz:
-
suffix = 'TofZ' - self._active_histogram_tofz = None - message = self._last_message_tofz - message.timestamp = ktime() - message.info=json.dumps({ - "start": self._start, - "state": 'FINISHED', - "num events": message.data.sum() - }) - self._last_message_tofz = None else: return - self.producer.produce(value=message.serialize(), - topic=KAFKA_TOPICS['histogram']+'_'+suffix, - callback=self.acked) - self.producer.flush() elif isinstance(command, ConfigureHistogram): for hist in command.histograms: if hist.topic == KAFKA_TOPICS['histogram']+'_YZ': @@ -199,7 +172,6 @@ class ESSSerializer: self._start = command.start self.count_stopped.clear() self.new_count_started.set() - self.set_empty_messages() if hist.topic == KAFKA_TOPICS['histogram']+'_TofZ': self._active_histogram_tofz = hist.id @@ -234,7 +206,11 @@ class ESSSerializer: else: logging.debug("Message produced: %s" % (str(msg))) - def send(self, proj: Union[YZProjection, TofZProjection]): + def send(self, proj: Union[YZProjection, TofZProjection], final=False): + if final: + state = 'FINISHED' + else: + state = 'COUNTING' if isinstance(proj, YZProjection): if self._active_histogram_yz is None: return @@ -262,12 +238,11 @@ class ESSSerializer: errors=np.sqrt(proj.data.cts), info=json.dumps({ "start": self._start, - "state": 'COUNTING', + "state": state, "num events": proj.data.cts.sum() }) ) - self._last_message_yz = message - logging.info(f" Sending {proj.data.cts.sum()} events to Nicos") + logging.info(f" {state}: Sending {proj.data.cts.sum()} events to Nicos") elif isinstance(proj, TofZProjection): if self._active_histogram_tofz is None: return @@ -295,11 +270,10 @@ class ESSSerializer: errors=np.sqrt(proj.data.cts), info=json.dumps({ "start": self._start, - "state": 'COUNTING', + "state": state, "num events": proj.data.I.sum() }) ) - self._last_message_tofz = message else: raise NotImplementedError(f"Histogram for {proj.__class__.__name__} not implemented") @@ -307,59 +281,3 @@ class ESSSerializer: 
topic=KAFKA_TOPICS['histogram']+'_'+suffix, callback=self.acked) self.producer.flush() - - def set_empty_messages(self): - self._last_message_yz = HistogramMessage( - source='amor-eos', - timestamp=ktime(), - current_shape=(64, 448), - dim_metadata=( - DimMetadata( - length=64, - unit="pixel", - label="Y", - bin_boundaries=np.arange(64), - ), - DimMetadata( - length=448, - unit="pixel", - label="Z", - bin_boundaries=np.arange(448), - ) - ), - last_metadata_timestamp=0, - data=np.zeros((64, 448)), - errors=np.zeros((64, 448)), - info=json.dumps({ - "start": self._start, - "state": 'COUNTING', - "num events": 0 - }) - ) - self._last_message_tofz = message = HistogramMessage( - source='amor-eos', - timestamp=ktime(), - current_shape=(56, 448), - dim_metadata=( - DimMetadata( - length=56, - unit="ms", - label="ToF", - bin_boundaries=np.arange(56), - ), - DimMetadata( - length=448, - unit="pixel", - label="Z", - bin_boundaries=np.arange(448), - ), - ), - last_metadata_timestamp=0, - data=np.zeros((56, 448)), - errors=np.zeros((56, 448)), - info=json.dumps({ - "start": self._start, - "state": 'COUNTING', - "num events": 0 - }) - ) diff --git a/eos/reduction_kafka.py b/eos/reduction_kafka.py index 34589c3..6656096 100644 --- a/eos/reduction_kafka.py +++ b/eos/reduction_kafka.py @@ -89,10 +89,9 @@ class KafkaReduction: self.serializer.new_count_started.clear() self.proj_yz.clear() self.proj_tofz.clear() + return elif self.serializer.count_stopped.is_set() and not self.event_data.stop_counting.is_set(): - logging.warning(f' stop counting, total events {int(self.proj_tofz.data.cts.sum())}') - self.wait_for = self.serializer.new_count_started - self.event_data.stop_counting.set() + return self.finish_count() try: update_data = self.event_data.get_events() except EOFError: @@ -107,3 +106,24 @@ class KafkaReduction: self.serializer.send(self.proj_yz) self.serializer.send(self.proj_tofz) + + def finish_count(self): + logging.debug(" stop event set, hold event collection and send 
final results") + self.wait_for = self.serializer.new_count_started + self.event_data.stop_counting.set() + + try: + update_data = self.event_data.get_events() + except EOFError: + pass + else: + self.event_actions(update_data) + self.dataset = update_data + self.monitor = self.dataset.monitor + self.proj_yz.project(update_data, self.monitor) + self.proj_tofz.project(update_data, self.monitor) + + logging.warning(f' stop counting, total events {int(self.proj_tofz.data.cts.sum())}') + + self.serializer.send(self.proj_yz, final=True) + self.serializer.send(self.proj_tofz, final=True) From 4cd25dec3da842e3199f8cd91e4702ddbad37185 Mon Sep 17 00:00:00 2001 From: Artur Glavic Date: Mon, 27 Oct 2025 08:08:52 +0100 Subject: [PATCH 09/10] Recreate projection on counting start accounting for possible changes in tau --- eos/reduction_kafka.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/eos/reduction_kafka.py b/eos/reduction_kafka.py index 6656096..90ca054 100644 --- a/eos/reduction_kafka.py +++ b/eos/reduction_kafka.py @@ -87,8 +87,7 @@ class KafkaReduction: self.wait_for = self.serializer.count_stopped self.event_data.restart() self.serializer.new_count_started.clear() - self.proj_yz.clear() - self.proj_tofz.clear() + self.create_projections() return elif self.serializer.count_stopped.is_set() and not self.event_data.stop_counting.is_set(): return self.finish_count() From 003a8c04cefd342278e215a0e08731b48416c315 Mon Sep 17 00:00:00 2001 From: Artur Glavic Date: Mon, 27 Oct 2025 08:12:58 +0100 Subject: [PATCH 10/10] Nicos service just to log info by default --- eos/nicos.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/eos/nicos.py b/eos/nicos.py index 147d935..6d6f892 100644 --- a/eos/nicos.py +++ b/eos/nicos.py @@ -20,6 +20,10 @@ def main(): clas = commandLineArgs([ReaderConfig, ExperimentConfig, E2HReductionConfig], 'amor-nicos') update_loglevel(clas.verbose) + if clas.verbose<2: + # only log info level in logfile + logger = 
logging.getLogger() # logging.getLogger('quicknxs') + logger.setLevel(logging.INFO) reader_config = ReaderConfig.from_args(clas) experiment_config = ExperimentConfig.from_args(clas)