From 87858edfe290cb711bc30c2f3ba2653460d15af6 Mon Sep 17 00:00:00 2001 From: appel_c Date: Sun, 28 Jul 2024 11:20:12 +0200 Subject: [PATCH] refactor: review DeviceStatus and error handling in simulation --- .../base_classes/psi_detector_base.py | 23 +++++++++-- ophyd_devices/sim/sim_camera.py | 18 ++++----- ophyd_devices/sim/sim_monitor.py | 38 +++++++++++-------- ophyd_devices/sim/sim_positioner.py | 35 +++++++++++++---- ophyd_devices/sim/sim_waveform.py | 37 ++++++++++++------ ophyd_devices/utils/errors.py | 9 +++++ 6 files changed, 113 insertions(+), 47 deletions(-) create mode 100644 ophyd_devices/utils/errors.py diff --git a/ophyd_devices/interfaces/base_classes/psi_detector_base.py b/ophyd_devices/interfaces/base_classes/psi_detector_base.py index 24a06ae..8c3b6e0 100644 --- a/ophyd_devices/interfaces/base_classes/psi_detector_base.py +++ b/ophyd_devices/interfaces/base_classes/psi_detector_base.py @@ -7,16 +7,21 @@ The beamlines need to inherit from the CustomDetectorMixing for their mixin clas import os import threading import time +import traceback from bec_lib import messages from bec_lib.endpoints import MessageEndpoints from bec_lib.file_utils import FileWriter +from bec_lib.logger import bec_logger from ophyd import Component, Device, DeviceStatus, Kind from ophyd.device import Staged from ophyd_devices.sim.sim_signals import SetableSignal from ophyd_devices.utils import bec_utils from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin +from ophyd_devices.utils.errors import DeviceStopError, DeviceTimeoutError + +logger = bec_logger.logger class DetectorInitError(Exception): @@ -176,7 +181,7 @@ class CustomDetectorMixin: check_stopped: bool = False, interval: float = 0.05, all_signals: bool = False, - exception_on_timeout: Exception = TimeoutError("Timeout while waiting for signals"), + exception_on_timeout: Exception = None, ) -> DeviceStatus: """Utility function to wait for signals in a thread. Returns a DevicesStatus object that resolves either to set_finished or set_exception. @@ -200,6 +205,10 @@ class CustomDetectorMixin: Returns: DeviceStatus: DeviceStatus object that resolves either to set_finished or set_exception """ + if exception_on_timeout is None: + exception_on_timeout = DeviceTimeoutError( + f"Timeout error for {self.parent.name} while waiting for signals {signal_conditions}" + ) status = DeviceStatus(self.parent) @@ -211,7 +220,7 @@ class CustomDetectorMixin: check_stopped: bool, interval: float, all_signals: bool, - exception_on_timeout: Exception = TimeoutError("Timeout while waiting for signals"), + exception_on_timeout: Exception, ): """Convenient wrapper around wait_for_signals to set status based on the result. @@ -231,8 +240,16 @@ class CustomDetectorMixin: if result: status.set_finished() else: - status.set_exception(exception_on_timeout) + if self.stopped: + status.set_exception(exc=DeviceStopError(f"{self.parent.name} was stopped")) + else: + status.set_exception(exc=exception_on_timeout) + # pylint: disable=broad-except except Exception as exc: + content = traceback.format_exc() + logger.warning( + f"Error in wait_for_signals in {self.parent.name}; Traceback: {content}" + ) status.set_exception(exc=exc) thread = threading.Thread( diff --git a/ophyd_devices/sim/sim_camera.py b/ophyd_devices/sim/sim_camera.py index de152b2..9945759 100644 --- a/ophyd_devices/sim/sim_camera.py +++ b/ophyd_devices/sim/sim_camera.py @@ -13,6 +13,7 @@ from ophyd_devices.interfaces.base_classes.psi_detector_base import ( from ophyd_devices.sim.sim_data import SimulatedDataCamera from ophyd_devices.sim.sim_signals import ReadOnlySignal, SetableSignal from ophyd_devices.sim.sim_utils import H5Writer +from ophyd_devices.utils.errors import DeviceStopError logger = bec_logger.logger @@ -36,19 +37,19 @@ class SimCameraSetup(CustomDetectorMixin): status = DeviceStatus(self.parent) def on_trigger_call(status: DeviceStatus) -> None: - success = True + error = None try: for _ in range(self.parent.burst.get()): data = self.parent.image.get() # pylint: disable=protected-access self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=data) if self.parent.stopped: - success = False + error = DeviceStopError(f"{self.parent.name} was stopped") break if self.parent.write_to_disk.get(): self.parent.h5_writer.receive_data(data) - # pylint: disable=protected-access - status._finished(success=success) + # pylint: disable=expression-not-assigned + status.set_finished() if not error else status.set_exception(exc=error) # pylint: disable=broad-except except Exception as exc: content = traceback.format_exc() @@ -92,16 +93,15 @@ class SimCameraSetup(CustomDetectorMixin): status = DeviceStatus(self.parent) def on_complete_call(status: DeviceStatus) -> None: - success = True + error = None try: if self.parent.write_to_disk.get(): self.parent.h5_writer.write_data() self.publish_file_location(done=True, successful=True) - # pylint: disable=protected-access if self.parent.stopped: - success = False - # pylint: disable=protected-access - status._finished(success=success) + error = DeviceStopError(f"{self.parent.name} was stopped") + # pylint: disable=expression-not-assigned + status.set_finished() if not error else status.set_exception(exc=error) # pylint: disable=broad-except except Exception as exc: content = traceback.format_exc() diff --git a/ophyd_devices/sim/sim_monitor.py b/ophyd_devices/sim/sim_monitor.py index 3968280..ea2146f 100644 --- a/ophyd_devices/sim/sim_monitor.py +++ b/ophyd_devices/sim/sim_monitor.py @@ -1,3 +1,6 @@ +"""Module for simulated monitor devices.""" + +import traceback from threading import Thread import numpy as np @@ -13,6 +16,7 @@ from ophyd_devices.interfaces.base_classes.psi_detector_base import ( ) from ophyd_devices.sim.sim_data import SimulatedDataMonitor from ophyd_devices.sim.sim_signals import ReadOnlySignal, SetableSignal +from ophyd_devices.utils.errors import DeviceStopError logger = bec_logger.logger @@ -110,18 +114,19 @@ class SimMonitorAsyncPrepare(CustomDetectorMixin): status = DeviceStatus(self.parent) def on_complete_call(status: DeviceStatus) -> None: - exception = None + error = None try: if self.parent.data_buffer["value"]: self._send_data_to_bec() + if self.parent.stopped: + error = DeviceStopError(f"{self.parent.name} was stopped") + # pylint: disable=expression-not-assigned + status.set_finished() if not error else status.set_exception(exc=error) # pylint: disable=broad-except except Exception as exc: - exception = exc - finally: - if exception: - status.set_exception(exception) - else: - status.set_finished() + content = traceback.format_exc() + status.set_exception(exc=exc) + logger.warning(f"Error in {self.parent.name} on_complete; Traceback: {content}") self._thread_complete = Thread(target=on_complete_call, args=(status,)) self._thread_complete.start() @@ -152,7 +157,7 @@ class SimMonitorAsyncPrepare(CustomDetectorMixin): status = DeviceStatus(self.parent) def on_trigger_call(status: DeviceStatus) -> None: - exception = None + error = None try: self.parent.data_buffer["value"].append(self.parent.readback.get()) self.parent.data_buffer["timestamp"].append(self.parent.readback.timestamp) @@ -160,14 +165,17 @@ class SimMonitorAsyncPrepare(CustomDetectorMixin): self.parent.current_trigger.set(self._counter).wait() if self._counter % self._random_send_interval == 0: self._send_data_to_bec() + if self.parent.stopped: + error = DeviceStopError(f"{self.parent.name} was stopped") + # pylint: disable=expression-not-assigned + status.set_finished() if not error else status.set_exception(exc=error) # pylint: disable=broad-except except Exception as exc: - exception = exc - finally: - if exception: - status.set_exception(exception) - else: - status.set_finished() + content = traceback.format_exc() + logger.warning( + f"Error in on_trigger_call in device {self.parent.name}; Traceback: {content}" + ) + status.set_exception(exc=exc) self._thread_trigger = Thread(target=on_trigger_call, args=(status,)) self._thread_trigger.start() @@ -199,7 +207,7 @@ class SimMonitorAsync(PSIDetectorBase): A simulated device to mimic the behaviour of an asynchronous monitor. During a scan, this device will send data not in sync with the point ID to BEC, - but buffer data and send it in random intervals. + but buffer data and send it in random intervals.s """ USER_ACCESS = ["sim", "registered_proxies", "async_update"] diff --git a/ophyd_devices/sim/sim_positioner.py b/ophyd_devices/sim/sim_positioner.py index 32c3ac4..c3cec98 100644 --- a/ophyd_devices/sim/sim_positioner.py +++ b/ophyd_devices/sim/sim_positioner.py @@ -1,5 +1,8 @@ +""" Module for simulated positioner devices. """ + import threading import time as ttime +import traceback import numpy as np from bec_lib.logger import bec_logger @@ -12,6 +15,7 @@ from ophyd_devices.sim.sim_data import SimulatedPositioner from ophyd_devices.sim.sim_signals import ReadOnlySignal, SetableSignal from ophyd_devices.sim.sim_test_devices import DummyController from ophyd_devices.sim.sim_utils import LinearTrajectory, stop_trajectory +from ophyd_devices.utils.errors import DeviceStopError logger = bec_logger.logger @@ -155,7 +159,7 @@ class SimPositioner(Device, PositionerBase): def _move_and_finish(self, start_pos, stop_pos, st): """Move the simulated device and finish the motion.""" - success = True + error = None target = stop_pos + self.tolerance.get() * np.random.uniform(-1, 1) @@ -166,14 +170,21 @@ class SimPositioner(Device, PositionerBase): ttime.sleep(1 / self.update_frequency) self._update_state(ii) if self._stopped: - success = False + error = DeviceStopError(f"{self.name} was stopped") break else: self._update_state(target) + # pylint: disable=expression-not-assigned + st.set_finished() if error is None else st.set_exception(error) + # pylint: disable=broad-except + except Exception as exc: + content = traceback.format_exc() + logger.warning( + f"Error in on_complete call in device {self.name}. Error traceback: {content}" + ) + st.set_exception(exc=exc) finally: - self._done_moving(success=success) self._set_sim_state(self.motor_is_moving.name, 0) - st.set_finished() def move(self, value: float, **kwargs) -> DeviceStatus: """Change the setpoint of the simulated device, and simultaneously initiate a motion.""" @@ -201,6 +212,7 @@ class SimPositioner(Device, PositionerBase): self._stopped = True if self.move_thread: self.move_thread.join() + self.move_thread = None super().stop(success=success) @property @@ -219,7 +231,7 @@ class SimLinearTrajectoryPositioner(SimPositioner): super().__init__(*args, **kwargs) def _move_and_finish(self, start_pos, end_pos, st): - success = True + error = None acc_time = ( self.acceleration.get() ) # acceleration in Ophyd refers to acceleration time in seconds @@ -232,7 +244,7 @@ class SimLinearTrajectoryPositioner(SimPositioner): ttime.sleep(1 / self.update_frequency) self._update_state(traj.position()) if self._stopped: - success = False + error = DeviceStopError(f"{self.name} was stopped") break if self._stopped: # simulate deceleration @@ -241,10 +253,17 @@ class SimLinearTrajectoryPositioner(SimPositioner): ttime.sleep(1 / self.update_frequency) self._update_state(traj.position()) self._update_state(traj.position()) + # pylint: disable=expression-not-assigned + st.set_finished() if error is None else st.set_exception(error) + # pylint: disable=broad-except + except Exception as exc: + content = traceback.format_exc() + logger.warning( + f"Error in on_complete call in device {self.name}. Error traceback: {content}" + ) + st.set_exception(exc=exc) finally: self._set_sim_state(self.motor_is_moving.name, 0) - self._done_moving(success=success) - st.set_finished() class SimPositionerWithCommFailure(SimPositioner): diff --git a/ophyd_devices/sim/sim_waveform.py b/ophyd_devices/sim/sim_waveform.py index a859e50..4b355fc 100644 --- a/ophyd_devices/sim/sim_waveform.py +++ b/ophyd_devices/sim/sim_waveform.py @@ -1,5 +1,8 @@ +"""Module for a simulated 1D Waveform detector, i.e. a Falcon XRF detector.""" + import os import threading +import traceback import numpy as np from bec_lib.logger import bec_logger @@ -9,6 +12,7 @@ from ophyd import Device, DeviceStatus, Kind from ophyd_devices.sim.sim_data import SimulatedDataWaveform from ophyd_devices.sim.sim_signals import ReadOnlySignal, SetableSignal from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin +from ophyd_devices.utils.errors import DeviceStopError logger = bec_logger.logger @@ -65,9 +69,10 @@ class SimWaveform(Device): self.sim = self.sim_cls(parent=self, **kwargs) super().__init__(name=name, parent=parent, kind=kind, **kwargs) - self._stopped = False + self.stopped = False self._staged = False self.scaninfo = None + self._trigger_thread = None self._update_scaninfo() if self.sim_init: self.sim.set_init(self.sim_init) @@ -87,19 +92,24 @@ class SimWaveform(Device): """ status = DeviceStatus(self) - self.subscribe(status._finished, event_type=self.SUB_ACQ_DONE, run=False) - - def acquire(): + def acquire(status: DeviceStatus): + error = None try: for _ in range(self.burst.get()): self._run_subs(sub_type=self.SUB_MONITOR, value=self.waveform.get()) - if self._stopped: + if self.stopped: + error = DeviceStopError(f"{self.name} was stopped") break - finally: - self._stopped = False - self._done_acquiring() + # pylint: disable=expression-not-assigned + status.set_finished() if not error else status.set_exception(exc=error) + # pylint: disable=broad-except + except Exception as exc: + content = traceback.format_exc() + status.set_exception(exc=exc) + logger.warning(f"Error in {self.name} trigger; Traceback: {content}") - threading.Thread(target=acquire, daemon=True).start() + self._trigger_thread = threading.Thread(target=acquire, args=(status,), daemon=True) + self._trigger_thread.start() return status def _update_scaninfo(self) -> None: @@ -129,7 +139,7 @@ class SimWaveform(Device): self.frames.set(self.scaninfo.num_points * self.scaninfo.frames_per_trigger) self.exp_time.set(self.scaninfo.exp_time) self.burst.set(self.scaninfo.frames_per_trigger) - self._stopped = False + self.stopped = False return super().stage() def unstage(self) -> list[object]: @@ -137,12 +147,15 @@ class SimWaveform(Device): Send reads from all config signals to redis """ - if self._stopped is True or not self._staged: + if self.stopped is True or not self._staged: return super().unstage() return super().unstage() def stop(self, *, success=False): """Stop the device""" - self._stopped = True + self.stopped = True + if self._trigger_thread: + self._trigger_thread.join() + self._trigger_thread = None super().stop(success=success) diff --git a/ophyd_devices/utils/errors.py b/ophyd_devices/utils/errors.py new file mode 100644 index 0000000..d1ee140 --- /dev/null +++ b/ophyd_devices/utils/errors.py @@ -0,0 +1,9 @@ +"""Module for ophyd_devices specific errors. """ + + +class DeviceStopError(Exception): + """Error to raise if the device is stopped.""" + + +class DeviceTimeoutError(Exception): + """Error to raise if the device times out."""