From 6f1a4721288c10ec0be38dd3bfde335cc4c45d0a Mon Sep 17 00:00:00 2001 From: appel_c Date: Tue, 15 Apr 2025 20:17:47 +0200 Subject: [PATCH] fix(on_async_readback): suppress history updates from old scan_id --- bec_widgets/utils/bec_dispatcher.py | 9 ++++----- bec_widgets/widgets/plots/waveform/waveform.py | 6 ++++++ 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/bec_widgets/utils/bec_dispatcher.py b/bec_widgets/utils/bec_dispatcher.py index 976b70b0..96fdf894 100644 --- a/bec_widgets/utils/bec_dispatcher.py +++ b/bec_widgets/utils/bec_dispatcher.py @@ -25,9 +25,9 @@ if TYPE_CHECKING: # pragma: no cover class QtThreadSafeCallback(QObject): cb_signal = pyqtSignal(dict, dict) - def __init__(self, cb): + def __init__(self, cb, cb_info: dict | None = None): super().__init__() - self.topics = None + self.cb_info = cb_info self.cb = cb self.cb_signal.connect(self.cb) @@ -38,7 +38,6 @@ class QtThreadSafeCallback(QObject): return id(self.cb) def __call__(self, msg_content, metadata): - logger.info(f"Received message for topic {self.topics}") self.cb_signal.emit(msg_content, metadata) @@ -138,6 +137,7 @@ class BECDispatcher: self, slot: Callable, topics: Union[EndpointInfo, str, list[Union[EndpointInfo, str]]], + cb_info: dict | None = None, **kwargs, ) -> None: """Connect widget's qt slot, so that it is called on new pub/sub topic message. @@ -147,8 +147,7 @@ class BECDispatcher: the corresponding pub/sub message topics (EndpointInfo | str | list): A topic or list of topics that can typically be acquired via bec_lib.MessageEndpoints """ - slot = QtThreadSafeCallback(slot) - slot.topics = topics + slot = QtThreadSafeCallback(slot, cb_info=cb_info) self.client.connector.register(topics, cb=slot, **kwargs) topics_str, _ = self.client.connector._convert_endpointinfo(topics) self._slots[slot].update(set(topics_str)) diff --git a/bec_widgets/widgets/plots/waveform/waveform.py b/bec_widgets/widgets/plots/waveform/waveform.py index 3a7fcfcb..d38fb049 100644 --- a/bec_widgets/widgets/plots/waveform/waveform.py +++ b/bec_widgets/widgets/plots/waveform/waveform.py @@ -1176,6 +1176,7 @@ class Waveform(PlotBase): self.bec_dispatcher.connect_slot( self.on_async_readback, MessageEndpoints.device_async_readback(self.scan_id, name), + cb_info={"scan_id": self.scan_id}, from_start=True, ) logger.info(f"Setup async curve {name}") @@ -1199,6 +1200,11 @@ class Waveform(PlotBase): msg(dict): Message with the async data. metadata(dict): Metadata of the message. """ + sender = self.sender() + if sender and hasattr(sender, "cb_info"): + scan_id = sender.cb_info.get("scan_id", None) + if scan_id != self.scan_id: + return # Ignore messages from other scans instruction = metadata.get("async_update", {}).get("type") if instruction not in ["add", "add_slice", "replace"]: logger.warning(f"Invalid async update instruction: {instruction}")