From 6d15ee50b87f616bda82b102383d1130f1d6a959 Mon Sep 17 00:00:00 2001 From: appel_c Date: Wed, 15 Oct 2025 12:50:15 +0200 Subject: [PATCH] refactor(bec-signals): Refactor AsyncSignal to AsyncSignal and AsyncMultiSignal --- ophyd_devices/utils/bec_signals.py | 215 +++++++++++++++++++++++------ 1 file changed, 173 insertions(+), 42 deletions(-) diff --git a/ophyd_devices/utils/bec_signals.py b/ophyd_devices/utils/bec_signals.py index c41c15e..fd992f4 100644 --- a/ophyd_devices/utils/bec_signals.py +++ b/ophyd_devices/utils/bec_signals.py @@ -3,7 +3,7 @@ Module for custom BEC signals, that wrap around ophyd.Signal. These signals emit BECMessage objects, which comply with the BEC message system. """ -from time import time +import time from typing import Any, Callable, Literal, Type import numpy as np @@ -147,26 +147,33 @@ class BECMessageSignal(Signal): if isinstance(signals, Callable): signals = signals() if signals is None: - return [(self.attr_name or self.name, Kind.hinted.value)] + return [(self.name, Kind.hinted.value)] # Default to signal name with hinted kind if isinstance(signals, str): - return [(signals, Kind.hinted.value)] - if not isinstance(signals, list): - raise ValueError( - f"Signals must be a list of tuples or strings, got {type(signals).__name__}." - ) - out = [] - for signal in signals: - if isinstance(signal, str): - out.append((signal, Kind.normal.value)) - elif isinstance(signal, tuple) and len(signal) == 2: - if isinstance(signal[1], Kind): - out.append((signal[0], signal[1].value)) - else: - out.append((signal[0], signal[1])) - else: + out = [(signals, Kind.hinted.value)] + else: + if not isinstance(signals, list): raise ValueError( - f"Invalid signal format: {signal}. Expected a tuple of (name, kind) or a string." + f"Signals must be a list of tuples or strings, got {type(signals).__name__}." ) + out = [] + for signal in signals: + if isinstance(signal, str): + out.append((signal, Kind.normal.value)) + elif isinstance(signal, tuple) and len(signal) == 2: + if isinstance(signal[1], Kind): + out.append((signal[0], signal[1].value)) + else: + out.append((signal[0], signal[1])) + else: + raise ValueError( + f"Invalid signal format: {signal}. Expected a tuple of (name, kind) or a string." + ) + if len(out) == 1 and out[0][0] != self.name: + signal_name, signal_kind = out[0] + logger.warning( + f"Signal {self.name} of class {self.__class__.__name__} has only one sub-signal. Signal name {signal_name} will be renamed to {self.name}." + ) + out = [(self.name, signal_kind)] return out def describe(self): @@ -639,9 +646,9 @@ class DynamicSignal(BECMessageSignal): self, *, name: str, - signals: list[str] | Callable[[], list[str]] | None = None, + signals: list[str] | Callable[[], list[str]] | str | None = None, value: messages.DeviceMessage | dict | None = None, - async_update: dict[Literal["type", "max_size", "index"], Any] | None = None, + async_update: dict[Literal["type", "max_shape", "index"], Any] | None = None, acquisition_group: Literal["baseline", "monitored"] | str | None = None, **kwargs, ): @@ -654,13 +661,13 @@ class DynamicSignal(BECMessageSignal): value (DeviceMessage | dict | None): The initial value of the signal. Defaults to None. acquisition_group (Literal["baseline", "monitored"] | str | None): The acquisition group of the signal group. async_update (dict | None): Additional metadata for asynchronous updates. - There are three relevant keys "type", "max_size" and "index". + There are three relevant keys "type", "max_shape" and "index". "type" (str) : Can be one of "add", "add_slice" or "replace". This defines how the new data is added to the existing dataset. "add" : Appends data to the existing dataset. The data is always appended to the first axis. "add_slice" : Appends data to the existing dataset, but allows specifying a slice. The slice is defined by the "index" key. "replace" : Replaces the existing dataset with the new data. - "max_size" (list[int | None]): Required for type 'add' and 'add_slice'. It defines where the data is added. For a 1D dataset, + "max_shape" (list[int | None]): Required for type 'add' and 'add_slice'. It defines where the data is added. For a 1D dataset, it should be [None]. For a 1D dataset with 3000 elements, it should be [None, 3000]. For a 2D dataset with 3000x3000 elements, it should be [None, 3000, 3000]. "index" (int): Only required for type 'add_slice'. It defines the index where the data is added. @@ -688,7 +695,7 @@ class DynamicSignal(BECMessageSignal): value: messages.DeviceMessage | dict[str, dict[Literal["value", "timestamp"], Any]], *, metadata: dict | None = None, - async_update: dict[Literal["type", "max_size", "index"], Any] | None = None, + async_update: dict[Literal["type", "max_shape", "index"], Any] | None = None, acquisition_group: Literal["baseline", "monitored"] | str | None = None, **kwargs, ) -> None: @@ -702,7 +709,7 @@ class DynamicSignal(BECMessageSignal): Args: value (dict | DeviceMessage): The dynamic device data. metadata (dict | None): Additional metadata. - async_update (dict[Literal["type", "max_size", "index"], Any] | None): Additional metadata for asynchronous updates. + async_update (dict[Literal["type", "max_shape", "index"], Any] | None): Additional metadata for asynchronous updates. acquisition_group (Literal["baseline", "monitored"] | str | None): The acquisition group of the signal group. """ if isinstance(value, messages.DeviceMessage): @@ -716,15 +723,8 @@ class DynamicSignal(BECMessageSignal): metadata = metadata or {} if async_update is not None: metadata["async_update"] = async_update - else: + elif self.async_update is not None: metadata["async_update"] = self.async_update - if not metadata.get("async_update"): - raise ValueError( - f"Async update must be provided for signal {self.name} of class {self.__class__.__name__}." - ) - else: - pass - # TODO #627 Issue in BEC: Validate async_update --> bec_lib if acquisition_group is not None: metadata["acquisition_group"] = acquisition_group elif self.acquisition_group is not None: @@ -737,14 +737,14 @@ class DynamicSignal(BECMessageSignal): return super().put(msg, **kwargs) def _check_signals(self, msg: messages.DeviceMessage) -> None: - """Check if all signals are valid.""" + """Check if all signals are valid, and if relevant metadata is also present.""" if len(self.signals) == 1: if self.name not in msg.signals: raise ValueError( f"Signal {self.name} not found in message {list(msg.signals.keys())}" ) return - available_signals = [f"{self.name}_{name}" for name, _ in self.signals] + available_signals = [f"{self.name}_{signal_name}" for signal_name, _ in self.signals] if self.strict_signal_validation: if set(msg.signals.keys()) != set(available_signals): raise ValueError( @@ -757,13 +757,20 @@ class DynamicSignal(BECMessageSignal): raise ValueError( f"Invalid signal name in message {list(msg.signals.keys())} for signals {available_signals}" ) + # Check if async_update metadata is present + if "async_update" not in msg.metadata: + raise ValueError( + f"Async update must be provided for signal {self.name} of class {self.__class__.__name__}." + ) + # Add here validation for async update + # TODO #629 Issue in BEC: Validate async_update --> bec_lib def set( self, value: messages.DeviceMessage | dict[str, dict[Literal["value"], Any]], *, metadata: dict | None = None, - async_update: dict[Literal["type", "max_size", "index"], Any] | None = None, + async_update: dict[Literal["type", "max_shape", "index"], Any] | None = None, acquisition_group: Literal["baseline", "monitored"] | str | None = None, **kwargs, ) -> DeviceStatus: @@ -790,8 +797,8 @@ class DynamicSignal(BECMessageSignal): return status -class AsyncSignal(DynamicSignal): - """Signal to emit asynchronous data.""" +class AsyncMultiSignal(DynamicSignal): + """Async Signal group to emit asynchronous data from multiple signals.""" strict_signal_validation = True @@ -801,19 +808,33 @@ class AsyncSignal(DynamicSignal): name: str, ndim: Literal[0, 1, 2], max_size: int, + signals: list[str] | Callable[[], list[str]], value: messages.DeviceMessage | dict | None = None, - async_update: dict | None = None, + acquisition_group: Literal["baseline", "monitored"] | str | None = None, + async_update: dict[Literal["type", "max_shape", "index"], Any] | None = None, **kwargs, ): """ Create a new AsyncSignal object. Args: - name (str): The name of the signal. - ndim (Literal[0, 1, 2]): The number of dimensions of the signal(s). - max_size (int): The maximum size of the signal buffer. + name (str): The name of the signal group. + ndim (Literal[0, 1, 2]): The number of dimensions of the signals. + max_size (int): The maximum size of the signal buffer. For ndim=2, this should be kept small to avoid large memory usage. + signals (list[str] | Callable[[], list[str]]): The names of all sub-signals. Names will be prefixed with the group name. value (AsyncMessage | dict | None): The initial value of the signal. Defaults to None. - async_update (dict | None): Additional metadata for asynchronous updates. Defaults to None. + acquisition_group (Literal["baseline", "monitored"] | str | None): The acquisition group of the signal group. + async_update (dict | None): Additional metadata for asynchronous updates. + There are three relevant keys "type", "max_shape" and "index". + "type" (str) : Can be one of "add", "add_slice" or "replace". This defines how the new data is added to the existing dataset. + "add" : Appends data to the existing dataset. The data is always appended to the first axis. + "add_slice" : Appends data to the existing dataset, but allows specifying a slice. + The slice is defined by the "index" key. + "replace" : Replaces the existing dataset with the new data. + "max_shape" (list[int | None]): Required for type 'add' and 'add_slice'. It defines where the data is added. For a 1D dataset, + it should be [None]. For a 1D dataset with 3000 elements, it should be [None, 3000]. + For a 2D dataset with 3000x3000 elements, it should be [None, 3000, 3000]. + "index" (int): Only required for type 'add_slice'. It defines the index where the data is added. """ kwargs.pop("kind", None) # Ignore kind if specified super().__init__( @@ -827,5 +848,115 @@ class AsyncSignal(DynamicSignal): bec_message_type=messages.DeviceMessage, async_update=async_update, signal_metadata={"max_size": max_size}, + acquisition_group=acquisition_group, + signals=signals, **kwargs, ) + + +class AsyncSignal(DynamicSignal): + """Device Signal to emit data asynchronously.""" + + strict_signal_validation = True + + def __init__( + self, + *, + name: str, + ndim: Literal[0, 1, 2], + max_size: int, + value: messages.DeviceMessage | dict | None = None, + acquisition_group: Literal["baseline", "monitored"] | str | None = None, + async_update: dict[Literal["type", "max_shape", "index"], Any] | None = None, + **kwargs, + ): + """ + Create a new AsyncSignal object. + + Args: + name (str): The name of the signal. + ndim (Literal[0, 1, 2]): The number of dimensions of the signals. + max_size (int): The maximum size of the signal buffer. For ndim=2, this should be kept small to avoid large memory usage. + value (AsyncMessage | dict | None): The initial value of the signal. Defaults to None. + acquisition_group (Literal["baseline", "monitored"] | str | None): The acquisition group of the signal group. + async_update (dict | None): Additional metadata for asynchronous updates. + There are three relevant keys "type", "max_shape" and "index". + "type" (str) : Can be one of "add", "add_slice" or "replace". This defines how the new data is added to the existing dataset. + "add" : Appends data to the existing dataset. The data is always appended to the first axis. + "add_slice" : Appends data to the existing dataset, but allows specifying a slice. + The slice is defined by the "index" key. + "replace" : Replaces the existing dataset with the new data. + "max_shape" (list[int | None]): Required for type 'add' and 'add_slice'. It defines where the data is added. For a 1D dataset, + it should be [None]. For a 1D dataset with 3000 elements, it should be [None, 3000]. + For a 2D dataset with 3000x3000 elements, it should be [None, 3000, 3000]. + "index" (int): Only required for type 'add_slice'. It defines the index where the data is added. + """ + kwargs.pop("kind", None) # Ignore kind if specified + super().__init__( + name=name, + data_type="raw", + saved=True, + ndim=ndim, + scope="scan", + role="main", + value=value, + bec_message_type=messages.DeviceMessage, + async_update=async_update, + signal_metadata={"max_size": max_size}, + acquisition_group=acquisition_group, + signals=None, + **kwargs, + ) + + def put( + self, + value: Any, + timestamp: float | None = None, + async_update: dict[Literal["type", "max_shape", "index"], Any] | None = None, + acquisition_group: str | None = None, + **kwargs, + ) -> None: + """ + Put method for AsyncSignal. + + Args: + value (Any): The value to put. + timestamp (float | None): The timestamp of the value. If None, the current time is used. + async_update (dict[Literal["type", "max_shape", "index"], Any] | None): Additional metadata for asynchronous updates. Please refer to the class docstring for details. + acquisition_group (Literal["baseline", "monitored"] | str | None): The acquisition group of the signal. + """ + timestamp = timestamp or time.time() + super().put( + value={self.name: {"value": value, "timestamp": timestamp}}, + async_update=async_update, + acquisition_group=acquisition_group, + **kwargs, + ) + + def set( + self, + value: Any, + timestamp: float | None = None, + async_update: dict[Literal["type", "max_shape", "index"], Any] | None = None, + acquisition_group: str | None = None, + **kwargs, + ) -> DeviceStatus: + """ + Set method for AsyncSignal. + + Args: + value (Any): The value to put. + timestamp (float | None): The timestamp of the value. If None, the current time is used. + async_update (dict[Literal["type", "max_shape", "index"], Any] | None): Additional metadata for asynchronous updates. Please refer to the class docstring for details. + acquisition_group (Literal["baseline", "monitored"] | str | None): The acquisition group of the signal. + """ + self.put( + value=value, + timestamp=timestamp, + async_update=async_update, + acquisition_group=acquisition_group, + **kwargs, + ) + status = DeviceStatus(device=self) + status.set_finished() + return status