mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2025-07-09 18:28:03 +02:00
feat: add proxy for h5 image replay for SimCamera
This commit is contained in:
@ -16,7 +16,7 @@ from .sim.sim import SimPositioner
|
|||||||
from .sim.sim import SimPositioner as SynAxisOPAAS
|
from .sim.sim import SimPositioner as SynAxisOPAAS
|
||||||
from .sim.sim import SynDeviceOPAAS
|
from .sim.sim import SynDeviceOPAAS
|
||||||
from .sim.sim_signals import ReadOnlySignal
|
from .sim.sim_signals import ReadOnlySignal
|
||||||
from .sim.sim_frameworks import DeviceProxy, SlitProxy
|
from .sim.sim_frameworks import DeviceProxy, SlitProxy, H5ImageReplayProxy
|
||||||
from .sim.sim_signals import ReadOnlySignal as SynSignalRO
|
from .sim.sim_signals import ReadOnlySignal as SynSignalRO
|
||||||
from .sls_devices.sls_devices import SLSInfo, SLSOperatorMessages
|
from .sls_devices.sls_devices import SLSInfo, SLSOperatorMessages
|
||||||
from .smaract.smaract_ophyd import SmaractMotor
|
from .smaract.smaract_ophyd import SmaractMotor
|
||||||
|
@ -1,13 +1,82 @@
|
|||||||
import numpy as np
|
import numpy as np
|
||||||
from scipy.ndimage import gaussian_filter
|
from scipy.ndimage import gaussian_filter
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
|
|
||||||
|
import h5py
|
||||||
|
import hdf5plugin
|
||||||
|
|
||||||
|
from ophyd import Staged, Kind
|
||||||
|
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from ophyd_devices.sim.sim_data import NoiseType
|
from ophyd_devices.sim.sim_data import NoiseType
|
||||||
|
from ophyd_devices.sim.sim_signals import CustomSetableSignal
|
||||||
from ophyd_devices.utils.bec_device_base import BECDeviceBase
|
from ophyd_devices.utils.bec_device_base import BECDeviceBase
|
||||||
|
|
||||||
|
|
||||||
class DeviceProxy(BECDeviceBase):
|
class DeviceProxy(BECDeviceBase, ABC):
|
||||||
"""DeviceProxy class inherits from BECDeviceBase."""
|
"""DeviceProxy class inherits from BECDeviceBase.
|
||||||
|
|
||||||
|
It is an abstract class that is meant to be used as a base class for all device proxies.
|
||||||
|
The minimum requirement for a device proxy is to implement the _compute method.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
name,
|
||||||
|
*args,
|
||||||
|
device_manager=None,
|
||||||
|
**kwargs,
|
||||||
|
):
|
||||||
|
self.name = name
|
||||||
|
self.device_manager = device_manager
|
||||||
|
self.config = None
|
||||||
|
self._lookup = defaultdict(dict)
|
||||||
|
super().__init__(name, *args, device_manager=device_manager, **kwargs)
|
||||||
|
self._signals = dict()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def lookup(self):
|
||||||
|
"""lookup property"""
|
||||||
|
return self._lookup
|
||||||
|
|
||||||
|
@lookup.setter
|
||||||
|
def lookup(self, update: dict) -> None:
|
||||||
|
"""lookup setter"""
|
||||||
|
self._lookup.update(update)
|
||||||
|
|
||||||
|
def _update_device_config(self, config: dict) -> None:
|
||||||
|
"""
|
||||||
|
BEC will call this method on every object upon initializing devices to pass over the deviceConfig
|
||||||
|
from the config file. It can be conveniently be used to hand over initial parameters to the device.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
config (dict): Config dictionary.
|
||||||
|
"""
|
||||||
|
self.config = config
|
||||||
|
self._compile_lookup()
|
||||||
|
|
||||||
|
def _compile_lookup(self):
|
||||||
|
"""Compile the lookup table for the device."""
|
||||||
|
for device_name in self.config.keys():
|
||||||
|
self._lookup[device_name] = {
|
||||||
|
"method": self._compute,
|
||||||
|
"signal_name": self.config[device_name]["signal_name"],
|
||||||
|
"args": (device_name,),
|
||||||
|
"kwargs": {},
|
||||||
|
}
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def _compute(self, device_name: str, *args, **kwargs) -> any:
|
||||||
|
"""
|
||||||
|
The purpose of this method is to compute the readback value for the signal of the device
|
||||||
|
that this proxy is attached to. This method is meant to be overriden by the user.
|
||||||
|
P
|
||||||
|
|
||||||
|
Args:
|
||||||
|
device_name (str): Name of the device.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
class SlitProxy(DeviceProxy):
|
class SlitProxy(DeviceProxy):
|
||||||
@ -26,7 +95,7 @@ class SlitProxy(DeviceProxy):
|
|||||||
`dev.eiger.get_device_config()` or update it `dev.eiger.get_device_config({'eiger' : {'pixel_size': 0.1}})`
|
`dev.eiger.get_device_config()` or update it `dev.eiger.get_device_config({'eiger' : {'pixel_size': 0.1}})`
|
||||||
|
|
||||||
slit_sim:
|
slit_sim:
|
||||||
readoutPriority: on_request
|
readoutPriority: baseline
|
||||||
deviceClass: SlitProxy
|
deviceClass: SlitProxy
|
||||||
deviceConfig:
|
deviceConfig:
|
||||||
eiger:
|
eiger:
|
||||||
@ -50,48 +119,13 @@ class SlitProxy(DeviceProxy):
|
|||||||
device_manager=None,
|
device_manager=None,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
):
|
):
|
||||||
self.name = name
|
|
||||||
self.device_manager = device_manager
|
|
||||||
self.config = None
|
|
||||||
self._lookup = defaultdict(dict)
|
|
||||||
self._gaussian_blur_sigma = 5
|
self._gaussian_blur_sigma = 5
|
||||||
super().__init__(name, *args, **kwargs)
|
super().__init__(name, *args, device_manager=device_manager, **kwargs)
|
||||||
|
|
||||||
def help(self) -> None:
|
def help(self) -> None:
|
||||||
"""Print documentation for the SlitLookup device."""
|
"""Print documentation for the SlitLookup device."""
|
||||||
print(self.__doc__)
|
print(self.__doc__)
|
||||||
|
|
||||||
def _update_device_config(self, config: dict) -> None:
|
|
||||||
"""
|
|
||||||
BEC will call this method on every object upon initializing devices to pass over the deviceConfig
|
|
||||||
from the config file. It can be conveniently be used to hand over initial parameters to the device.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
config (dict): Config dictionary.
|
|
||||||
"""
|
|
||||||
self.config = config
|
|
||||||
self._compile_lookup()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def lookup(self):
|
|
||||||
"""lookup property"""
|
|
||||||
return self._lookup
|
|
||||||
|
|
||||||
@lookup.setter
|
|
||||||
def lookup(self, update: dict) -> None:
|
|
||||||
"""lookup setter"""
|
|
||||||
self._lookup.update(update)
|
|
||||||
|
|
||||||
def _compile_lookup(self):
|
|
||||||
"""Compile the lookup table for the simulated camera."""
|
|
||||||
for device_name in self.config.keys():
|
|
||||||
self._lookup[device_name] = {
|
|
||||||
"method": self._compute,
|
|
||||||
"signal_name": self.config[device_name]["signal_name"],
|
|
||||||
"args": (device_name,),
|
|
||||||
"kwargs": {},
|
|
||||||
}
|
|
||||||
|
|
||||||
def _compute(self, device_name: str, *args, **kwargs) -> np.ndarray:
|
def _compute(self, device_name: str, *args, **kwargs) -> np.ndarray:
|
||||||
"""
|
"""
|
||||||
Compute the lookup table for the simulated camera.
|
Compute the lookup table for the simulated camera.
|
||||||
@ -173,8 +207,166 @@ class SlitProxy(DeviceProxy):
|
|||||||
return np.prod(mask, axis=2)
|
return np.prod(mask, axis=2)
|
||||||
|
|
||||||
|
|
||||||
|
class H5ImageReplayProxy(DeviceProxy):
|
||||||
|
"""This Proxy clas can be used to reply images from an h5 file.
|
||||||
|
|
||||||
|
If the requested images is larger than the available iamges, the images will be replayed from the beginning.
|
||||||
|
|
||||||
|
h5_image_sim:
|
||||||
|
readoutPriority: baseline
|
||||||
|
deviceClass: H5ImageReplayProxy
|
||||||
|
deviceConfig:
|
||||||
|
eiger:
|
||||||
|
signal_name: image
|
||||||
|
file_source: /path/to/h5file.h5
|
||||||
|
h5_entry: /entry/data
|
||||||
|
enabled: true
|
||||||
|
readOnly: false
|
||||||
|
"""
|
||||||
|
|
||||||
|
USER_ACCESS = ["file_source", "h5_entry"]
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
name,
|
||||||
|
*args,
|
||||||
|
device_manager=None,
|
||||||
|
**kwargs,
|
||||||
|
):
|
||||||
|
self.h5_file = None
|
||||||
|
self.h5_dataset = None
|
||||||
|
self._number_of_images = None
|
||||||
|
self.mode = "r"
|
||||||
|
self._staged = Staged.no
|
||||||
|
self._image = None
|
||||||
|
self._index = 0
|
||||||
|
super().__init__(name, *args, device_manager=device_manager, **kwargs)
|
||||||
|
self.file_source = CustomSetableSignal(
|
||||||
|
name="file_source", value="", parent=self, kind=Kind.normal
|
||||||
|
)
|
||||||
|
self.h5_entry = CustomSetableSignal(
|
||||||
|
name="h5_entry", value="", parent=self, kind=Kind.normal
|
||||||
|
)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def component_names(self) -> list[str]:
|
||||||
|
"""Return the names of the components."""
|
||||||
|
return ["file_source", "h5_entry"]
|
||||||
|
|
||||||
|
def _update_device_config(self, config: dict) -> None:
|
||||||
|
super()._update_device_config(config)
|
||||||
|
if len(config.keys()) > 1:
|
||||||
|
raise RuntimeError(
|
||||||
|
f"The current implementation of device {self.name} can only data for a single device. The config hosts multiple keys {config.keys()}"
|
||||||
|
)
|
||||||
|
self._init_signals()
|
||||||
|
|
||||||
|
def _init_signals(self):
|
||||||
|
"""Initialize the signals for the device."""
|
||||||
|
if "file_source" in self.config[list(self.config.keys())[0]]:
|
||||||
|
self.file_source.set(self.config[list(self.config.keys())[0]]["file_source"])
|
||||||
|
if "h5_entry" in self.config[list(self.config.keys())[0]]:
|
||||||
|
self.h5_entry.set(self.config[list(self.config.keys())[0]]["h5_entry"])
|
||||||
|
|
||||||
|
def _open_h5_file(self) -> None:
|
||||||
|
"""Open an HDF5 fiel and return a reference to the dataset without loading its content.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
fname (str): File name.
|
||||||
|
enty (str): Entry name.
|
||||||
|
mode (str): Mode of the file, default "r".
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
h5py.Dataset: Reference to the dataset.
|
||||||
|
"""
|
||||||
|
self.h5_file = h5py.File(self.file_source.get(), mode=self.mode)
|
||||||
|
self.h5_dataset = self.h5_file[self.h5_entry.get()]
|
||||||
|
self._number_of_images = self.h5_dataset.shape[0]
|
||||||
|
|
||||||
|
def _close_h5_file(self) -> None:
|
||||||
|
"""Close the HDF5 file."""
|
||||||
|
self.h5_file.close()
|
||||||
|
|
||||||
|
def stop(self) -> None:
|
||||||
|
"""Stop the device."""
|
||||||
|
if self.h5_file:
|
||||||
|
self._close_h5_file()
|
||||||
|
self.h5_file = None
|
||||||
|
self.h5_dataset = None
|
||||||
|
self._number_of_images = None
|
||||||
|
self._index = 0
|
||||||
|
|
||||||
|
def stage(self) -> list[object]:
|
||||||
|
"""Stage the device.
|
||||||
|
This opens the HDF5 file, unstaging will close it.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if self._staged != Staged.no:
|
||||||
|
return [self]
|
||||||
|
try:
|
||||||
|
self._open_h5_file()
|
||||||
|
except Exception as exc:
|
||||||
|
if self.h5_file:
|
||||||
|
self.stop()
|
||||||
|
raise FileNotFoundError(
|
||||||
|
f"Could not open h5file {self.file_source.get()} or access data set {self.h5_dataset.get()} in file"
|
||||||
|
) from exc
|
||||||
|
|
||||||
|
self._staged = Staged.yes
|
||||||
|
return [self]
|
||||||
|
|
||||||
|
def unstage(self) -> list[object]:
|
||||||
|
"""Unstage the device."""
|
||||||
|
if self.h5_file:
|
||||||
|
self.stop()
|
||||||
|
self._staged = Staged.no
|
||||||
|
return [self]
|
||||||
|
|
||||||
|
def _load_image(self):
|
||||||
|
"""Get the image from the h5 file.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
index (int): Index of the image.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
np.ndarray: Image.
|
||||||
|
"""
|
||||||
|
if self.h5_file:
|
||||||
|
slice_nr = self._index % self._number_of_images
|
||||||
|
self._index = self._index + 1
|
||||||
|
self._image = self.h5_dataset[slice_nr, ...]
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
self.stage()
|
||||||
|
slice_nr = self._index % self._number_of_images
|
||||||
|
self._index = self._index + 1
|
||||||
|
self._image = self.h5_dataset[slice_nr, ...]
|
||||||
|
self.unstage()
|
||||||
|
except Exception as exc:
|
||||||
|
raise FileNotFoundError(
|
||||||
|
f"Could not open h5file {self.file_source.get()} or access data set {self.h5_dataset.get()} in file"
|
||||||
|
) from exc
|
||||||
|
|
||||||
|
def _compute(self, device_name: str, *args, **kwargs) -> np.ndarray:
|
||||||
|
"""Compute the image.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
np.ndarray: Image.
|
||||||
|
"""
|
||||||
|
self._load_image()
|
||||||
|
return self._image
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
# Example usage
|
# Example usage
|
||||||
pinhole = SlitProxy(name="pinhole", device_manager=None)
|
tmp = H5ImageReplayProxy(name="tmp", device_manager=None)
|
||||||
pinhole.describe()
|
config = {
|
||||||
print(pinhole)
|
"eiger": {
|
||||||
|
"signal_name": "image",
|
||||||
|
"file_source": "/Users/appel_c/switchdrive/Sharefolder/AgBH_2D_gridscan/projection_000006_data_000001.h5",
|
||||||
|
"h5_entry": "/entry/data/data",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tmp._update_device_config(config)
|
||||||
|
tmp.stage()
|
||||||
|
print(tmp)
|
||||||
|
@ -5,6 +5,8 @@ from bec_lib import bec_logger
|
|||||||
from ophyd import Signal, Kind
|
from ophyd import Signal, Kind
|
||||||
from ophyd.utils import ReadOnlyError
|
from ophyd.utils import ReadOnlyError
|
||||||
|
|
||||||
|
from ophyd_devices.utils.bec_device_base import BECDeviceBase
|
||||||
|
|
||||||
logger = bec_logger.logger
|
logger = bec_logger.logger
|
||||||
|
|
||||||
# Readout precision for Setable/ReadOnlySignal signals
|
# Readout precision for Setable/ReadOnlySignal signals
|
||||||
@ -17,6 +19,8 @@ class SetableSignal(Signal):
|
|||||||
The signal will store the value in sim_state of the SimulatedData class of the parent device.
|
The signal will store the value in sim_state of the SimulatedData class of the parent device.
|
||||||
It will also return the value from sim_state when get is called. Compared to the ReadOnlySignal,
|
It will also return the value from sim_state when get is called. Compared to the ReadOnlySignal,
|
||||||
this signal can be written to.
|
this signal can be written to.
|
||||||
|
The setable signal inherits from the Signal class of ophyd, thus the class attribute needs to be
|
||||||
|
initiated as a Component (class from ophyd).
|
||||||
|
|
||||||
>>> signal = SetableSignal(name="signal", parent=parent, value=0)
|
>>> signal = SetableSignal(name="signal", parent=parent, value=0)
|
||||||
|
|
||||||
@ -46,20 +50,25 @@ class SetableSignal(Signal):
|
|||||||
)
|
)
|
||||||
self._value = value
|
self._value = value
|
||||||
self.precision = precision
|
self.precision = precision
|
||||||
self.sim = getattr(self.parent, "sim", self.parent)
|
self.sim = getattr(self.parent, "sim", None)
|
||||||
self._update_sim_state(value)
|
self._update_sim_state(value)
|
||||||
|
|
||||||
def _update_sim_state(self, value: any) -> None:
|
def _update_sim_state(self, value: any) -> None:
|
||||||
"""Update the readback value."""
|
"""Update the readback value."""
|
||||||
|
if self.sim:
|
||||||
self.sim.update_sim_state(self.name, value)
|
self.sim.update_sim_state(self.name, value)
|
||||||
|
|
||||||
def _get_value(self) -> any:
|
def _get_value(self) -> any:
|
||||||
"""Update the timestamp of the readback value."""
|
"""Update the timestamp of the readback value."""
|
||||||
|
if self.sim:
|
||||||
return self.sim.sim_state[self.name]["value"]
|
return self.sim.sim_state[self.name]["value"]
|
||||||
|
return self._value
|
||||||
|
|
||||||
def _get_timestamp(self) -> any:
|
def _get_timestamp(self) -> any:
|
||||||
"""Update the timestamp of the readback value."""
|
"""Update the timestamp of the readback value."""
|
||||||
|
if self.sim:
|
||||||
return self.sim.sim_state[self.name]["timestamp"]
|
return self.sim.sim_state[self.name]["timestamp"]
|
||||||
|
return time.time()
|
||||||
|
|
||||||
# pylint: disable=arguments-differ
|
# pylint: disable=arguments-differ
|
||||||
def get(self):
|
def get(self):
|
||||||
@ -100,6 +109,8 @@ class ReadOnlySignal(Signal):
|
|||||||
|
|
||||||
The readback will be computed from a function hosted in the SimulatedData class from the parent device
|
The readback will be computed from a function hosted in the SimulatedData class from the parent device
|
||||||
if compute_readback is True. Else, it will return the value stored int sim.sim_state directly.
|
if compute_readback is True. Else, it will return the value stored int sim.sim_state directly.
|
||||||
|
The readonly signal inherits from the Signal class of ophyd, thus the class attribute needs to be
|
||||||
|
initiated as a Component (class from ophyd).
|
||||||
|
|
||||||
>>> signal = ComputedReadOnlySignal(name="signal", parent=parent, value=0, compute_readback=True)
|
>>> signal = ComputedReadOnlySignal(name="signal", parent=parent, value=0, compute_readback=True)
|
||||||
|
|
||||||
@ -184,3 +195,132 @@ class ReadOnlySignal(Signal):
|
|||||||
if self.sim:
|
if self.sim:
|
||||||
return self._get_timestamp()
|
return self._get_timestamp()
|
||||||
return time.time()
|
return time.time()
|
||||||
|
|
||||||
|
|
||||||
|
class CustomSetableSignal(BECDeviceBase):
|
||||||
|
"""Custom signal for simulated devices. The custom signal can be read-only, setable or computed.
|
||||||
|
In comparison to above, this signal is not a class from ophyd, but an own implementation of a signal.
|
||||||
|
|
||||||
|
It works in the same fashion as the SetableSignal and ReadOnlySignal, however, it is
|
||||||
|
not needed to initiate it as a Component (ophyd) within the parent device class.
|
||||||
|
|
||||||
|
>>> signal = SetableSignal(name="signal", parent=parent, value=0)
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
|
||||||
|
name (string) : Name of the signal
|
||||||
|
parent (object) : Parent object of the signal, default none.
|
||||||
|
value (any) : Initial value of the signal, default 0.
|
||||||
|
kind (int) : Kind of the signal, default Kind.normal.
|
||||||
|
precision (float) : Precision of the signal, default PRECISION.
|
||||||
|
"""
|
||||||
|
|
||||||
|
USER_ACCESS = ["put", "get", "set"]
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
name: str,
|
||||||
|
*args,
|
||||||
|
parent=None,
|
||||||
|
value: any = 0,
|
||||||
|
kind: int = Kind.normal,
|
||||||
|
precision: float = PRECISION,
|
||||||
|
**kwargs,
|
||||||
|
):
|
||||||
|
if parent:
|
||||||
|
name = f"{parent.name}_{name}"
|
||||||
|
super().__init__(*args, name=name, parent=parent, kind=kind, **kwargs)
|
||||||
|
self._metadata = {"connected": self.connected, "write_access": True}
|
||||||
|
self._value = value
|
||||||
|
self._timestamp = time.time()
|
||||||
|
self._dtype = type(value)
|
||||||
|
self._shape = self._get_shape(value)
|
||||||
|
self.precision = precision
|
||||||
|
self.sim = getattr(self.parent, "sim", None)
|
||||||
|
self._update_sim_state(value)
|
||||||
|
|
||||||
|
def _get_shape(self, value: any) -> list:
|
||||||
|
"""Get the shape of the value.
|
||||||
|
**Note: This logic is from ophyd, and replicated here.
|
||||||
|
There would be more sophisticated ways, but to keep it consistent, it is replicated here.**
|
||||||
|
"""
|
||||||
|
if isinstance(value, np.ndarray):
|
||||||
|
return list(value.shape)
|
||||||
|
if isinstance(value, list):
|
||||||
|
return len(value)
|
||||||
|
return []
|
||||||
|
|
||||||
|
def _update_sim_state(self, value: any) -> None:
|
||||||
|
"""Update the readback value."""
|
||||||
|
if self.sim:
|
||||||
|
self.sim.update_sim_state(self.name, value)
|
||||||
|
|
||||||
|
def _get_value(self) -> any:
|
||||||
|
"""Update the timestamp of the readback value."""
|
||||||
|
if self.sim:
|
||||||
|
return self.sim.sim_state[self.name]["value"]
|
||||||
|
return self._value
|
||||||
|
|
||||||
|
def _get_timestamp(self) -> any:
|
||||||
|
"""Update the timestamp of the readback value."""
|
||||||
|
if self.sim:
|
||||||
|
return self.sim.sim_state[self.name]["timestamp"]
|
||||||
|
return self._timestamp
|
||||||
|
|
||||||
|
# pylint: disable=arguments-differ
|
||||||
|
def get(self):
|
||||||
|
"""Get the current position of the simulated device.
|
||||||
|
|
||||||
|
Core function for signal.
|
||||||
|
"""
|
||||||
|
self._value = self._get_value()
|
||||||
|
return self._value
|
||||||
|
|
||||||
|
# pylint: disable=arguments-differ
|
||||||
|
def put(self, value):
|
||||||
|
"""Put the value to the simulated device.
|
||||||
|
|
||||||
|
Core function for signal.
|
||||||
|
"""
|
||||||
|
self._update_sim_state(value)
|
||||||
|
self._value = value
|
||||||
|
self._timestamp = time.time()
|
||||||
|
|
||||||
|
def describe(self):
|
||||||
|
"""Describe the readback signal.
|
||||||
|
|
||||||
|
Core function for signal.
|
||||||
|
"""
|
||||||
|
res = {
|
||||||
|
self.name: {
|
||||||
|
"source": str(self.__class__),
|
||||||
|
"dtype": self._dtype,
|
||||||
|
"shape": self._shape,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if self.precision is not None:
|
||||||
|
res[self.name]["precision"] = self.precision
|
||||||
|
return res
|
||||||
|
|
||||||
|
def set(self, value):
|
||||||
|
"""Set method"""
|
||||||
|
self.put(value)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def timestamp(self):
|
||||||
|
"""Timestamp of the readback value"""
|
||||||
|
return self._get_timestamp()
|
||||||
|
|
||||||
|
def read(self):
|
||||||
|
"""Read method"""
|
||||||
|
return {
|
||||||
|
self.name: {
|
||||||
|
"value": self.get(),
|
||||||
|
"timestamp": self.timestamp,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def read_configuration(self):
|
||||||
|
"""Read method"""
|
||||||
|
return self.read()
|
||||||
|
Reference in New Issue
Block a user