mirror of
https://github.com/bec-project/bec_widgets.git
synced 2025-07-14 03:31:50 +02:00
fix(on_async_readback): suppress history updates from old scan_id
This commit is contained in:
@ -25,9 +25,9 @@ if TYPE_CHECKING: # pragma: no cover
|
|||||||
class QtThreadSafeCallback(QObject):
|
class QtThreadSafeCallback(QObject):
|
||||||
cb_signal = pyqtSignal(dict, dict)
|
cb_signal = pyqtSignal(dict, dict)
|
||||||
|
|
||||||
def __init__(self, cb):
|
def __init__(self, cb, cb_info: dict | None = None):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.topics = None
|
self.cb_info = cb_info
|
||||||
self.cb = cb
|
self.cb = cb
|
||||||
self.cb_signal.connect(self.cb)
|
self.cb_signal.connect(self.cb)
|
||||||
|
|
||||||
@ -38,7 +38,6 @@ class QtThreadSafeCallback(QObject):
|
|||||||
return id(self.cb)
|
return id(self.cb)
|
||||||
|
|
||||||
def __call__(self, msg_content, metadata):
|
def __call__(self, msg_content, metadata):
|
||||||
logger.info(f"Received message for topic {self.topics}")
|
|
||||||
self.cb_signal.emit(msg_content, metadata)
|
self.cb_signal.emit(msg_content, metadata)
|
||||||
|
|
||||||
|
|
||||||
@ -138,6 +137,7 @@ class BECDispatcher:
|
|||||||
self,
|
self,
|
||||||
slot: Callable,
|
slot: Callable,
|
||||||
topics: Union[EndpointInfo, str, list[Union[EndpointInfo, str]]],
|
topics: Union[EndpointInfo, str, list[Union[EndpointInfo, str]]],
|
||||||
|
cb_info: dict | None = None,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Connect widget's qt slot, so that it is called on new pub/sub topic message.
|
"""Connect widget's qt slot, so that it is called on new pub/sub topic message.
|
||||||
@ -147,8 +147,7 @@ class BECDispatcher:
|
|||||||
the corresponding pub/sub message
|
the corresponding pub/sub message
|
||||||
topics (EndpointInfo | str | list): A topic or list of topics that can typically be acquired via bec_lib.MessageEndpoints
|
topics (EndpointInfo | str | list): A topic or list of topics that can typically be acquired via bec_lib.MessageEndpoints
|
||||||
"""
|
"""
|
||||||
slot = QtThreadSafeCallback(slot)
|
slot = QtThreadSafeCallback(slot, cb_info=cb_info)
|
||||||
slot.topics = topics
|
|
||||||
self.client.connector.register(topics, cb=slot, **kwargs)
|
self.client.connector.register(topics, cb=slot, **kwargs)
|
||||||
topics_str, _ = self.client.connector._convert_endpointinfo(topics)
|
topics_str, _ = self.client.connector._convert_endpointinfo(topics)
|
||||||
self._slots[slot].update(set(topics_str))
|
self._slots[slot].update(set(topics_str))
|
||||||
|
@ -1176,6 +1176,7 @@ class Waveform(PlotBase):
|
|||||||
self.bec_dispatcher.connect_slot(
|
self.bec_dispatcher.connect_slot(
|
||||||
self.on_async_readback,
|
self.on_async_readback,
|
||||||
MessageEndpoints.device_async_readback(self.scan_id, name),
|
MessageEndpoints.device_async_readback(self.scan_id, name),
|
||||||
|
cb_info={"scan_id": self.scan_id},
|
||||||
from_start=True,
|
from_start=True,
|
||||||
)
|
)
|
||||||
logger.info(f"Setup async curve {name}")
|
logger.info(f"Setup async curve {name}")
|
||||||
@ -1199,6 +1200,11 @@ class Waveform(PlotBase):
|
|||||||
msg(dict): Message with the async data.
|
msg(dict): Message with the async data.
|
||||||
metadata(dict): Metadata of the message.
|
metadata(dict): Metadata of the message.
|
||||||
"""
|
"""
|
||||||
|
sender = self.sender()
|
||||||
|
if sender and hasattr(sender, "cb_info"):
|
||||||
|
scan_id = sender.cb_info.get("scan_id", None)
|
||||||
|
if scan_id != self.scan_id:
|
||||||
|
return # Ignore messages from other scans
|
||||||
instruction = metadata.get("async_update", {}).get("type")
|
instruction = metadata.get("async_update", {}).get("type")
|
||||||
if instruction not in ["add", "add_slice", "replace"]:
|
if instruction not in ["add", "add_slice", "replace"]:
|
||||||
logger.warning(f"Invalid async update instruction: {instruction}")
|
logger.warning(f"Invalid async update instruction: {instruction}")
|
||||||
|
Reference in New Issue
Block a user