diff --git a/ophyd_devices/interfaces/base_classes/psi_detector_base.py b/ophyd_devices/interfaces/base_classes/psi_detector_base.py index 9cc8f20..24a06ae 100644 --- a/ophyd_devices/interfaces/base_classes/psi_detector_base.py +++ b/ophyd_devices/interfaces/base_classes/psi_detector_base.py @@ -400,8 +400,6 @@ class PSIDetectorBase(Device): list(object): list of objects that were unstaged """ self.check_scan_id() - if self.stopped is True: - return super().unstage() self.custom_prepare.on_unstage() self.stopped = False return super().unstage() diff --git a/ophyd_devices/sim/sim_camera.py b/ophyd_devices/sim/sim_camera.py index b098111..de152b2 100644 --- a/ophyd_devices/sim/sim_camera.py +++ b/ophyd_devices/sim/sim_camera.py @@ -1,8 +1,10 @@ +import traceback +from threading import Thread + import numpy as np from bec_lib.logger import bec_logger from ophyd import Component as Cpt from ophyd import DeviceStatus, Kind -from ophyd.status import StatusBase from ophyd_devices.interfaces.base_classes.psi_detector_base import ( CustomDetectorMixin, @@ -18,6 +20,11 @@ logger = bec_logger.logger class SimCameraSetup(CustomDetectorMixin): """Mixin class for the SimCamera device.""" + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._thread_trigger = None + self._thread_complete = None + def on_trigger(self) -> None: """Trigger the camera to acquire images. @@ -26,16 +33,33 @@ class SimCameraSetup(CustomDetectorMixin): Here, we also run a callback on SUB_MONITOR to send the image data the device_monitor endpoint in BEC. """ - try: - for _ in range(self.parent.burst.get()): - data = self.parent.image.get() - self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=data) - if self.parent.stopped: - break - if self.parent.write_to_disk.get(): - self.parent.h5_writer.receive_data(data) - finally: - self.parent.stopped = False + status = DeviceStatus(self.parent) + + def on_trigger_call(status: DeviceStatus) -> None: + success = True + try: + for _ in range(self.parent.burst.get()): + data = self.parent.image.get() + # pylint: disable=protected-access + self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=data) + if self.parent.stopped: + success = False + break + if self.parent.write_to_disk.get(): + self.parent.h5_writer.receive_data(data) + # pylint: disable=protected-access + status._finished(success=success) + # pylint: disable=broad-except + except Exception as exc: + content = traceback.format_exc() + logger.warning( + f"Error in on_trigger_call in device {self.parent.name}. Error traceback: {content}" + ) + status.set_exception(exc) + + self._thread_trigger = Thread(target=on_trigger_call, args=(status,)) + self._thread_trigger.start() + return status def on_stage(self) -> None: """Stage the camera for upcoming scan @@ -63,13 +87,41 @@ class SimCameraSetup(CustomDetectorMixin): self.publish_file_location(done=False, successful=False) self.parent.stopped = False - def on_unstage(self) -> None: - """Unstage the device + def on_complete(self) -> None: + """Complete the motion of the simulated device.""" + status = DeviceStatus(self.parent) - Send reads from all config signals to redis - """ - if self.parent.write_to_disk.get(): - self.publish_file_location(done=True, successful=True) + def on_complete_call(status: DeviceStatus) -> None: + success = True + try: + if self.parent.write_to_disk.get(): + self.parent.h5_writer.write_data() + self.publish_file_location(done=True, successful=True) + # pylint: disable=protected-access + if self.parent.stopped: + success = False + # pylint: disable=protected-access + status._finished(success=success) + # pylint: disable=broad-except + except Exception as exc: + content = traceback.format_exc() + logger.warning( + f"Error in on_complete call in device {self.parent.name}. Error traceback: {content}" + ) + status.set_exception(exc) + + self._thread_complete = Thread(target=on_complete_call, args=(status,), daemon=True) + self._thread_complete.start() + return status + + def on_stop(self) -> None: + """Stop the camera acquisition.""" + if self._thread_trigger: + self._thread_trigger.join() + if self._thread_complete: + self._thread_complete.join() + self._thread_trigger = None + self._thread_complete = None class SimCamera(PSIDetectorBase): @@ -133,11 +185,3 @@ class SimCamera(PSIDetectorBase): def registered_proxies(self) -> None: """Dictionary of registered signal_names and proxies.""" return self._registered_proxies - - def complete(self) -> StatusBase: - """Complete the motion of the simulated device.""" - status = DeviceStatus(self) - if self.write_to_disk.get(): - self.h5_writer.write_data() - status.set_finished() - return status diff --git a/ophyd_devices/sim/sim_monitor.py b/ophyd_devices/sim/sim_monitor.py index f26c5bc..3968280 100644 --- a/ophyd_devices/sim/sim_monitor.py +++ b/ophyd_devices/sim/sim_monitor.py @@ -1,9 +1,11 @@ +from threading import Thread + import numpy as np from bec_lib import messages from bec_lib.endpoints import MessageEndpoints from bec_lib.logger import bec_logger from ophyd import Component as Cpt -from ophyd import Device, Kind +from ophyd import Device, DeviceStatus, Kind from ophyd_devices.interfaces.base_classes.psi_detector_base import ( CustomDetectorMixin, @@ -82,6 +84,8 @@ class SimMonitorAsyncPrepare(CustomDetectorMixin): self._stream_ttl = 1800 self._random_send_interval = None self._counter = 0 + self._thread_trigger = None + self._thread_complete = None self.prep_random_interval() self.parent.current_trigger.subscribe(self._progress_update, run=False) @@ -103,8 +107,25 @@ class SimMonitorAsyncPrepare(CustomDetectorMixin): def on_complete(self): """Prepare the device for completion.""" - if self.parent.data_buffer["value"]: - self._send_data_to_bec() + status = DeviceStatus(self.parent) + + def on_complete_call(status: DeviceStatus) -> None: + exception = None + try: + if self.parent.data_buffer["value"]: + self._send_data_to_bec() + # pylint: disable=broad-except + except Exception as exc: + exception = exc + finally: + if exception: + status.set_exception(exception) + else: + status.set_finished() + + self._thread_complete = Thread(target=on_complete_call, args=(status,)) + self._thread_complete.start() + return status def _send_data_to_bec(self) -> None: """Sends bundled data to BEC""" @@ -128,16 +149,34 @@ class SimMonitorAsyncPrepare(CustomDetectorMixin): def on_trigger(self): """Prepare the device for triggering.""" - self.parent.data_buffer["value"].append(self.parent.readback.get()) - self.parent.data_buffer["timestamp"].append(self.parent.readback.timestamp) - self._counter += 1 - self.parent.current_trigger.set(self._counter).wait() - if self._counter % self._random_send_interval == 0: - self._send_data_to_bec() + status = DeviceStatus(self.parent) + + def on_trigger_call(status: DeviceStatus) -> None: + exception = None + try: + self.parent.data_buffer["value"].append(self.parent.readback.get()) + self.parent.data_buffer["timestamp"].append(self.parent.readback.timestamp) + self._counter += 1 + self.parent.current_trigger.set(self._counter).wait() + if self._counter % self._random_send_interval == 0: + self._send_data_to_bec() + # pylint: disable=broad-except + except Exception as exc: + exception = exc + finally: + if exception: + status.set_exception(exception) + else: + status.set_finished() + + self._thread_trigger = Thread(target=on_trigger_call, args=(status,)) + self._thread_trigger.start() + return status def _progress_update(self, value: int, **kwargs): """Update the progress of the device.""" max_value = self.parent.scaninfo.num_points + # pylint: disable=protected-access self.parent._run_subs( sub_type=self.parent.SUB_PROGRESS, value=value, @@ -145,6 +184,15 @@ class SimMonitorAsyncPrepare(CustomDetectorMixin): done=bool(max_value == value), ) + def on_stop(self): + """Stop the device.""" + if self._thread_trigger: + self._thread_trigger.join() + if self._thread_complete: + self._thread_complete.join() + self._thread_trigger = None + self._thread_complete = None + class SimMonitorAsync(PSIDetectorBase): """