From 9b51b22671a6ff2b78576f75834c061d6835d8af Mon Sep 17 00:00:00 2001 From: appel_c Date: Wed, 15 Oct 2025 12:42:54 +0200 Subject: [PATCH] feat(bec-signals): Add acquisition group to BECMessageSignal and SignalInfo --- ophyd_devices/utils/bec_signals.py | 77 ++++++++++++++++++++++++------ 1 file changed, 62 insertions(+), 15 deletions(-) diff --git a/ophyd_devices/utils/bec_signals.py b/ophyd_devices/utils/bec_signals.py index b310860..c41c15e 100644 --- a/ophyd_devices/utils/bec_signals.py +++ b/ophyd_devices/utils/bec_signals.py @@ -3,6 +3,7 @@ Module for custom BEC signals, that wrap around ophyd.Signal. These signals emit BECMessage objects, which comply with the BEC message system. """ +from time import time from typing import Any, Callable, Literal, Type import numpy as np @@ -10,6 +11,7 @@ from bec_lib import messages from bec_lib.logger import bec_logger from ophyd import DeviceStatus, Kind, Signal from pydantic import BaseModel, Field, ValidationError +from typeguard import typechecked logger = bec_logger.logger # pylint: disable=arguments-differ @@ -60,6 +62,14 @@ class SignalInfo(BaseModel): default=None, description="Metadata for the signal, which can include additional information about the signal's properties.", ) + acquisition_group: Literal["baseline", "monitored"] | str | None = Field( + default=None, + description="""Specifies the acquisition group of the signal. + It can be in sync with 'baseline' or 'monitored' groups mapping readoutPriority. + Or mapped to a custom tag that allows grouping signals for acquisition and plotting. + If None, the signal does not belong to any specific acquisition group. + """, + ) class BECMessageSignal(Signal): @@ -79,6 +89,7 @@ class BECMessageSignal(Signal): ndim: Literal[0, 1, 2] | None = None, scope: Literal["scan", "continuous"] = "scan", role: Literal["main", "preview", "diagnostic", "file event", "progress"] = "main", + acquisition_group: Literal["baseline", "monitored"] | str | None = None, enabled: bool = True, signals: ( Callable[[], list[str]] @@ -116,6 +127,7 @@ class BECMessageSignal(Signal): self.scope = scope self.role = role self.enabled = enabled + self.acquisition_group = acquisition_group self.signals = self._unify_signals(signals) self.signal_metadata = signal_metadata self._bec_message_type = bec_message_type @@ -169,6 +181,7 @@ class BECMessageSignal(Signal): enabled=self.enabled, signals=self.signals, signal_metadata=self.signal_metadata, + acquisition_group=self.acquisition_group, ).model_dump() return out @@ -618,7 +631,7 @@ class PreviewSignal(BECMessageSignal): class DynamicSignal(BECMessageSignal): - """Signal to emit dynamic device data.""" + """Signal group to emit dynamic device signal data.""" strict_signal_validation = False # Disable strict signal validation @@ -628,17 +641,29 @@ class DynamicSignal(BECMessageSignal): name: str, signals: list[str] | Callable[[], list[str]] | None = None, value: messages.DeviceMessage | dict | None = None, - async_update: dict | None = None, + async_update: dict[Literal["type", "max_size", "index"], Any] | None = None, + acquisition_group: Literal["baseline", "monitored"] | str | None = None, **kwargs, ): """ Create a new DynamicSignal object. Args: - name (str): The name of the signal. - signal_names (list[str] | Callable): The names of all signals. Can be a list or a callable. + name (str): The name of the signal group. + signal_names (list[str] | Callable): Names of all signals. Can be a list or a callable. value (DeviceMessage | dict | None): The initial value of the signal. Defaults to None. - async_update (dict | None): Additional metadata for asynchronous updates. Defaults to None. + acquisition_group (Literal["baseline", "monitored"] | str | None): The acquisition group of the signal group. + async_update (dict | None): Additional metadata for asynchronous updates. + There are three relevant keys "type", "max_size" and "index". + "type" (str) : Can be one of "add", "add_slice" or "replace". This defines how the new data is added to the existing dataset. + "add" : Appends data to the existing dataset. The data is always appended to the first axis. + "add_slice" : Appends data to the existing dataset, but allows specifying a slice. + The slice is defined by the "index" key. + "replace" : Replaces the existing dataset with the new data. + "max_size" (list[int | None]): Required for type 'add' and 'add_slice'. It defines where the data is added. For a 1D dataset, + it should be [None]. For a 1D dataset with 3000 elements, it should be [None, 3000]. + For a 2D dataset with 3000x3000 elements, it should be [None, 3000, 3000]. + "index" (int): Only required for type 'add_slice'. It defines the index where the data is added. """ self.async_update = async_update @@ -653,15 +678,18 @@ class DynamicSignal(BECMessageSignal): signals=signals, value=value, bec_message_type=kwargs.pop("bec_message_type", messages.DeviceMessage), + acquisition_group=acquisition_group, **kwargs, ) + @typechecked def put( self, value: messages.DeviceMessage | dict[str, dict[Literal["value", "timestamp"], Any]], *, metadata: dict | None = None, - async_update: dict | None = None, + async_update: dict[Literal["type", "max_size", "index"], Any] | None = None, + acquisition_group: Literal["baseline", "monitored"] | str | None = None, **kwargs, ) -> None: """ @@ -674,21 +702,33 @@ class DynamicSignal(BECMessageSignal): Args: value (dict | DeviceMessage): The dynamic device data. metadata (dict | None): Additional metadata. + async_update (dict[Literal["type", "max_size", "index"], Any] | None): Additional metadata for asynchronous updates. + acquisition_group (Literal["baseline", "monitored"] | str | None): The acquisition group of the signal group. """ if isinstance(value, messages.DeviceMessage): - if metadata is not None or async_update is not None: + if metadata is not None or async_update is not None or acquisition_group is not None: logger.warning( - "Ignoring metadata and async_update arguments when value is a DeviceMessage." + "Ignoring metadata, async_update and acquisition_group arguments when value is a DeviceMessage." ) self._check_signals(value) return super().put(value, **kwargs) try: metadata = metadata or {} - if "async_update" not in metadata: - if async_update is not None: - metadata["async_update"] = async_update - elif self.async_update is not None: - metadata["async_update"] = self.async_update + if async_update is not None: + metadata["async_update"] = async_update + else: + metadata["async_update"] = self.async_update + if not metadata.get("async_update"): + raise ValueError( + f"Async update must be provided for signal {self.name} of class {self.__class__.__name__}." + ) + else: + pass + # TODO #627 Issue in BEC: Validate async_update --> bec_lib + if acquisition_group is not None: + metadata["acquisition_group"] = acquisition_group + elif self.acquisition_group is not None: + metadata["acquisition_group"] = self.acquisition_group msg = messages.DeviceMessage(signals=value, metadata=metadata) except ValidationError as exc: @@ -723,7 +763,8 @@ class DynamicSignal(BECMessageSignal): value: messages.DeviceMessage | dict[str, dict[Literal["value"], Any]], *, metadata: dict | None = None, - async_update: dict | None = None, + async_update: dict[Literal["type", "max_size", "index"], Any] | None = None, + acquisition_group: Literal["baseline", "monitored"] | str | None = None, **kwargs, ) -> DeviceStatus: """ @@ -737,7 +778,13 @@ class DynamicSignal(BECMessageSignal): value (dict | DeviceMessage) : The dynamic device data. metadata (dict | None) : Additional metadata. """ - self.put(value, metadata=metadata, async_update=async_update, **kwargs) + self.put( + value, + metadata=metadata, + async_update=async_update, + acquisition_group=acquisition_group, + **kwargs, + ) status = DeviceStatus(device=self) status.set_finished() return status