feat(bec-signals): Add acquisition group to BECMessageSignal and SignalInfo

This commit is contained in:
2025-10-15 12:42:54 +02:00
committed by Christian Appel
parent 297c5eee38
commit 9b51b22671

View File

@@ -3,6 +3,7 @@ Module for custom BEC signals, that wrap around ophyd.Signal.
These signals emit BECMessage objects, which comply with the BEC message system.
"""
from time import time
from typing import Any, Callable, Literal, Type
import numpy as np
@@ -10,6 +11,7 @@ from bec_lib import messages
from bec_lib.logger import bec_logger
from ophyd import DeviceStatus, Kind, Signal
from pydantic import BaseModel, Field, ValidationError
from typeguard import typechecked
logger = bec_logger.logger
# pylint: disable=arguments-differ
@@ -60,6 +62,14 @@ class SignalInfo(BaseModel):
default=None,
description="Metadata for the signal, which can include additional information about the signal's properties.",
)
acquisition_group: Literal["baseline", "monitored"] | str | None = Field(
default=None,
description="""Specifies the acquisition group of the signal.
It can be in sync with 'baseline' or 'monitored' groups mapping readoutPriority.
Or mapped to a custom tag that allows grouping signals for acquisition and plotting.
If None, the signal does not belong to any specific acquisition group.
""",
)
class BECMessageSignal(Signal):
@@ -79,6 +89,7 @@ class BECMessageSignal(Signal):
ndim: Literal[0, 1, 2] | None = None,
scope: Literal["scan", "continuous"] = "scan",
role: Literal["main", "preview", "diagnostic", "file event", "progress"] = "main",
acquisition_group: Literal["baseline", "monitored"] | str | None = None,
enabled: bool = True,
signals: (
Callable[[], list[str]]
@@ -116,6 +127,7 @@ class BECMessageSignal(Signal):
self.scope = scope
self.role = role
self.enabled = enabled
self.acquisition_group = acquisition_group
self.signals = self._unify_signals(signals)
self.signal_metadata = signal_metadata
self._bec_message_type = bec_message_type
@@ -169,6 +181,7 @@ class BECMessageSignal(Signal):
enabled=self.enabled,
signals=self.signals,
signal_metadata=self.signal_metadata,
acquisition_group=self.acquisition_group,
).model_dump()
return out
@@ -618,7 +631,7 @@ class PreviewSignal(BECMessageSignal):
class DynamicSignal(BECMessageSignal):
"""Signal to emit dynamic device data."""
"""Signal group to emit dynamic device signal data."""
strict_signal_validation = False # Disable strict signal validation
@@ -628,17 +641,29 @@ class DynamicSignal(BECMessageSignal):
name: str,
signals: list[str] | Callable[[], list[str]] | None = None,
value: messages.DeviceMessage | dict | None = None,
async_update: dict | None = None,
async_update: dict[Literal["type", "max_size", "index"], Any] | None = None,
acquisition_group: Literal["baseline", "monitored"] | str | None = None,
**kwargs,
):
"""
Create a new DynamicSignal object.
Args:
name (str): The name of the signal.
signal_names (list[str] | Callable): The names of all signals. Can be a list or a callable.
name (str): The name of the signal group.
signal_names (list[str] | Callable): Names of all signals. Can be a list or a callable.
value (DeviceMessage | dict | None): The initial value of the signal. Defaults to None.
async_update (dict | None): Additional metadata for asynchronous updates. Defaults to None.
acquisition_group (Literal["baseline", "monitored"] | str | None): The acquisition group of the signal group.
async_update (dict | None): Additional metadata for asynchronous updates.
There are three relevant keys "type", "max_size" and "index".
"type" (str) : Can be one of "add", "add_slice" or "replace". This defines how the new data is added to the existing dataset.
"add" : Appends data to the existing dataset. The data is always appended to the first axis.
"add_slice" : Appends data to the existing dataset, but allows specifying a slice.
The slice is defined by the "index" key.
"replace" : Replaces the existing dataset with the new data.
"max_size" (list[int | None]): Required for type 'add' and 'add_slice'. It defines where the data is added. For a 1D dataset,
it should be [None]. For a 1D dataset with 3000 elements, it should be [None, 3000].
For a 2D dataset with 3000x3000 elements, it should be [None, 3000, 3000].
"index" (int): Only required for type 'add_slice'. It defines the index where the data is added.
"""
self.async_update = async_update
@@ -653,15 +678,18 @@ class DynamicSignal(BECMessageSignal):
signals=signals,
value=value,
bec_message_type=kwargs.pop("bec_message_type", messages.DeviceMessage),
acquisition_group=acquisition_group,
**kwargs,
)
@typechecked
def put(
self,
value: messages.DeviceMessage | dict[str, dict[Literal["value", "timestamp"], Any]],
*,
metadata: dict | None = None,
async_update: dict | None = None,
async_update: dict[Literal["type", "max_size", "index"], Any] | None = None,
acquisition_group: Literal["baseline", "monitored"] | str | None = None,
**kwargs,
) -> None:
"""
@@ -674,21 +702,33 @@ class DynamicSignal(BECMessageSignal):
Args:
value (dict | DeviceMessage): The dynamic device data.
metadata (dict | None): Additional metadata.
async_update (dict[Literal["type", "max_size", "index"], Any] | None): Additional metadata for asynchronous updates.
acquisition_group (Literal["baseline", "monitored"] | str | None): The acquisition group of the signal group.
"""
if isinstance(value, messages.DeviceMessage):
if metadata is not None or async_update is not None:
if metadata is not None or async_update is not None or acquisition_group is not None:
logger.warning(
"Ignoring metadata and async_update arguments when value is a DeviceMessage."
"Ignoring metadata, async_update and acquisition_group arguments when value is a DeviceMessage."
)
self._check_signals(value)
return super().put(value, **kwargs)
try:
metadata = metadata or {}
if "async_update" not in metadata:
if async_update is not None:
metadata["async_update"] = async_update
elif self.async_update is not None:
metadata["async_update"] = self.async_update
if async_update is not None:
metadata["async_update"] = async_update
else:
metadata["async_update"] = self.async_update
if not metadata.get("async_update"):
raise ValueError(
f"Async update must be provided for signal {self.name} of class {self.__class__.__name__}."
)
else:
pass
# TODO #627 Issue in BEC: Validate async_update --> bec_lib
if acquisition_group is not None:
metadata["acquisition_group"] = acquisition_group
elif self.acquisition_group is not None:
metadata["acquisition_group"] = self.acquisition_group
msg = messages.DeviceMessage(signals=value, metadata=metadata)
except ValidationError as exc:
@@ -723,7 +763,8 @@ class DynamicSignal(BECMessageSignal):
value: messages.DeviceMessage | dict[str, dict[Literal["value"], Any]],
*,
metadata: dict | None = None,
async_update: dict | None = None,
async_update: dict[Literal["type", "max_size", "index"], Any] | None = None,
acquisition_group: Literal["baseline", "monitored"] | str | None = None,
**kwargs,
) -> DeviceStatus:
"""
@@ -737,7 +778,13 @@ class DynamicSignal(BECMessageSignal):
value (dict | DeviceMessage) : The dynamic device data.
metadata (dict | None) : Additional metadata.
"""
self.put(value, metadata=metadata, async_update=async_update, **kwargs)
self.put(
value,
metadata=metadata,
async_update=async_update,
acquisition_group=acquisition_group,
**kwargs,
)
status = DeviceStatus(device=self)
status.set_finished()
return status