diff --git a/ophyd_devices/interfaces/base_classes/psi_device_base.py b/ophyd_devices/interfaces/base_classes/psi_device_base.py index 2729723..de6ad3e 100644 --- a/ophyd_devices/interfaces/base_classes/psi_device_base.py +++ b/ophyd_devices/interfaces/base_classes/psi_device_base.py @@ -1,10 +1,10 @@ """ -Consider using the bec_device_base name for the base class. -I will use this name instead here to simplify comparisons between the two approaches +Base class for all PSI ophyd device integration to ensure consistent configuration """ from __future__ import annotations +from bec_lib.devicemanager import ScanInfo from ophyd import Device, DeviceStatus, Staged, StatusBase from ophyd_devices.tests.utils import get_mock_scan_info @@ -29,7 +29,7 @@ class PSIDeviceBase(Device): SUB_DEVICE_MONITOR_2D = "device_monitor_2d" _default_sub = SUB_VALUE - def __init__(self, name: str, scan_info=None, **kwargs): + def __init__(self, name: str, scan_info: ScanInfo | None = None, **kwargs): # type: ignore """ Initialize the PSI Device Base class. @@ -73,37 +73,37 @@ class PSIDeviceBase(Device): # Wrapper around Device class methods # ######################################## - def stage(self) -> list[object] | StatusBase: + def stage(self) -> list[object] | DeviceStatus | StatusBase: # type: ignore """Stage the device.""" if self.staged != Staged.no: return super().stage() self.stopped = False super_staged = super().stage() status = self.on_stage() # pylint: disable=assignment-from-no-return - if isinstance(status, StatusBase): + if isinstance(status, DeviceStatus): return status return super_staged - def unstage(self) -> list[object] | StatusBase: + def unstage(self) -> list[object] | DeviceStatus | StatusBase: # type: ignore """Unstage the device.""" super_unstage = super().unstage() status = self.on_unstage() # pylint: disable=assignment-from-no-return - if isinstance(status, StatusBase): + if isinstance(status, DeviceStatus): return status return super_unstage - def pre_scan(self) -> StatusBase | None: + def pre_scan(self) -> DeviceStatus | StatusBase | None: """Pre-scan function.""" status = self.on_pre_scan() # pylint: disable=assignment-from-no-return return status - def trigger(self) -> DeviceStatus: + def trigger(self) -> DeviceStatus | StatusBase: """Trigger the device.""" super_trigger = super().trigger() status = self.on_trigger() # pylint: disable=assignment-from-no-return return status if status else super_trigger - def complete(self) -> DeviceStatus: + def complete(self) -> DeviceStatus | StatusBase: """Complete the device.""" status = self.on_complete() # pylint: disable=assignment-from-no-return if isinstance(status, StatusBase): @@ -112,7 +112,7 @@ class PSIDeviceBase(Device): status.set_finished() return status - def kickoff(self) -> DeviceStatus: + def kickoff(self) -> DeviceStatus | StatusBase: """Kickoff the device.""" status = self.on_kickoff() # pylint: disable=assignment-from-no-return if isinstance(status, StatusBase): @@ -121,12 +121,11 @@ class PSIDeviceBase(Device): status.set_finished() return status - # pylint: disable=arguments-differ - def stop(self, success: bool = False) -> None: + def stop(self, *, success: bool = False) -> None: """Stop the device. Args: - success (bool): True if the device was stopped successfully. + success (bool): True if the action was successful, False otherwise. """ self.on_stop() super().stop(success=success) @@ -140,8 +139,8 @@ class PSIDeviceBase(Device): """ Called when the device is initialized. - No siganls are connected at this point, - thus should not be set here but in on_connected instead. + No signals are connected at this point. If you like to + set default values on signals, please use on_connected instead. """ def on_connected(self) -> None: @@ -150,26 +149,26 @@ class PSIDeviceBase(Device): Default values for signals should be set here. """ - def on_stage(self) -> DeviceStatus | None: + def on_stage(self) -> DeviceStatus | StatusBase | None: """ Called while staging the device. - Information about the upcoming scan can be accessed from the scan_info object. + Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object. """ - def on_unstage(self) -> DeviceStatus | None: + def on_unstage(self) -> DeviceStatus | StatusBase | None: """Called while unstaging the device.""" - def on_pre_scan(self) -> DeviceStatus | None: + def on_pre_scan(self) -> DeviceStatus | StatusBase | None: """Called right before the scan starts on all devices automatically.""" - def on_trigger(self) -> DeviceStatus | None: + def on_trigger(self) -> DeviceStatus | StatusBase | None: """Called when the device is triggered.""" - def on_complete(self) -> DeviceStatus | None: + def on_complete(self) -> DeviceStatus | StatusBase | None: """Called to inquire if a device has completed a scans.""" - def on_kickoff(self) -> DeviceStatus | None: + def on_kickoff(self) -> DeviceStatus | StatusBase | None: """Called to kickoff a device for a fly scan. Has to be called explicitly.""" def on_stop(self) -> None: diff --git a/ophyd_devices/sim/sim_camera.py b/ophyd_devices/sim/sim_camera.py index 00d821d..48891e8 100644 --- a/ophyd_devices/sim/sim_camera.py +++ b/ophyd_devices/sim/sim_camera.py @@ -131,10 +131,11 @@ class SimCamera(PSIDeviceBase, SimCameraControl): def on_complete(self) -> StatusBase: """Complete the motion of the simulated device.""" + if not self.write_to_disk.get(): + return None + def complete_cam(): """Complete the camera acquisition.""" - if self.write_to_disk.get(): - self.h5_writer.on_complete() self._run_subs( sub_type=self.SUB_FILE_EVENT, file_path=self.file_path,