From 0b7a56628ff859b35f2473b270a5bb02efe7597e Mon Sep 17 00:00:00 2001 From: Artur Glavic Date: Tue, 14 Oct 2025 14:48:49 +0200 Subject: [PATCH] Transfer to NICOS working for YZ and TofZ projections --- eos/kafka_serializer.py | 174 +++++++++++++++++++++++++++++++--------- eos/reduction_e2h.py | 5 +- 2 files changed, 141 insertions(+), 38 deletions(-) diff --git a/eos/kafka_serializer.py b/eos/kafka_serializer.py index 8e165be..42ef8b0 100644 --- a/eos/kafka_serializer.py +++ b/eos/kafka_serializer.py @@ -27,7 +27,8 @@ hist = { "info": "info_string", } """ -from typing import Tuple, Union +import logging +from typing import List, Tuple, Union import numpy as np import json @@ -38,7 +39,17 @@ from confluent_kafka import Producer, Consumer, TopicPartition from uuid import uuid4 -from .projection import LZProjection, YZProjection +from .projection import TofZProjection, YZProjection + +KAFKA_BROKER = 'linkafka01.psi.ch:9092' +KAFKA_TOPICS = { + 'histogram': 'AMOR_histograms', + 'response': 'AMOR_histResponse', + 'command': 'AMOR_histCommands' + } + +def ktime(): + return int(time()*1_000) @dataclass class DimMetadata: @@ -61,60 +72,148 @@ class HistogramMessage: def serialize(self): return histogram_hs00.serialise_hs00(asdict(self)) +@dataclass +class CommandMessage: + msg_id: str + + cmd=None + + @classmethod + def get_message(cls, data): + """ + Uses the sub-class cmd attribute to select which message to retugn + """ + msg = dict([(ci.cmd, ci) for ci in cls.__subclasses__()]) + return msg[data['cmd']](**data) + + +@dataclass +class Stop(CommandMessage): + hist_id: str + id: str + cmd:str = 'stop' + +@dataclass +class HistogramConfig: + id: str + type: str + data_brokers: List[str] + topic: str + data_topics: List[str] + tof_range: Tuple[float, float] + det_range: Tuple[int, int] + num_bins: int + width: int + height: int + left_edges: list + source: str + +@dataclass +class ConfigureHistogram(CommandMessage): + histograms: List[HistogramConfig] + start: int + cmd:str = 'config' + + def __post_init__(self): + self.histograms = [HistogramConfig(**cfg) for cfg in self.histograms] + + class ESSSerializer: def __init__(self): self.producer = Producer({ - 'bootstrap.servers': 'linkafka01.psi.ch:9092', + 'bootstrap.servers': KAFKA_BROKER, + 'message.max.bytes': 4_000_000, }) self.consumer = Consumer({ - 'bootstrap.servers': 'linkafka01.psi.ch:9092', + 'bootstrap.servers': KAFKA_BROKER, "group.id": uuid4(), "default.topic.config": {"auto.offset.reset": "latest"}, }) + self._active_histogram = None + self._last_message = None - #tp = [TopicPartition( "SANSLLB_histCommands",0)] - #self.consumer.assign(tp) - self.consumer.subscribe(["SANSLLB_histCommands"]) + self.consumer.subscribe([KAFKA_TOPICS['command']]) def process_message(self, message): if message.error(): - print("Command Consumer Error: %s", message.error()) + logging.error("Command Consumer Error: %s", message.error()) else: command = json.loads(message.value().decode()) - print(command) + try: + command = CommandMessage.get_message(command) + except Exception: + logging.error(f'Could not interpret message: \n{command}', exc_info=True) + return + logging.warning(command) resp = json.dumps({ - "msg_id": command.get("id") or command.get("msg_id"), + "msg_id": getattr(command, "id", None) or command.msg_id, "response": "ACK", "message": "" }) self.producer.produce( - topic="SANSLLB_histResponse", + topic=KAFKA_TOPICS['response'], value=resp ) self.producer.flush() + if isinstance(command, Stop) and command.hist_id == self._active_histogram: + self._active_histogram = None + message = self._last_message + message.timestamp = ktime() + message.info=json.dumps({ + "start": self._start, + "state": 'FINISHED', + "num events": message.data.sum() + }) + self._last_message = None + self.producer.produce(value=message.serialize(), + topic=KAFKA_TOPICS['histogram'], + callback=self.acked) + self.producer.flush() + elif isinstance(command, ConfigureHistogram): + self._active_histogram = command.histograms[0].id + self._start = command.start def receive(self, timeout=5): - rec = self.consumer.poll(5) + rec = self.consumer.poll(timeout) if rec is not None: self.process_message(rec) return True else: return False + def receive_loop(self): + while self._keep_receiving: + try: + self.receive() + except Exception: + logging.error("Exception while receiving", exc_info=True) + + def start_command_thread(self): + from threading import Thread + self._keep_receiving = True + self._command_thread = Thread(target=self.receive_loop) + self._command_thread.start() + + def end_command_thread(self, event=None): + self._keep_receiving = False + self._command_thread.join() def acked(self, err, msg): # We need to have callback to produce-method to catch server errors if err is not None: - print("Failed to deliver message: %s: %s" % (str(msg), str(err))) + logging.warning("Failed to deliver message: %s: %s" % (str(msg), str(err))) else: - print("Message produced: %s" % (str(msg))) + logging.debug("Message produced: %s" % (str(msg))) - def send(self, proj: Union[YZProjection, LZProjection]): + def send(self, proj: Union[YZProjection, TofZProjection]): + if self._active_histogram is None: + proj.clear() + return if isinstance(proj, YZProjection): message = HistogramMessage( - source='just-bin-it', - timestamp=int(time()), + source='amor-eos', + timestamp=ktime(), current_shape=(proj.y.shape[0]-1, proj.z.shape[0]-1), dim_metadata=( DimMetadata( @@ -131,38 +230,38 @@ class ESSSerializer: ) ), last_metadata_timestamp=0, - data=proj.data.I, - errors=proj.data.err, + data=proj.data.cts, + errors=np.sqrt(proj.data.cts), info=json.dumps({ - "start": int(time()), + "start": self._start, "state": 'COUNTING', "num events": proj.data.cts.sum() }) ) - elif isinstance(proj, LZProjection): + elif isinstance(proj, TofZProjection): message = HistogramMessage( - source='just-bin-it', - timestamp=int(time()), - current_shape=(proj.lamda.shape[0]-1, proj.alphaF.shape[0]-1), + source='amor-eos', + timestamp=ktime(), + current_shape=(proj.tof.shape[0]-1, proj.z.shape[0]-1), dim_metadata=( DimMetadata( - length=proj.lamda.shape[0]-1, - unit="Angstrom", - label="Lambda", - bin_boundaries=proj.lamda, + length=proj.tof.shape[0]-1, + unit="ms", + label="ToF", + bin_boundaries=proj.tof, ), DimMetadata( - length=proj.alphaF.shape[0]-1, - unit="degrees", - label="Theta", - bin_boundaries=proj.alphaF, - ) + length=proj.z.shape[0]-1, + unit="pixel", + label="Z", + bin_boundaries=proj.z, + ), ), last_metadata_timestamp=0, - data=proj.data.ref, - errors=proj.data.err, + data=proj.data.cts, + errors=np.sqrt(proj.data.cts), info=json.dumps({ - "start": int(time()), + "start": self._start, "state": 'COUNTING', "num events": proj.data.I.sum() }) @@ -170,7 +269,8 @@ class ESSSerializer: else: raise NotImplementedError(f"Histogram for {proj.__class__.__name__} not implemented") + self._last_message = message self.producer.produce(value=message.serialize(), - topic='SANSLLB_histograms', + topic=KAFKA_TOPICS['histogram'], callback=self.acked) self.producer.flush() diff --git a/eos/reduction_e2h.py b/eos/reduction_e2h.py index 9269d3d..63a329f 100644 --- a/eos/reduction_e2h.py +++ b/eos/reduction_e2h.py @@ -93,7 +93,8 @@ class E2HReduction: # plot dependant options if self.config.reduction.plot in [E2HPlotSelection.All, E2HPlotSelection.LT, E2HPlotSelection.Q]: - self.grid = LZGrid(0.01, [0.0, 0.25]) + self.grid = LZGrid(0.05, [0.0, 0.25]) + self.grid.dldl = 0.05 if self.config.reduction.plot in [E2HPlotSelection.All, E2HPlotSelection.Raw, E2HPlotSelection.LT, E2HPlotSelection.YT, @@ -127,6 +128,8 @@ class E2HReduction: if self.config.reduction.kafka: from .kafka_serializer import ESSSerializer self.serializer = ESSSerializer() + self.fig.canvas.mpl_connect('close_event', self.serializer.end_command_thread) + self.serializer.start_command_thread() self.serializer.send(self.projection) if self.config.reduction.update: self.timer = self.fig.canvas.new_timer(1000)