feat(psi device base): stoppable status objects

Add methods to PSIDeviceBase to register status object that should be cancelled when the device is stopped or destroyed.
This commit is contained in:
2025-06-15 13:17:02 +02:00
committed by Klaus Wakonig
parent 91552c871f
commit cb7f7ba932
2 changed files with 71 additions and 0 deletions

View File

@@ -64,6 +64,7 @@ class PSIDeviceBase(Device):
else:
super().__init__(prefix=prefix, name=name, **kwargs)
self._stopped = False
self._stoppable_status_objects: list[StatusBase] = []
self.task_handler = TaskHandler(parent=self)
self.file_utils = FileHandler()
if scan_info is None:
@@ -113,6 +114,7 @@ class PSIDeviceBase(Device):
"""Unstage the device."""
super_unstage = super().unstage()
status = self.on_unstage() # pylint: disable=assignment-from-no-return
self._stop_stoppable_status_objects()
if isinstance(status, StatusBase):
return status
return super_unstage
@@ -154,13 +156,52 @@ class PSIDeviceBase(Device):
"""
self.on_stop()
self.stopped = True # Set stopped flag to True, in case a custom stop method listens to stopped property
# Stop all stoppable status objects
self._stop_stoppable_status_objects()
super().stop(success=success)
def destroy(self):
"""Destroy the device."""
self.on_destroy() # Call the on_destroy method
self._stop_stoppable_status_objects()
return super().destroy()
########################################
# Stoppable Status Objects Management #
########################################
def cancel_on_stop(self, status: StatusBase) -> None:
"""
Register a status object to be cancelled when the device is stopped.
Args:
status (StatusBase): The status object to be cancelled.
"""
if not isinstance(status, StatusBase):
raise TypeError("status must be an instance of StatusBase")
self._stoppable_status_objects.append(status)
def _clear_stoppable_status_objects(self) -> None:
"""
Clear all registered stoppable status objects.
This is useful to reset the list of status objects that should be cancelled
when the device is stopped.
"""
self._stoppable_status_objects = []
def _stop_stoppable_status_objects(self) -> None:
"""
Stop all registered stoppable status objects.
This method will cancel all status objects that have been registered
to be stopped when the device is stopped.
"""
for status in self._stoppable_status_objects:
if not status.done:
status.set_exception(DeviceStoppedError(f"Device {self.name} has been stopped"))
self._clear_stoppable_status_objects()
########################################
# Utility Method to wait for signals #
########################################