fix: fix sim camera complete call, improve typhints for psi device base

This commit is contained in:
appel_c 2025-03-06 12:52:47 +01:00
parent 7baba53c35
commit 8cdcfe7a44
2 changed files with 25 additions and 25 deletions

View File

@ -1,10 +1,10 @@
"""
Consider using the bec_device_base name for the base class.
I will use this name instead here to simplify comparisons between the two approaches
Base class for all PSI ophyd device integration to ensure consistent configuration
"""
from __future__ import annotations
from bec_lib.devicemanager import ScanInfo
from ophyd import Device, DeviceStatus, Staged, StatusBase
from ophyd_devices.tests.utils import get_mock_scan_info
@ -29,7 +29,7 @@ class PSIDeviceBase(Device):
SUB_DEVICE_MONITOR_2D = "device_monitor_2d"
_default_sub = SUB_VALUE
def __init__(self, name: str, scan_info=None, **kwargs):
def __init__(self, name: str, scan_info: ScanInfo | None = None, **kwargs): # type: ignore
"""
Initialize the PSI Device Base class.
@ -73,37 +73,37 @@ class PSIDeviceBase(Device):
# Wrapper around Device class methods #
########################################
def stage(self) -> list[object] | StatusBase:
def stage(self) -> list[object] | DeviceStatus | StatusBase: # type: ignore
"""Stage the device."""
if self.staged != Staged.no:
return super().stage()
self.stopped = False
super_staged = super().stage()
status = self.on_stage() # pylint: disable=assignment-from-no-return
if isinstance(status, StatusBase):
if isinstance(status, DeviceStatus):
return status
return super_staged
def unstage(self) -> list[object] | StatusBase:
def unstage(self) -> list[object] | DeviceStatus | StatusBase: # type: ignore
"""Unstage the device."""
super_unstage = super().unstage()
status = self.on_unstage() # pylint: disable=assignment-from-no-return
if isinstance(status, StatusBase):
if isinstance(status, DeviceStatus):
return status
return super_unstage
def pre_scan(self) -> StatusBase | None:
def pre_scan(self) -> DeviceStatus | StatusBase | None:
"""Pre-scan function."""
status = self.on_pre_scan() # pylint: disable=assignment-from-no-return
return status
def trigger(self) -> DeviceStatus:
def trigger(self) -> DeviceStatus | StatusBase:
"""Trigger the device."""
super_trigger = super().trigger()
status = self.on_trigger() # pylint: disable=assignment-from-no-return
return status if status else super_trigger
def complete(self) -> DeviceStatus:
def complete(self) -> DeviceStatus | StatusBase:
"""Complete the device."""
status = self.on_complete() # pylint: disable=assignment-from-no-return
if isinstance(status, StatusBase):
@ -112,7 +112,7 @@ class PSIDeviceBase(Device):
status.set_finished()
return status
def kickoff(self) -> DeviceStatus:
def kickoff(self) -> DeviceStatus | StatusBase:
"""Kickoff the device."""
status = self.on_kickoff() # pylint: disable=assignment-from-no-return
if isinstance(status, StatusBase):
@ -121,12 +121,11 @@ class PSIDeviceBase(Device):
status.set_finished()
return status
# pylint: disable=arguments-differ
def stop(self, success: bool = False) -> None:
def stop(self, *, success: bool = False) -> None:
"""Stop the device.
Args:
success (bool): True if the device was stopped successfully.
success (bool): True if the action was successful, False otherwise.
"""
self.on_stop()
super().stop(success=success)
@ -140,8 +139,8 @@ class PSIDeviceBase(Device):
"""
Called when the device is initialized.
No siganls are connected at this point,
thus should not be set here but in on_connected instead.
No signals are connected at this point. If you like to
set default values on signals, please use on_connected instead.
"""
def on_connected(self) -> None:
@ -150,26 +149,26 @@ class PSIDeviceBase(Device):
Default values for signals should be set here.
"""
def on_stage(self) -> DeviceStatus | None:
def on_stage(self) -> DeviceStatus | StatusBase | None:
"""
Called while staging the device.
Information about the upcoming scan can be accessed from the scan_info object.
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
"""
def on_unstage(self) -> DeviceStatus | None:
def on_unstage(self) -> DeviceStatus | StatusBase | None:
"""Called while unstaging the device."""
def on_pre_scan(self) -> DeviceStatus | None:
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
"""Called right before the scan starts on all devices automatically."""
def on_trigger(self) -> DeviceStatus | None:
def on_trigger(self) -> DeviceStatus | StatusBase | None:
"""Called when the device is triggered."""
def on_complete(self) -> DeviceStatus | None:
def on_complete(self) -> DeviceStatus | StatusBase | None:
"""Called to inquire if a device has completed a scans."""
def on_kickoff(self) -> DeviceStatus | None:
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
def on_stop(self) -> None:

View File

@ -131,10 +131,11 @@ class SimCamera(PSIDeviceBase, SimCameraControl):
def on_complete(self) -> StatusBase:
"""Complete the motion of the simulated device."""
if not self.write_to_disk.get():
return None
def complete_cam():
"""Complete the camera acquisition."""
if self.write_to_disk.get():
self.h5_writer.on_complete()
self._run_subs(
sub_type=self.SUB_FILE_EVENT,
file_path=self.file_path,