From b38eeb8eb3afe5cba95b6fca8f88623f5d702225 Mon Sep 17 00:00:00 2001 From: Artur Glavic Date: Thu, 23 Oct 2025 17:22:48 +0200 Subject: [PATCH] Fix kafka event ToV value and send actual final counts after stop signal --- eos/kafka_events.py | 2 +- eos/kafka_serializer.py | 98 ++++------------------------------------- eos/reduction_kafka.py | 26 +++++++++-- 3 files changed, 32 insertions(+), 94 deletions(-) diff --git a/eos/kafka_events.py b/eos/kafka_events.py index 824deb8..7f638e9 100644 --- a/eos/kafka_events.py +++ b/eos/kafka_events.py @@ -115,7 +115,7 @@ class KafkaEventData(Thread): self.events.resize(prev_size+new_events, refcheck=False) self.events.pixelID[prev_size:] = events.pixel_id self.events.mask[prev_size:] = 0 - self.events.tof[prev_size:] = events.time_of_flight + self.events.tof[prev_size:] = events.time_of_flight/1.e9 nicos_mapping = { 'mu': ('geometry', 'mu'), diff --git a/eos/kafka_serializer.py b/eos/kafka_serializer.py index a8c7c2c..a1bf32b 100644 --- a/eos/kafka_serializer.py +++ b/eos/kafka_serializer.py @@ -133,8 +133,6 @@ class ESSSerializer: }) self._active_histogram_yz = None self._active_histogram_tofz = None - self._last_message_yz = None - self._last_message_tofz = None self.new_count_started = Event() self.count_stopped = Event() @@ -163,34 +161,9 @@ class ESSSerializer: self.producer.flush() if isinstance(command, Stop): if command.hist_id == self._active_histogram_yz: - suffix = 'YZ' - self._active_histogram_yz = None - message = self._last_message_yz - message.timestamp = ktime() - message.info=json.dumps({ - "start": self._start, - "state": 'FINISHED', - "num events": message.data.sum() - }) - self._last_message_yz = None self.count_stopped.set() - elif command.hist_id == self._active_histogram_tofz: - suffix = 'TofZ' - self._active_histogram_tofz = None - message = self._last_message_tofz - message.timestamp = ktime() - message.info=json.dumps({ - "start": self._start, - "state": 'FINISHED', - "num events": message.data.sum() - }) - self._last_message_tofz = None else: return - self.producer.produce(value=message.serialize(), - topic=KAFKA_TOPICS['histogram']+'_'+suffix, - callback=self.acked) - self.producer.flush() elif isinstance(command, ConfigureHistogram): for hist in command.histograms: if hist.topic == KAFKA_TOPICS['histogram']+'_YZ': @@ -199,7 +172,6 @@ class ESSSerializer: self._start = command.start self.count_stopped.clear() self.new_count_started.set() - self.set_empty_messages() if hist.topic == KAFKA_TOPICS['histogram']+'_TofZ': self._active_histogram_tofz = hist.id @@ -234,7 +206,11 @@ class ESSSerializer: else: logging.debug("Message produced: %s" % (str(msg))) - def send(self, proj: Union[YZProjection, TofZProjection]): + def send(self, proj: Union[YZProjection, TofZProjection], final=False): + if final: + state = 'FINISHED' + else: + state = 'COUNTING' if isinstance(proj, YZProjection): if self._active_histogram_yz is None: return @@ -262,12 +238,11 @@ class ESSSerializer: errors=np.sqrt(proj.data.cts), info=json.dumps({ "start": self._start, - "state": 'COUNTING', + "state": state, "num events": proj.data.cts.sum() }) ) - self._last_message_yz = message - logging.info(f" Sending {proj.data.cts.sum()} events to Nicos") + logging.info(f" {state}: Sending {proj.data.cts.sum()} events to Nicos") elif isinstance(proj, TofZProjection): if self._active_histogram_tofz is None: return @@ -295,11 +270,10 @@ class ESSSerializer: errors=np.sqrt(proj.data.cts), info=json.dumps({ "start": self._start, - "state": 'COUNTING', + "state": state, "num events": proj.data.I.sum() }) ) - self._last_message_tofz = message else: raise NotImplementedError(f"Histogram for {proj.__class__.__name__} not implemented") @@ -307,59 +281,3 @@ class ESSSerializer: topic=KAFKA_TOPICS['histogram']+'_'+suffix, callback=self.acked) self.producer.flush() - - def set_empty_messages(self): - self._last_message_yz = HistogramMessage( - source='amor-eos', - timestamp=ktime(), - current_shape=(64, 448), - dim_metadata=( - DimMetadata( - length=64, - unit="pixel", - label="Y", - bin_boundaries=np.arange(64), - ), - DimMetadata( - length=448, - unit="pixel", - label="Z", - bin_boundaries=np.arange(448), - ) - ), - last_metadata_timestamp=0, - data=np.zeros((64, 448)), - errors=np.zeros((64, 448)), - info=json.dumps({ - "start": self._start, - "state": 'COUNTING', - "num events": 0 - }) - ) - self._last_message_tofz = message = HistogramMessage( - source='amor-eos', - timestamp=ktime(), - current_shape=(56, 448), - dim_metadata=( - DimMetadata( - length=56, - unit="ms", - label="ToF", - bin_boundaries=np.arange(56), - ), - DimMetadata( - length=448, - unit="pixel", - label="Z", - bin_boundaries=np.arange(448), - ), - ), - last_metadata_timestamp=0, - data=np.zeros((56, 448)), - errors=np.zeros((56, 448)), - info=json.dumps({ - "start": self._start, - "state": 'COUNTING', - "num events": 0 - }) - ) diff --git a/eos/reduction_kafka.py b/eos/reduction_kafka.py index 34589c3..6656096 100644 --- a/eos/reduction_kafka.py +++ b/eos/reduction_kafka.py @@ -89,10 +89,9 @@ class KafkaReduction: self.serializer.new_count_started.clear() self.proj_yz.clear() self.proj_tofz.clear() + return elif self.serializer.count_stopped.is_set() and not self.event_data.stop_counting.is_set(): - logging.warning(f' stop counting, total events {int(self.proj_tofz.data.cts.sum())}') - self.wait_for = self.serializer.new_count_started - self.event_data.stop_counting.set() + return self.finish_count() try: update_data = self.event_data.get_events() except EOFError: @@ -107,3 +106,24 @@ class KafkaReduction: self.serializer.send(self.proj_yz) self.serializer.send(self.proj_tofz) + + def finish_count(self): + logging.debug(" stop event set, hold event collection and send final results") + self.wait_for = self.serializer.new_count_started + self.event_data.stop_counting.set() + + try: + update_data = self.event_data.get_events() + except EOFError: + pass + else: + self.event_actions(update_data) + self.dataset = update_data + self.monitor = self.dataset.monitor + self.proj_yz.project(update_data, self.monitor) + self.proj_tofz.project(update_data, self.monitor) + + logging.warning(f' stop counting, total events {int(self.proj_tofz.data.cts.sum())}') + + self.serializer.send(self.proj_yz, final=True) + self.serializer.send(self.proj_tofz, final=True)