mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2026-02-06 15:18:40 +01:00
feat: add bec_signals with BECMessages to utils
This commit is contained in:
@@ -7,11 +7,17 @@ from bec_lib import messages
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import Device, DeviceStatus, OphydObject, PositionerBase, Staged
|
||||
from ophyd import Device, DeviceStatus, Kind, OphydObject, PositionerBase, Staged
|
||||
|
||||
from ophyd_devices.sim.sim_camera import SimCamera
|
||||
from ophyd_devices.sim.sim_positioner import SimPositioner
|
||||
from ophyd_devices.sim.sim_signals import SetableSignal
|
||||
from ophyd_devices.utils.bec_signals import (
|
||||
DynamicSignal,
|
||||
FileEventSignal,
|
||||
PreviewSignal,
|
||||
ProgressSignal,
|
||||
)
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
@@ -311,3 +317,127 @@ class SimCameraWithStageStatus(SimCamera):
|
||||
thread = threading.Thread(target=_unstage_device, args=(self, status), daemon=True)
|
||||
thread.start()
|
||||
return status
|
||||
|
||||
|
||||
class SimCameraWithPSIComponents(SimCamera):
|
||||
"""Test Device for PSIComponents"""
|
||||
|
||||
preview_2d = Cpt(PreviewSignal, ndim=2, doc="2D preview signal", num_rot90=2, transpose=True)
|
||||
preview_1d = Cpt(PreviewSignal, ndim=1, doc="1D preview signal")
|
||||
file_event = Cpt(FileEventSignal, doc="File event signal")
|
||||
progress = Cpt(ProgressSignal, doc="Progress signal")
|
||||
dynamic_signal = Cpt(
|
||||
DynamicSignal, doc="Dynamic signals", signal_names=["preview_2d", "preview_1d"]
|
||||
)
|
||||
# TODO Handling of AsyncComponents is postponed, issue #104 is created
|
||||
|
||||
# Define signals for the async components
|
||||
# signal_dict = {
|
||||
# "signal1": {"kind": Kind.hinted, "doc": "Signal 1"},
|
||||
# "signal2": {"kind": Kind.normal, "doc": "Signal 2"},
|
||||
# "signal3": {"kind": Kind.config, "doc": "Signal 3"},
|
||||
# "signal4": {"kind": Kind.omitted, "doc": "Signal 4"},
|
||||
# }
|
||||
|
||||
# async_1d = Async1DComponent(doc="Async 1D signal", signal_def=signal_dict)
|
||||
# async_2d = Async2DComponent(doc="Async 2D signal", signal_def=signal_dict)
|
||||
|
||||
def __init__(self, name: str, scan_info=None, device_manager=None, **kwargs):
|
||||
super().__init__(name=name, scan_info=scan_info, device_manager=device_manager, **kwargs)
|
||||
self._triggers_received = 0
|
||||
|
||||
def on_stage(self):
|
||||
"""Stage device"""
|
||||
self.file_path = self.file_utils.get_full_path(
|
||||
scan_status_msg=self.scan_info.msg, name=self.name
|
||||
)
|
||||
self.frames.set(
|
||||
self.scan_info.msg.num_points * self.scan_info.msg.scan_parameters["frames_per_trigger"]
|
||||
).wait()
|
||||
self.exp_time.set(self.scan_info.msg.scan_parameters["exp_time"]).wait()
|
||||
self.burst.set(self.scan_info.msg.scan_parameters["frames_per_trigger"]).wait()
|
||||
# Always emit a file event
|
||||
msg = messages.FileMessage(
|
||||
file_path=self.file_path, done=False, successful=False, device_name=self.name
|
||||
)
|
||||
self.file_event.put(file_path=self.file_path, done=False, successful=False)
|
||||
self.file_event.set(msg).wait()
|
||||
self._triggers_received = 0
|
||||
|
||||
def on_trigger(self):
|
||||
"""Trigger device"""
|
||||
self._triggers_received += 1
|
||||
|
||||
def trigger_cam():
|
||||
for _ in range(self.burst.get()):
|
||||
data = self.image.get()
|
||||
self.preview_2d.put(data)
|
||||
# sum array in one dimension
|
||||
self.preview_1d.put(np.sum(data, 1))
|
||||
self.dynamic_signal.put(
|
||||
{"preview_2d": {"value": data}, "preview_1d": {"value": np.sum(data, 1)}}
|
||||
)
|
||||
progress = {
|
||||
"value": self._triggers_received,
|
||||
"max_value": self.scan_info.msg.num_points,
|
||||
"done": (self._triggers_received == self.scan_info.msg.num_points),
|
||||
}
|
||||
progress = messages.ProgressMessage(**progress)
|
||||
self.progress.put(progress)
|
||||
self._set_async_signal()
|
||||
|
||||
status = self.task_handler.submit_task(trigger_cam)
|
||||
return status
|
||||
|
||||
def _set_async_signal(self, update_all: bool = False):
|
||||
"""Set the async signal values.
|
||||
|
||||
Args:
|
||||
update_all (bool): If True, update all signals. If False, update only hinted and normal signals.
|
||||
"""
|
||||
# pylint: disable=protected-access
|
||||
for sig in self.async_1d._signals.values():
|
||||
if update_all is True:
|
||||
sig.put(np.random.random())
|
||||
continue
|
||||
if sig.kind in [Kind.hinted, Kind.normal]:
|
||||
sig.put(np.random.random())
|
||||
for sig in self.async_2d._signals.values():
|
||||
if update_all is True:
|
||||
sig.put(np.random.random())
|
||||
continue
|
||||
if sig.kind in [Kind.hinted, Kind.normal]:
|
||||
sig.put(np.random.random())
|
||||
|
||||
def on_unstage(self):
|
||||
"""Unstage device"""
|
||||
self._triggers_received = 0
|
||||
|
||||
def on_complete(self):
|
||||
"""Complete device"""
|
||||
|
||||
def complete_cam():
|
||||
"""Complete the camera acquisition."""
|
||||
msg = messages.FileMessage(
|
||||
file_path=self.file_path if self.file_path else "",
|
||||
done=True,
|
||||
successful=True,
|
||||
device_name=self.name,
|
||||
)
|
||||
self.file_event.set(msg).wait()
|
||||
progress = {
|
||||
"value": self._triggers_received,
|
||||
"max_value": self.scan_info.msg.num_points,
|
||||
"done": True,
|
||||
}
|
||||
progress = messages.ProgressMessage(**progress)
|
||||
self.progress.set(progress).wait()
|
||||
self._set_async_signal(update_all=True)
|
||||
|
||||
status = self.task_handler.submit_task(complete_cam)
|
||||
return status
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
cam = SimCameraWithPSIComponents(name="cam")
|
||||
cam.read()
|
||||
|
||||
491
ophyd_devices/utils/bec_signals.py
Normal file
491
ophyd_devices/utils/bec_signals.py
Normal file
@@ -0,0 +1,491 @@
|
||||
"""
|
||||
Module for custom BEC signals, that wrap around ophyd.Signal.
|
||||
These signals emit BECMessage objects, which comply with the BEC message system.
|
||||
"""
|
||||
|
||||
from typing import Any, Callable, Literal, Type
|
||||
|
||||
import numpy as np
|
||||
from bec_lib import messages
|
||||
from ophyd import DeviceStatus, Kind, Signal
|
||||
from pydantic import ValidationError
|
||||
from typeguard import typechecked
|
||||
|
||||
# pylint: disable=arguments-differ
|
||||
# pylint: disable=arguments-renamed
|
||||
# pylint: disable=too-many-arguments
|
||||
# pylint: disable=signature-differs
|
||||
|
||||
|
||||
class BECMessageSignal(Signal):
|
||||
"""
|
||||
Custom signal class that accepts BECMessage objects as values.
|
||||
Dictionaries are also accepted if convertible to the correct BECMessage type.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
*,
|
||||
bec_message_type: Type[messages.BECMessage],
|
||||
value: messages.BECMessage | dict | None = None,
|
||||
kind: Kind | str = Kind.omitted,
|
||||
signal_metadata: dict | None = None,
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
Create a new BECMessageSignal object.
|
||||
|
||||
Args:
|
||||
name (str) : The name of the signal.
|
||||
bec_message_type: type[BECMessage] : The type of BECMessage to accept as values.
|
||||
value (BECMessage | dict | None) : The value of the signal. Defaults to None.
|
||||
kind (Kind | str) : The kind of the signal. Defaults to Kind.omitted.
|
||||
"""
|
||||
if isinstance(value, dict):
|
||||
value = bec_message_type(**value)
|
||||
if value and not isinstance(value, bec_message_type):
|
||||
raise ValueError(
|
||||
f"Value must be a {bec_message_type.__name__} or a dict for signal {name}"
|
||||
)
|
||||
self.signal_metadata = signal_metadata
|
||||
self._bec_message_type = bec_message_type
|
||||
kwargs.pop("dtype", None) # Ignore dtype if specified
|
||||
kwargs.pop("shape", None) # Ignore shape if specified
|
||||
super().__init__(name=name, value=value, shape=(), dtype=None, kind=kind, **kwargs)
|
||||
|
||||
def describe(self):
|
||||
out = super().describe()
|
||||
out[self.name]["signal_metadata"] = self.signal_metadata or {}
|
||||
return out
|
||||
|
||||
@property
|
||||
def source_name(self) -> str:
|
||||
"""
|
||||
Get the source name of the signal.
|
||||
|
||||
Returns:
|
||||
str: The source name of the signal.
|
||||
"""
|
||||
return f"BECMessageSignal:{self.name}"
|
||||
|
||||
def put(self, value: messages.BECMessage | dict | None = None, **kwargs) -> None:
|
||||
"""
|
||||
Put method for BECMessageSignal.
|
||||
|
||||
If value is set to None, BEC's callback will not update REDIS.
|
||||
|
||||
Args:
|
||||
value (BECMessage | dict | None) : The value to put.
|
||||
"""
|
||||
if isinstance(value, dict):
|
||||
value = self._bec_message_type(**value)
|
||||
if value and not isinstance(value, self._bec_message_type):
|
||||
raise ValueError(
|
||||
f"Value must be a {self._bec_message_type.__name__}"
|
||||
f" or a dict for signal {self.name}"
|
||||
)
|
||||
return super().put(value, **kwargs)
|
||||
|
||||
def set(self, value: messages.BECMessage | dict | None = None, **kwargs) -> DeviceStatus:
|
||||
"""
|
||||
Set method for BECMessageSignal.
|
||||
|
||||
If value is set to None, BEC's callback will not update REDIS.
|
||||
|
||||
Args:
|
||||
value (BECMessage | dict | None) : The value to put.
|
||||
"""
|
||||
self.put(value, **kwargs)
|
||||
status = DeviceStatus(device=self)
|
||||
status.set_finished()
|
||||
return status
|
||||
|
||||
def _infer_value_kind(self, inference_func: Any) -> Any:
|
||||
return self._bec_message_type.__name__
|
||||
|
||||
|
||||
class ProgressSignal(BECMessageSignal):
|
||||
"""Signal to emit progress updates."""
|
||||
|
||||
def __init__(
|
||||
self, *, name: str, value: messages.ProgressMessage | dict | None = None, **kwargs
|
||||
):
|
||||
"""
|
||||
Create a new ProgressSignal object.
|
||||
|
||||
Args:
|
||||
name (str) : The name of the signal.
|
||||
value (ProgressMessage | dict | None) : The initial value of the signal. Defaults to None.
|
||||
"""
|
||||
kwargs.pop("kind", None) # Ignore kind if specified
|
||||
super().__init__(
|
||||
name=name,
|
||||
value=value,
|
||||
bec_message_type=messages.ProgressMessage,
|
||||
kind=Kind.omitted,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
def put(
|
||||
self,
|
||||
msg: messages.ProgressMessage | dict | None = None,
|
||||
*,
|
||||
value: float | None = None,
|
||||
max_value: float | None = None,
|
||||
done: bool | None = None,
|
||||
metadata: dict | None = None,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
"""
|
||||
Put method for ProgressSignal.
|
||||
|
||||
If msg is provided, it will be directly set as ProgressMessage.
|
||||
Dictionaries are accepted and will be converted.
|
||||
Otherwise, at least value, max_value and done must be provided.
|
||||
|
||||
Args:
|
||||
msg (ProgressMessage | dict | None) : The progress message.
|
||||
value (float) : The current progress value.
|
||||
max_value (float) : The maximum progress value.
|
||||
done (bool) : Whether the progress is done.
|
||||
metadata (dict | None) : Additional metadata
|
||||
"""
|
||||
# msg is ProgressMessage or dict
|
||||
if isinstance(msg, (messages.ProgressMessage, dict)):
|
||||
return super().put(msg, **kwargs)
|
||||
try:
|
||||
msg = messages.ProgressMessage(
|
||||
value=value, max_value=max_value, done=done, metadata=metadata
|
||||
)
|
||||
except ValidationError as exc:
|
||||
raise ValueError(f"Error setting signal {self.name}: {exc}") from exc
|
||||
return super().put(msg, **kwargs)
|
||||
|
||||
def set(
|
||||
self,
|
||||
msg: messages.ProgressMessage | dict | None = None,
|
||||
*,
|
||||
value: float | None = None,
|
||||
max_value: float | None = None,
|
||||
done: bool | None = None,
|
||||
metadata: dict | None = None,
|
||||
**kwargs,
|
||||
) -> DeviceStatus:
|
||||
"""
|
||||
Set method for ProgressSignal.
|
||||
|
||||
If msg is provided, it will be directly set as ProgressMessage.
|
||||
Dictionaries are accepted and will be converted.
|
||||
Otherwise, at least value, max_value and done must be provided.
|
||||
|
||||
Args:
|
||||
msg (ProgressMessage | dict | None) : The progress message.
|
||||
value (float) : The current progress value.
|
||||
max_value (float) : The maximum progress value.
|
||||
done (bool) : Whether the progress is done.
|
||||
metadata (dict | None) : Additional metadata
|
||||
"""
|
||||
self.put(msg=msg, value=value, max_value=max_value, done=done, metadata=metadata, **kwargs)
|
||||
status = DeviceStatus(device=self)
|
||||
status.set_finished()
|
||||
return status
|
||||
|
||||
|
||||
class FileEventSignal(BECMessageSignal):
|
||||
"""Signal to emit file events."""
|
||||
|
||||
def __init__(self, *, name: str, value: messages.FileMessage | dict | None = None, **kwargs):
|
||||
"""
|
||||
Create a new FileEventSignal object.
|
||||
|
||||
Args:
|
||||
name (str) : The name of the signal.
|
||||
value (FileMessage | dict | None) : The initial value of the signal. Defaults to None.
|
||||
kind (Kind | str) : The kind of the signal. Defaults to Kind.omitted.
|
||||
"""
|
||||
kwargs.pop("kind", None) # Ignore kind if specified
|
||||
super().__init__(
|
||||
name=name,
|
||||
value=value,
|
||||
bec_message_type=messages.FileMessage,
|
||||
kind=Kind.omitted,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
def put(
|
||||
self,
|
||||
msg: messages.FileMessage | dict | None = None,
|
||||
*,
|
||||
file_path: str | None = None,
|
||||
done: bool | None = None,
|
||||
successful: bool | None = None,
|
||||
device_name: str | None = None,
|
||||
file_type: str = "h5",
|
||||
hinted_h5_entries: dict[str, str] | None = None,
|
||||
metadata: dict | None = None,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
"""
|
||||
Put method for FileEventSignal.
|
||||
|
||||
If msg is provided, it will be directly set as FileMessage.
|
||||
Dictionaries are accepted and will be converted.
|
||||
Otherwise, at least file_path, done and successful must be provided.
|
||||
|
||||
Args:
|
||||
msg (FileMessage | dict | None) : The file event message.
|
||||
file_path (str) : The path of the file.
|
||||
done (bool) : Whether the file event is done.
|
||||
successful (bool) : Whether the file event was successful.
|
||||
device_name (str | None) : The name of the device.
|
||||
file_type (str | None) : The type of the file.
|
||||
hinted_h5_entries (dict[str, str] | None): The hinted h5 entries.
|
||||
metadata (dict | None) : Additional metadata.
|
||||
"""
|
||||
if isinstance(msg, (messages.FileMessage, dict)):
|
||||
return super().put(msg, **kwargs)
|
||||
# kwargs provided
|
||||
try:
|
||||
msg = messages.FileMessage(
|
||||
file_path=file_path,
|
||||
done=done,
|
||||
successful=successful,
|
||||
device_name=device_name,
|
||||
file_type=file_type,
|
||||
hinted_h5_entries=hinted_h5_entries,
|
||||
metadata=metadata,
|
||||
)
|
||||
except ValidationError as exc:
|
||||
raise ValueError(f"Error setting signal {self.name}: {exc}") from exc
|
||||
return super().put(msg, **kwargs)
|
||||
|
||||
# pylint: disable=arguments-differ
|
||||
def set(
|
||||
self,
|
||||
msg: messages.FileMessage | dict | None = None,
|
||||
*,
|
||||
file_path: str | None = None,
|
||||
done: bool | None = None,
|
||||
successful: bool | None = None,
|
||||
device_name: str | None = None,
|
||||
file_type: str = "h5",
|
||||
hinted_h5_entries: dict[str, str] | None = None,
|
||||
metadata: dict | None = None,
|
||||
**kwargs,
|
||||
) -> DeviceStatus:
|
||||
"""
|
||||
Set method for FileEventSignal.
|
||||
|
||||
If msg is provided, it will be directly set as FileMessage.
|
||||
Dictionaries are accepted and will be converted.
|
||||
Otherwise, at least file_path, done and successful must be provided.
|
||||
|
||||
Args:
|
||||
msg (FileMessage | dict | None) : The file event message.
|
||||
file_path (str) : The path of the file.
|
||||
done (bool) : Whether the file event is done.
|
||||
successful (bool) : Whether the file event was successful.
|
||||
device_name (str | None) : The name of the device.
|
||||
file_type (str | None) : The type of the file.
|
||||
hinted_h5_entries (dict[str, str] | None): The hinted h5 entries.
|
||||
metadata (dict | None) : Additional metadata.
|
||||
"""
|
||||
self.put(
|
||||
msg=msg,
|
||||
file_path=file_path,
|
||||
done=done,
|
||||
successful=successful,
|
||||
device_name=device_name,
|
||||
file_type=file_type,
|
||||
hinted_h5_entries=hinted_h5_entries,
|
||||
metadata=metadata,
|
||||
**kwargs,
|
||||
)
|
||||
status = DeviceStatus(device=self)
|
||||
status.set_finished()
|
||||
return status
|
||||
|
||||
|
||||
class PreviewSignal(BECMessageSignal):
|
||||
"""Signal to emit preview data."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
name: str,
|
||||
ndim: Literal[1, 2],
|
||||
num_rot90: Literal[0, 1, 2, 3] = 0,
|
||||
transpose: bool = False,
|
||||
value: dict | None = None,
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
Create a new FileEventSignal object.
|
||||
|
||||
Args:
|
||||
name (str) : The name of the signal.
|
||||
ndim (Literal[1, 2]) : The number of dimensions.
|
||||
value (DeviceMonitorMessage | dict | None) : The initial value of the signal. Defaults to None.
|
||||
"""
|
||||
kwargs.pop("kind", None)
|
||||
super().__init__(
|
||||
name=name,
|
||||
value=value,
|
||||
bec_message_type=messages.DevicePreviewMessage,
|
||||
kind=Kind.omitted,
|
||||
signal_metadata={"ndim": ndim, "num_rot90": num_rot90, "transpose": transpose},
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
# pylint: disable=signature-differs
|
||||
def put(
|
||||
self,
|
||||
value: list | np.ndarray | dict | messages.DevicePreviewMessage,
|
||||
*,
|
||||
metadata: dict | None = None,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
"""
|
||||
Put method for Preview1DSignal.
|
||||
|
||||
If value is a DeviceMonitor1DMessage, it will be directly set,
|
||||
if value is a dict, it will be converted to a DeviceMonitor1DMessage.
|
||||
|
||||
Args:
|
||||
value (list | np.ndarray | dict | self._bec_message_type): The preview data. Must be 1D.
|
||||
metadata (dict | None): Additional metadata. If dict or self._bec_message_type is passed, it will be ignored.
|
||||
"""
|
||||
if isinstance(value, self._bec_message_type):
|
||||
return super().put(value, **kwargs)
|
||||
if isinstance(value, dict):
|
||||
value["device"] = self.parent.name
|
||||
value["signal"] = self.name
|
||||
return super().put(value, **kwargs)
|
||||
device = self.parent.name
|
||||
if isinstance(value, list):
|
||||
value = np.array(value)
|
||||
try:
|
||||
msg = self._bec_message_type(
|
||||
data=value, device=device, signal=self.name, metadata=metadata
|
||||
)
|
||||
except ValidationError as exc:
|
||||
raise ValueError(f"Error setting signal {self.name}: {exc}") from exc
|
||||
super().put(msg, **kwargs)
|
||||
|
||||
# pylint: disable=signature-differs
|
||||
def set(
|
||||
self, value: list | np.ndarray | dict, *, metadata: dict | None = None, **kwargs
|
||||
) -> DeviceStatus:
|
||||
"""
|
||||
Put method for Preview1DSignal.
|
||||
|
||||
If value is a DeviceMonitor1DMessage, it will be directly set,
|
||||
if value is a dict, it will be converted to a DeviceMonitor1DMessage.
|
||||
|
||||
Args:
|
||||
value (list | np.ndarray | dict | self._bec_message_type) : The preview data. Must be 1D.
|
||||
metadata (dict | None) : Additional metadata. If dict or self._bec_message_type is passed, it will be ignored.
|
||||
"""
|
||||
self.put(value=value, metadata=metadata, **kwargs)
|
||||
status = DeviceStatus(device=self)
|
||||
status.set_finished()
|
||||
return status
|
||||
|
||||
|
||||
class DynamicSignal(BECMessageSignal):
|
||||
"""Signal to emit dynamic device data."""
|
||||
|
||||
@typechecked
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
name: str,
|
||||
signal_names: list[str] | Callable[[], list[str]],
|
||||
value: messages.DeviceMessage | dict | None = None,
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
Create a new DynamicSignal object.
|
||||
|
||||
Args:
|
||||
name (str) : The name of the signal.
|
||||
signal_names (list[str] | Callable) : The names of all signals. Can be a list or a callable.
|
||||
value (DeviceMessage | dict | None) : The initial value of the signal. Defaults to None.
|
||||
"""
|
||||
_raise = False
|
||||
if callable(signal_names):
|
||||
self.signal_names = signal_names()
|
||||
elif isinstance(signal_names, list):
|
||||
self.signal_names = signal_names
|
||||
else:
|
||||
_raise = True
|
||||
|
||||
if _raise is True or len(self.signal_names) == 0:
|
||||
raise ValueError(f"No names provided for Dynamic signal {name} via {signal_names}")
|
||||
kwargs.pop("kind", None) # Ignore kind if specified
|
||||
super().__init__(
|
||||
name=name,
|
||||
value=value,
|
||||
bec_message_type=messages.DeviceMessage,
|
||||
kind=Kind.omitted,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
def put(
|
||||
self,
|
||||
value: messages.DeviceMessage | dict[str, dict[Literal["value", "timestamp"], Any]],
|
||||
*,
|
||||
metadata: dict | None = None,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
"""
|
||||
Put method for DynamicSignal.
|
||||
|
||||
All signal names must be defined upon signal creation via signal_names_list.
|
||||
If value is a DeviceMessage, it will be directly set,
|
||||
if value is a dict, it will be converted to a DeviceMessage.
|
||||
|
||||
Args:
|
||||
value (dict | DeviceMessage) : The dynamic device data.
|
||||
metadata (dict | None) : Additional metadata.
|
||||
"""
|
||||
if isinstance(value, messages.DeviceMessage):
|
||||
self._check_signals(value)
|
||||
return super().put(value, **kwargs)
|
||||
try:
|
||||
msg = messages.DeviceMessage(signals=value, metadata=metadata)
|
||||
except ValidationError as exc:
|
||||
raise ValueError(f"Error setting signal {self.name}: {exc}") from exc
|
||||
self._check_signals(msg)
|
||||
return super().put(msg, **kwargs)
|
||||
|
||||
def _check_signals(self, msg: messages.DeviceMessage) -> None:
|
||||
"""Check if all signals are valid."""
|
||||
if any(name not in self.signal_names for name in msg.signals):
|
||||
raise ValueError(
|
||||
f"Invalid signal name in message {list(msg.signals.keys())} for signals {self.signal_names}"
|
||||
)
|
||||
|
||||
def set(
|
||||
self,
|
||||
value: messages.DeviceMessage | dict[str, dict[Literal["value"], Any]],
|
||||
*,
|
||||
metadata: dict | None = None,
|
||||
**kwargs,
|
||||
) -> DeviceStatus:
|
||||
"""
|
||||
Set method for DynamicSignal.
|
||||
|
||||
All signal names must be defined upon signal creation via signal_names_list.
|
||||
If value is a DeviceMessage, it will be directly set,
|
||||
if value is a dict, it will be converted to a DeviceMessage.
|
||||
|
||||
Args:
|
||||
value (dict | DeviceMessage) : The dynamic device data.
|
||||
metadata (dict | None) : Additional metadata.
|
||||
"""
|
||||
self.put(value, metadata=metadata, **kwargs)
|
||||
status = DeviceStatus(device=self)
|
||||
status.set_finished()
|
||||
return status
|
||||
Reference in New Issue
Block a user