diff --git a/ophyd_devices/utils/bec_utils.py b/ophyd_devices/utils/bec_utils.py index 02a8b4f..304a2ed 100644 --- a/ophyd_devices/utils/bec_utils.py +++ b/ophyd_devices/utils/bec_utils.py @@ -2,8 +2,13 @@ import time from bec_lib.core import bec_logger +from ophyd import Signal, Kind + +from ophyd_devices.utils.socket import data_shape, data_type + logger = bec_logger.logger +DEFAULT_EPICSSIGNAL_VALUE = object() class MockProducer: @@ -33,3 +38,107 @@ class mokev: def read(self): return {self.name: {"value": 16.0, "timestamp": time.time()}} + + +class ConfigSignal(Signal): + def __init__( + self, + *, + name, + value=0, + timestamp=None, + parent=None, + labels=None, + kind=Kind.hinted, + tolerance=None, + rtolerance=None, + metadata=None, + cl=None, + attr_name="", + config_storage_name: str = "config_storage", + ): + super().__init__( + name=name, + value=value, + timestamp=timestamp, + parent=parent, + labels=labels, + kind=kind, + tolerance=tolerance, + rtolerance=rtolerance, + metadata=metadata, + cl=cl, + attr_name=attr_name, + ) + + self.storage_name = config_storage_name + + def get(self): + self._readback = getattr(self.parent, self.storage_name)[self.name] + return self._readback + + def put( + self, + value, + connection_timeout=1, + callback=None, + timeout=1, + **kwargs, + ): + """Using channel access, set the write PV to `value`. + + Keyword arguments are passed on to callbacks + + Parameters + ---------- + value : any + The value to set + connection_timeout : float, optional + If not already connected, allow up to `connection_timeout` seconds + for the connection to complete. + use_complete : bool, optional + Override put completion settings + callback : callable + Callback for when the put has completed + timeout : float, optional + Timeout before assuming that put has failed. (Only relevant if + put completion is used.) + """ + + old_value = self.get() + timestamp = time.time() + getattr(self.parent, self.storage_name)[self.name] = value + super().put(value, timestamp=timestamp, force=True) + self._run_subs( + sub_type=self.SUB_VALUE, + old_value=old_value, + value=value, + timestamp=timestamp, + ) + + def describe(self): + """Provide schema and meta-data for :meth:`~BlueskyInterface.read` + + This keys in the `OrderedDict` this method returns must match the + keys in the `OrderedDict` return by :meth:`~BlueskyInterface.read`. + + This provides schema related information, (ex shape, dtype), the + source (ex PV name), and if available, units, limits, precision etc. + + Returns + ------- + data_keys : OrderedDict + The keys must be strings and the values must be dict-like + with the ``event_model.event_descriptor.data_key`` schema. + """ + if self._readback is DEFAULT_EPICSSIGNAL_VALUE: + val = self.get() + else: + val = self._readback + return { + self.name: { + "source": f"{self.parent.prefix}:{self.name}", + "dtype": data_type(val), + "shape": data_shape(val), + } + }