diff --git a/ophyd_devices/interfaces/base_classes/psi_detector_base.py b/ophyd_devices/interfaces/base_classes/psi_detector_base.py index 6f643db..fe637c7 100644 --- a/ophyd_devices/interfaces/base_classes/psi_detector_base.py +++ b/ophyd_devices/interfaces/base_classes/psi_detector_base.py @@ -8,10 +8,9 @@ import os import time from bec_lib import messages -from bec_lib.device import DeviceStatus from bec_lib.endpoints import MessageEndpoints from bec_lib.file_utils import FileWriter -from ophyd import Device +from ophyd import Device, DeviceStatus from ophyd.device import Staged from ophyd_devices.utils import bec_utils @@ -88,6 +87,13 @@ class CustomDetectorMixin: Only use if needed, and it is recommended to keep this function as short/fast as possible. """ + def on_complete(self) -> None: + """ + Specify actions to be executed when the scan is complete. + + This can for instance be to check with the detector and backend if all data is written succsessfully. + """ + # TODO add configurable file_path instead of hardcoding self.parent.filepath def publish_file_location( self, done: bool = False, successful: bool = None, metadata: dict = {} @@ -274,6 +280,19 @@ class PSIDetectorBase(Device): self.custom_prepare.on_trigger() return super().trigger() + def complete(self) -> None: + """Complete the acquisition, called from BEC. + + This function is called after the scan is complete, just before unstage. + We can check here with the data backend and detector if the acquisition successfully finished. + + Actions are implemented in custom_prepare.on_complete since they are beamline specific. + """ + status = DeviceStatus(self) + self.custom_prepare.on_complete() + status.set_finished() + return status + def unstage(self) -> list[object]: """ Unstage device after a scan. diff --git a/ophyd_devices/sim/sim_monitor_async.py b/ophyd_devices/sim/sim_monitor_async.py new file mode 100644 index 0000000..b176e7a --- /dev/null +++ b/ophyd_devices/sim/sim_monitor_async.py @@ -0,0 +1,152 @@ +"""This module provides an asynchronous monitor to simulate the behaviour of a device sending data not in sync with the point ID.""" + +from typing import Literal + +import numpy as np +from bec_lib import messages +from bec_lib.endpoints import MessageEndpoints +from ophyd import Component as Cpt +from ophyd import Device, Kind +from typeguard import typechecked + +from ophyd_devices.interfaces.base_classes.psi_detector_base import ( + CustomDetectorMixin, + PSIDetectorBase, +) +from ophyd_devices.sim.sim_data import SimulatedDataMonitor +from ophyd_devices.sim.sim_signals import ReadOnlySignal, SetableSignal + + +class SimMonitorAsyncPrepare(CustomDetectorMixin): + """Custom prepare for the SimMonitorAsync class.""" + + def __init__(self, *args, parent: Device = None, **kwargs) -> None: + super().__init__(*args, parent=parent, **kwargs) + self._stream_ttl = 1800 + self._random_send_interval = None + self._counter = 0 + self.prep_random_interval() + + def clear_buffer(self): + """Clear the data buffer.""" + self.parent.data_buffer["value"].clear() + self.parent.data_buffer["timestamp"].clear() + + def prep_random_interval(self): + """Prepare counter and random interval to send data to BEC.""" + self._random_send_interval = np.random.randint(1, 10) + self.parent.current_trigger.set(0) + self._counter = self.parent.current_trigger.get() + + def on_stage(self): + """Prepare the device for staging.""" + self.clear_buffer() + self.prep_random_interval() + self.parent.current_trigger.subscribe(self._progress_update, run=False) + + def on_complete(self): + """Prepare the device for completion.""" + if self.parent.data_buffer["value"]: + self._send_data_to_bec() + + def _send_data_to_bec(self) -> None: + """Sends bundled data to BEC""" + if self.parent.scaninfo.scan_msg is None: + return + metadata = self.parent.scaninfo.scan_msg.metadata + metadata.update({"async_update": "extend"}) + + msg = messages.DeviceMessage( + signals={self.parent.readback.name: self.parent.data_buffer}, + metadata=self.parent.scaninfo.scan_msg.metadata, + ) + self.parent.connector.xadd( + MessageEndpoints.device_async_readback( + scan_id=self.parent.scaninfo.scan_id, device=self.parent.name + ), + {"data": msg}, + expire=self._stream_ttl, + ) + self.clear_buffer() + + def on_trigger(self): + """Prepare the device for triggering.""" + self.parent.data_buffer["value"].append(self.parent.readback.get()) + self.parent.data_buffer["value"].append(self.parent.readback.timestamp) + self._counter += 1 + self.parent.current_trigger.set(self._counter) + if self._counter % self._random_send_interval == 0: + self._send_data_to_bec() + + def _progress_update(self, value: int): + """Update the progress of the device.""" + max_value = self.parent.scaninfo.num_points + self.parent._run_subs( + sub_type=self.parent.SUB_PROGRESS, + value=value, + max_value=max_value, + done=bool(max_value == value), + ) + + +class SimMonitorAsync(PSIDetectorBase): + """ + A simulated device to mimic the behaviour of an asynchronous monitor. + + During a scan, this device will send data not in sync with the point ID to BEC, + but buffer data and send it in random intervals. + """ + + USER_ACCESS = ["sim", "registered_proxies", "async_update"] + + custom_prepare_cls = SimMonitorAsyncPrepare + sim_cls = SimulatedDataMonitor + BIT_DEPTH = np.uint32 + + readback = Cpt(ReadOnlySignal, value=BIT_DEPTH(0), kind=Kind.hinted, compute_readback=True) + current_trigger = Cpt(SetableSignal, value=BIT_DEPTH(0), kind=Kind.config) + + SUB_READBACK = "readback" + SUB_PROGRESS = "progress" + _default_sub = SUB_READBACK + + def __init__( + self, name, *, sim_init: dict = None, parent=None, kind=None, device_manager=None, **kwargs + ): + self.init_sim_params = sim_init + self.device_manager = device_manager + self.sim = self.sim_cls(parent=self, **kwargs) + self._registered_proxies = {} + + super().__init__( + name=name, parent=parent, kind=kind, device_manager=device_manager, **kwargs + ) + self.sim.sim_state[self.name] = self.sim.sim_state.pop(self.readback.name, None) + self.readback.name = self.name + self._data_buffer = {"value": [], "timestamp": []} + self._async_update = "extend" + + @property + def data_buffer(self) -> list: + """Buffer for data to be sent asynchronously.""" + return self._data_buffer + + @property + def registered_proxies(self) -> None: + """Dictionary of registered signal_names and proxies.""" + return self._registered_proxies + + @property + def async_update(self) -> str: + """Update method for the asynchronous monitor.""" + return self._async_update + + @async_update.setter + @typechecked + def async_update(self, value: Literal["extend", "append"]) -> None: + """Set the update method for the asynchronous monitor. + + Args: + value (str): Can only be "extend" or "append". + """ + self._async_update = value diff --git a/ophyd_devices/utils/bec_utils.py b/ophyd_devices/utils/bec_utils.py index ae39478..825a512 100644 --- a/ophyd_devices/utils/bec_utils.py +++ b/ophyd_devices/utils/bec_utils.py @@ -2,6 +2,7 @@ import time from bec_lib import bec_logger from bec_lib.devicemanager import DeviceContainer +from bec_lib.tests.utils import ConnectorMock from ophyd import Device, Kind, Signal from ophyd_devices.utils.socket import data_shape, data_type @@ -50,93 +51,6 @@ class DeviceMock: return self -class ConnectorMock: - def __init__(self, store_data=True) -> None: - self.message_sent = [] - self._get_buffer = {} - self.store_data = store_data - - def set(self, topic, msg, pipe=None, expire: int = None): - if pipe: - pipe._pipe_buffer.append(("set", (topic, msg), {"expire": expire})) - return - self.message_sent.append({"queue": topic, "msg": msg, "expire": expire}) - - def send(self, topic, msg, pipe=None): - if pipe: - pipe._pipe_buffer.append(("send", (topic, msg), {})) - return - self.message_sent.append({"queue": topic, "msg": msg}) - - def set_and_publish(self, topic, msg, pipe=None, expire: int = None): - if pipe: - pipe._pipe_buffer.append(("set_and_publish", (topic, msg), {"expire": expire})) - return - self.message_sent.append({"queue": topic, "msg": msg, "expire": expire}) - - def lpush(self, topic, msg, pipe=None): - if pipe: - pipe._pipe_buffer.append(("lpush", (topic, msg), {})) - return - - def rpush(self, topic, msg, pipe=None): - if pipe: - pipe._pipe_buffer.append(("rpush", (topic, msg), {})) - return - pass - - def lrange(self, topic, start, stop, pipe=None): - if pipe: - pipe._pipe_buffer.append(("lrange", (topic, start, stop), {})) - return - return [] - - def get(self, topic, pipe=None): - if pipe: - pipe._pipe_buffer.append(("get", (topic,), {})) - return - val = self._get_buffer.get(topic) - if isinstance(val, list): - return val.pop(0) - self._get_buffer.pop(topic, None) - return val - - def keys(self, pattern: str) -> list: - return [] - - def pipeline(self): - return PipelineMock(self) - - def delete(self, topic, pipe=None): - if pipe: - pipe._pipe_buffer.append(("delete", (topic,), {})) - return - - def lset(self, topic: str, index: int, msgs: str, pipe=None) -> None: - if pipe: - pipe._pipe_buffer.append(("lrange", (topic, index, msgs), {})) - return - - -class PipelineMock: - _pipe_buffer = [] - _connector = None - - def __init__(self, connector) -> None: - self._connector = connector - - def execute(self): - if not self._connector.store_data: - self._pipe_buffer = [] - return [] - res = [ - getattr(self._connector, method)(*args, **kwargs) - for method, args, kwargs in self._pipe_buffer - ] - self._pipe_buffer = [] - return res - - class DMMock: """Mock for DeviceManager