refactor: review DeviceStatus and error handling in simulation

This commit is contained in:
appel_c 2024-07-28 11:20:12 +02:00
parent c3e17ba056
commit 87858edfe2
6 changed files with 113 additions and 47 deletions

View File

@ -7,16 +7,21 @@ The beamlines need to inherit from the CustomDetectorMixing for their mixin clas
import os import os
import threading import threading
import time import time
import traceback
from bec_lib import messages from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints from bec_lib.endpoints import MessageEndpoints
from bec_lib.file_utils import FileWriter from bec_lib.file_utils import FileWriter
from bec_lib.logger import bec_logger
from ophyd import Component, Device, DeviceStatus, Kind from ophyd import Component, Device, DeviceStatus, Kind
from ophyd.device import Staged from ophyd.device import Staged
from ophyd_devices.sim.sim_signals import SetableSignal from ophyd_devices.sim.sim_signals import SetableSignal
from ophyd_devices.utils import bec_utils from ophyd_devices.utils import bec_utils
from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin
from ophyd_devices.utils.errors import DeviceStopError, DeviceTimeoutError
logger = bec_logger.logger
class DetectorInitError(Exception): class DetectorInitError(Exception):
@ -176,7 +181,7 @@ class CustomDetectorMixin:
check_stopped: bool = False, check_stopped: bool = False,
interval: float = 0.05, interval: float = 0.05,
all_signals: bool = False, all_signals: bool = False,
exception_on_timeout: Exception = TimeoutError("Timeout while waiting for signals"), exception_on_timeout: Exception = None,
) -> DeviceStatus: ) -> DeviceStatus:
"""Utility function to wait for signals in a thread. """Utility function to wait for signals in a thread.
Returns a DevicesStatus object that resolves either to set_finished or set_exception. Returns a DevicesStatus object that resolves either to set_finished or set_exception.
@ -200,6 +205,10 @@ class CustomDetectorMixin:
Returns: Returns:
DeviceStatus: DeviceStatus object that resolves either to set_finished or set_exception DeviceStatus: DeviceStatus object that resolves either to set_finished or set_exception
""" """
if exception_on_timeout is None:
exception_on_timeout = DeviceTimeoutError(
f"Timeout error for {self.parent.name} while waiting for signals {signal_conditions}"
)
status = DeviceStatus(self.parent) status = DeviceStatus(self.parent)
@ -211,7 +220,7 @@ class CustomDetectorMixin:
check_stopped: bool, check_stopped: bool,
interval: float, interval: float,
all_signals: bool, all_signals: bool,
exception_on_timeout: Exception = TimeoutError("Timeout while waiting for signals"), exception_on_timeout: Exception,
): ):
"""Convenient wrapper around wait_for_signals to set status based on the result. """Convenient wrapper around wait_for_signals to set status based on the result.
@ -231,8 +240,16 @@ class CustomDetectorMixin:
if result: if result:
status.set_finished() status.set_finished()
else: else:
status.set_exception(exception_on_timeout) if self.stopped:
status.set_exception(exc=DeviceStopError(f"{self.parent.name} was stopped"))
else:
status.set_exception(exc=exception_on_timeout)
# pylint: disable=broad-except
except Exception as exc: except Exception as exc:
content = traceback.format_exc()
logger.warning(
f"Error in wait_for_signals in {self.parent.name}; Traceback: {content}"
)
status.set_exception(exc=exc) status.set_exception(exc=exc)
thread = threading.Thread( thread = threading.Thread(

View File

@ -13,6 +13,7 @@ from ophyd_devices.interfaces.base_classes.psi_detector_base import (
from ophyd_devices.sim.sim_data import SimulatedDataCamera from ophyd_devices.sim.sim_data import SimulatedDataCamera
from ophyd_devices.sim.sim_signals import ReadOnlySignal, SetableSignal from ophyd_devices.sim.sim_signals import ReadOnlySignal, SetableSignal
from ophyd_devices.sim.sim_utils import H5Writer from ophyd_devices.sim.sim_utils import H5Writer
from ophyd_devices.utils.errors import DeviceStopError
logger = bec_logger.logger logger = bec_logger.logger
@ -36,19 +37,19 @@ class SimCameraSetup(CustomDetectorMixin):
status = DeviceStatus(self.parent) status = DeviceStatus(self.parent)
def on_trigger_call(status: DeviceStatus) -> None: def on_trigger_call(status: DeviceStatus) -> None:
success = True error = None
try: try:
for _ in range(self.parent.burst.get()): for _ in range(self.parent.burst.get()):
data = self.parent.image.get() data = self.parent.image.get()
# pylint: disable=protected-access # pylint: disable=protected-access
self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=data) self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=data)
if self.parent.stopped: if self.parent.stopped:
success = False error = DeviceStopError(f"{self.parent.name} was stopped")
break break
if self.parent.write_to_disk.get(): if self.parent.write_to_disk.get():
self.parent.h5_writer.receive_data(data) self.parent.h5_writer.receive_data(data)
# pylint: disable=protected-access # pylint: disable=expression-not-assigned
status._finished(success=success) status.set_finished() if not error else status.set_exception(exc=error)
# pylint: disable=broad-except # pylint: disable=broad-except
except Exception as exc: except Exception as exc:
content = traceback.format_exc() content = traceback.format_exc()
@ -92,16 +93,15 @@ class SimCameraSetup(CustomDetectorMixin):
status = DeviceStatus(self.parent) status = DeviceStatus(self.parent)
def on_complete_call(status: DeviceStatus) -> None: def on_complete_call(status: DeviceStatus) -> None:
success = True error = None
try: try:
if self.parent.write_to_disk.get(): if self.parent.write_to_disk.get():
self.parent.h5_writer.write_data() self.parent.h5_writer.write_data()
self.publish_file_location(done=True, successful=True) self.publish_file_location(done=True, successful=True)
# pylint: disable=protected-access
if self.parent.stopped: if self.parent.stopped:
success = False error = DeviceStopError(f"{self.parent.name} was stopped")
# pylint: disable=protected-access # pylint: disable=expression-not-assigned
status._finished(success=success) status.set_finished() if not error else status.set_exception(exc=error)
# pylint: disable=broad-except # pylint: disable=broad-except
except Exception as exc: except Exception as exc:
content = traceback.format_exc() content = traceback.format_exc()

View File

@ -1,3 +1,6 @@
"""Module for simulated monitor devices."""
import traceback
from threading import Thread from threading import Thread
import numpy as np import numpy as np
@ -13,6 +16,7 @@ from ophyd_devices.interfaces.base_classes.psi_detector_base import (
) )
from ophyd_devices.sim.sim_data import SimulatedDataMonitor from ophyd_devices.sim.sim_data import SimulatedDataMonitor
from ophyd_devices.sim.sim_signals import ReadOnlySignal, SetableSignal from ophyd_devices.sim.sim_signals import ReadOnlySignal, SetableSignal
from ophyd_devices.utils.errors import DeviceStopError
logger = bec_logger.logger logger = bec_logger.logger
@ -110,18 +114,19 @@ class SimMonitorAsyncPrepare(CustomDetectorMixin):
status = DeviceStatus(self.parent) status = DeviceStatus(self.parent)
def on_complete_call(status: DeviceStatus) -> None: def on_complete_call(status: DeviceStatus) -> None:
exception = None error = None
try: try:
if self.parent.data_buffer["value"]: if self.parent.data_buffer["value"]:
self._send_data_to_bec() self._send_data_to_bec()
if self.parent.stopped:
error = DeviceStopError(f"{self.parent.name} was stopped")
# pylint: disable=expression-not-assigned
status.set_finished() if not error else status.set_exception(exc=error)
# pylint: disable=broad-except # pylint: disable=broad-except
except Exception as exc: except Exception as exc:
exception = exc content = traceback.format_exc()
finally: status.set_exception(exc=exc)
if exception: logger.warning(f"Error in {self.parent.name} on_complete; Traceback: {content}")
status.set_exception(exception)
else:
status.set_finished()
self._thread_complete = Thread(target=on_complete_call, args=(status,)) self._thread_complete = Thread(target=on_complete_call, args=(status,))
self._thread_complete.start() self._thread_complete.start()
@ -152,7 +157,7 @@ class SimMonitorAsyncPrepare(CustomDetectorMixin):
status = DeviceStatus(self.parent) status = DeviceStatus(self.parent)
def on_trigger_call(status: DeviceStatus) -> None: def on_trigger_call(status: DeviceStatus) -> None:
exception = None error = None
try: try:
self.parent.data_buffer["value"].append(self.parent.readback.get()) self.parent.data_buffer["value"].append(self.parent.readback.get())
self.parent.data_buffer["timestamp"].append(self.parent.readback.timestamp) self.parent.data_buffer["timestamp"].append(self.parent.readback.timestamp)
@ -160,14 +165,17 @@ class SimMonitorAsyncPrepare(CustomDetectorMixin):
self.parent.current_trigger.set(self._counter).wait() self.parent.current_trigger.set(self._counter).wait()
if self._counter % self._random_send_interval == 0: if self._counter % self._random_send_interval == 0:
self._send_data_to_bec() self._send_data_to_bec()
if self.parent.stopped:
error = DeviceStopError(f"{self.parent.name} was stopped")
# pylint: disable=expression-not-assigned
status.set_finished() if not error else status.set_exception(exc=error)
# pylint: disable=broad-except # pylint: disable=broad-except
except Exception as exc: except Exception as exc:
exception = exc content = traceback.format_exc()
finally: logger.warning(
if exception: f"Error in on_trigger_call in device {self.parent.name}; Traceback: {content}"
status.set_exception(exception) )
else: status.set_exception(exc=exc)
status.set_finished()
self._thread_trigger = Thread(target=on_trigger_call, args=(status,)) self._thread_trigger = Thread(target=on_trigger_call, args=(status,))
self._thread_trigger.start() self._thread_trigger.start()
@ -199,7 +207,7 @@ class SimMonitorAsync(PSIDetectorBase):
A simulated device to mimic the behaviour of an asynchronous monitor. A simulated device to mimic the behaviour of an asynchronous monitor.
During a scan, this device will send data not in sync with the point ID to BEC, During a scan, this device will send data not in sync with the point ID to BEC,
but buffer data and send it in random intervals. but buffer data and send it in random intervals.s
""" """
USER_ACCESS = ["sim", "registered_proxies", "async_update"] USER_ACCESS = ["sim", "registered_proxies", "async_update"]

View File

@ -1,5 +1,8 @@
""" Module for simulated positioner devices. """
import threading import threading
import time as ttime import time as ttime
import traceback
import numpy as np import numpy as np
from bec_lib.logger import bec_logger from bec_lib.logger import bec_logger
@ -12,6 +15,7 @@ from ophyd_devices.sim.sim_data import SimulatedPositioner
from ophyd_devices.sim.sim_signals import ReadOnlySignal, SetableSignal from ophyd_devices.sim.sim_signals import ReadOnlySignal, SetableSignal
from ophyd_devices.sim.sim_test_devices import DummyController from ophyd_devices.sim.sim_test_devices import DummyController
from ophyd_devices.sim.sim_utils import LinearTrajectory, stop_trajectory from ophyd_devices.sim.sim_utils import LinearTrajectory, stop_trajectory
from ophyd_devices.utils.errors import DeviceStopError
logger = bec_logger.logger logger = bec_logger.logger
@ -155,7 +159,7 @@ class SimPositioner(Device, PositionerBase):
def _move_and_finish(self, start_pos, stop_pos, st): def _move_and_finish(self, start_pos, stop_pos, st):
"""Move the simulated device and finish the motion.""" """Move the simulated device and finish the motion."""
success = True error = None
target = stop_pos + self.tolerance.get() * np.random.uniform(-1, 1) target = stop_pos + self.tolerance.get() * np.random.uniform(-1, 1)
@ -166,14 +170,21 @@ class SimPositioner(Device, PositionerBase):
ttime.sleep(1 / self.update_frequency) ttime.sleep(1 / self.update_frequency)
self._update_state(ii) self._update_state(ii)
if self._stopped: if self._stopped:
success = False error = DeviceStopError(f"{self.name} was stopped")
break break
else: else:
self._update_state(target) self._update_state(target)
# pylint: disable=expression-not-assigned
st.set_finished() if error is None else st.set_exception(error)
# pylint: disable=broad-except
except Exception as exc:
content = traceback.format_exc()
logger.warning(
f"Error in on_complete call in device {self.name}. Error traceback: {content}"
)
st.set_exception(exc=exc)
finally: finally:
self._done_moving(success=success)
self._set_sim_state(self.motor_is_moving.name, 0) self._set_sim_state(self.motor_is_moving.name, 0)
st.set_finished()
def move(self, value: float, **kwargs) -> DeviceStatus: def move(self, value: float, **kwargs) -> DeviceStatus:
"""Change the setpoint of the simulated device, and simultaneously initiate a motion.""" """Change the setpoint of the simulated device, and simultaneously initiate a motion."""
@ -201,6 +212,7 @@ class SimPositioner(Device, PositionerBase):
self._stopped = True self._stopped = True
if self.move_thread: if self.move_thread:
self.move_thread.join() self.move_thread.join()
self.move_thread = None
super().stop(success=success) super().stop(success=success)
@property @property
@ -219,7 +231,7 @@ class SimLinearTrajectoryPositioner(SimPositioner):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
def _move_and_finish(self, start_pos, end_pos, st): def _move_and_finish(self, start_pos, end_pos, st):
success = True error = None
acc_time = ( acc_time = (
self.acceleration.get() self.acceleration.get()
) # acceleration in Ophyd refers to acceleration time in seconds ) # acceleration in Ophyd refers to acceleration time in seconds
@ -232,7 +244,7 @@ class SimLinearTrajectoryPositioner(SimPositioner):
ttime.sleep(1 / self.update_frequency) ttime.sleep(1 / self.update_frequency)
self._update_state(traj.position()) self._update_state(traj.position())
if self._stopped: if self._stopped:
success = False error = DeviceStopError(f"{self.name} was stopped")
break break
if self._stopped: if self._stopped:
# simulate deceleration # simulate deceleration
@ -241,10 +253,17 @@ class SimLinearTrajectoryPositioner(SimPositioner):
ttime.sleep(1 / self.update_frequency) ttime.sleep(1 / self.update_frequency)
self._update_state(traj.position()) self._update_state(traj.position())
self._update_state(traj.position()) self._update_state(traj.position())
# pylint: disable=expression-not-assigned
st.set_finished() if error is None else st.set_exception(error)
# pylint: disable=broad-except
except Exception as exc:
content = traceback.format_exc()
logger.warning(
f"Error in on_complete call in device {self.name}. Error traceback: {content}"
)
st.set_exception(exc=exc)
finally: finally:
self._set_sim_state(self.motor_is_moving.name, 0) self._set_sim_state(self.motor_is_moving.name, 0)
self._done_moving(success=success)
st.set_finished()
class SimPositionerWithCommFailure(SimPositioner): class SimPositionerWithCommFailure(SimPositioner):

View File

@ -1,5 +1,8 @@
"""Module for a simulated 1D Waveform detector, i.e. a Falcon XRF detector."""
import os import os
import threading import threading
import traceback
import numpy as np import numpy as np
from bec_lib.logger import bec_logger from bec_lib.logger import bec_logger
@ -9,6 +12,7 @@ from ophyd import Device, DeviceStatus, Kind
from ophyd_devices.sim.sim_data import SimulatedDataWaveform from ophyd_devices.sim.sim_data import SimulatedDataWaveform
from ophyd_devices.sim.sim_signals import ReadOnlySignal, SetableSignal from ophyd_devices.sim.sim_signals import ReadOnlySignal, SetableSignal
from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin
from ophyd_devices.utils.errors import DeviceStopError
logger = bec_logger.logger logger = bec_logger.logger
@ -65,9 +69,10 @@ class SimWaveform(Device):
self.sim = self.sim_cls(parent=self, **kwargs) self.sim = self.sim_cls(parent=self, **kwargs)
super().__init__(name=name, parent=parent, kind=kind, **kwargs) super().__init__(name=name, parent=parent, kind=kind, **kwargs)
self._stopped = False self.stopped = False
self._staged = False self._staged = False
self.scaninfo = None self.scaninfo = None
self._trigger_thread = None
self._update_scaninfo() self._update_scaninfo()
if self.sim_init: if self.sim_init:
self.sim.set_init(self.sim_init) self.sim.set_init(self.sim_init)
@ -87,19 +92,24 @@ class SimWaveform(Device):
""" """
status = DeviceStatus(self) status = DeviceStatus(self)
self.subscribe(status._finished, event_type=self.SUB_ACQ_DONE, run=False) def acquire(status: DeviceStatus):
error = None
def acquire():
try: try:
for _ in range(self.burst.get()): for _ in range(self.burst.get()):
self._run_subs(sub_type=self.SUB_MONITOR, value=self.waveform.get()) self._run_subs(sub_type=self.SUB_MONITOR, value=self.waveform.get())
if self._stopped: if self.stopped:
error = DeviceStopError(f"{self.name} was stopped")
break break
finally: # pylint: disable=expression-not-assigned
self._stopped = False status.set_finished() if not error else status.set_exception(exc=error)
self._done_acquiring() # pylint: disable=broad-except
except Exception as exc:
content = traceback.format_exc()
status.set_exception(exc=exc)
logger.warning(f"Error in {self.name} trigger; Traceback: {content}")
threading.Thread(target=acquire, daemon=True).start() self._trigger_thread = threading.Thread(target=acquire, args=(status,), daemon=True)
self._trigger_thread.start()
return status return status
def _update_scaninfo(self) -> None: def _update_scaninfo(self) -> None:
@ -129,7 +139,7 @@ class SimWaveform(Device):
self.frames.set(self.scaninfo.num_points * self.scaninfo.frames_per_trigger) self.frames.set(self.scaninfo.num_points * self.scaninfo.frames_per_trigger)
self.exp_time.set(self.scaninfo.exp_time) self.exp_time.set(self.scaninfo.exp_time)
self.burst.set(self.scaninfo.frames_per_trigger) self.burst.set(self.scaninfo.frames_per_trigger)
self._stopped = False self.stopped = False
return super().stage() return super().stage()
def unstage(self) -> list[object]: def unstage(self) -> list[object]:
@ -137,12 +147,15 @@ class SimWaveform(Device):
Send reads from all config signals to redis Send reads from all config signals to redis
""" """
if self._stopped is True or not self._staged: if self.stopped is True or not self._staged:
return super().unstage() return super().unstage()
return super().unstage() return super().unstage()
def stop(self, *, success=False): def stop(self, *, success=False):
"""Stop the device""" """Stop the device"""
self._stopped = True self.stopped = True
if self._trigger_thread:
self._trigger_thread.join()
self._trigger_thread = None
super().stop(success=success) super().stop(success=success)

View File

@ -0,0 +1,9 @@
"""Module for ophyd_devices specific errors. """
class DeviceStopError(Exception):
"""Error to raise if the device is stopped."""
class DeviceTimeoutError(Exception):
"""Error to raise if the device times out."""