fix: Improve asyn_monitor and camera on_trigger and on_complete to return status

This commit is contained in:
2024-07-21 16:39:57 +02:00
parent 9eb67a0900
commit f3118765b0
3 changed files with 126 additions and 36 deletions

View File

@ -400,8 +400,6 @@ class PSIDetectorBase(Device):
list(object): list of objects that were unstaged list(object): list of objects that were unstaged
""" """
self.check_scan_id() self.check_scan_id()
if self.stopped is True:
return super().unstage()
self.custom_prepare.on_unstage() self.custom_prepare.on_unstage()
self.stopped = False self.stopped = False
return super().unstage() return super().unstage()

View File

@ -1,8 +1,10 @@
import traceback
from threading import Thread
import numpy as np import numpy as np
from bec_lib.logger import bec_logger from bec_lib.logger import bec_logger
from ophyd import Component as Cpt from ophyd import Component as Cpt
from ophyd import DeviceStatus, Kind from ophyd import DeviceStatus, Kind
from ophyd.status import StatusBase
from ophyd_devices.interfaces.base_classes.psi_detector_base import ( from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin, CustomDetectorMixin,
@ -18,6 +20,11 @@ logger = bec_logger.logger
class SimCameraSetup(CustomDetectorMixin): class SimCameraSetup(CustomDetectorMixin):
"""Mixin class for the SimCamera device.""" """Mixin class for the SimCamera device."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._thread_trigger = None
self._thread_complete = None
def on_trigger(self) -> None: def on_trigger(self) -> None:
"""Trigger the camera to acquire images. """Trigger the camera to acquire images.
@ -26,16 +33,33 @@ class SimCameraSetup(CustomDetectorMixin):
Here, we also run a callback on SUB_MONITOR to send the image data the device_monitor endpoint in BEC. Here, we also run a callback on SUB_MONITOR to send the image data the device_monitor endpoint in BEC.
""" """
try: status = DeviceStatus(self.parent)
for _ in range(self.parent.burst.get()):
data = self.parent.image.get() def on_trigger_call(status: DeviceStatus) -> None:
self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=data) success = True
if self.parent.stopped: try:
break for _ in range(self.parent.burst.get()):
if self.parent.write_to_disk.get(): data = self.parent.image.get()
self.parent.h5_writer.receive_data(data) # pylint: disable=protected-access
finally: self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=data)
self.parent.stopped = False if self.parent.stopped:
success = False
break
if self.parent.write_to_disk.get():
self.parent.h5_writer.receive_data(data)
# pylint: disable=protected-access
status._finished(success=success)
# pylint: disable=broad-except
except Exception as exc:
content = traceback.format_exc()
logger.warning(
f"Error in on_trigger_call in device {self.parent.name}. Error traceback: {content}"
)
status.set_exception(exc)
self._thread_trigger = Thread(target=on_trigger_call, args=(status,))
self._thread_trigger.start()
return status
def on_stage(self) -> None: def on_stage(self) -> None:
"""Stage the camera for upcoming scan """Stage the camera for upcoming scan
@ -63,13 +87,41 @@ class SimCameraSetup(CustomDetectorMixin):
self.publish_file_location(done=False, successful=False) self.publish_file_location(done=False, successful=False)
self.parent.stopped = False self.parent.stopped = False
def on_unstage(self) -> None: def on_complete(self) -> None:
"""Unstage the device """Complete the motion of the simulated device."""
status = DeviceStatus(self.parent)
Send reads from all config signals to redis def on_complete_call(status: DeviceStatus) -> None:
""" success = True
if self.parent.write_to_disk.get(): try:
self.publish_file_location(done=True, successful=True) if self.parent.write_to_disk.get():
self.parent.h5_writer.write_data()
self.publish_file_location(done=True, successful=True)
# pylint: disable=protected-access
if self.parent.stopped:
success = False
# pylint: disable=protected-access
status._finished(success=success)
# pylint: disable=broad-except
except Exception as exc:
content = traceback.format_exc()
logger.warning(
f"Error in on_complete call in device {self.parent.name}. Error traceback: {content}"
)
status.set_exception(exc)
self._thread_complete = Thread(target=on_complete_call, args=(status,), daemon=True)
self._thread_complete.start()
return status
def on_stop(self) -> None:
"""Stop the camera acquisition."""
if self._thread_trigger:
self._thread_trigger.join()
if self._thread_complete:
self._thread_complete.join()
self._thread_trigger = None
self._thread_complete = None
class SimCamera(PSIDetectorBase): class SimCamera(PSIDetectorBase):
@ -133,11 +185,3 @@ class SimCamera(PSIDetectorBase):
def registered_proxies(self) -> None: def registered_proxies(self) -> None:
"""Dictionary of registered signal_names and proxies.""" """Dictionary of registered signal_names and proxies."""
return self._registered_proxies return self._registered_proxies
def complete(self) -> StatusBase:
"""Complete the motion of the simulated device."""
status = DeviceStatus(self)
if self.write_to_disk.get():
self.h5_writer.write_data()
status.set_finished()
return status

View File

@ -1,9 +1,11 @@
from threading import Thread
import numpy as np import numpy as np
from bec_lib import messages from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger from bec_lib.logger import bec_logger
from ophyd import Component as Cpt from ophyd import Component as Cpt
from ophyd import Device, Kind from ophyd import Device, DeviceStatus, Kind
from ophyd_devices.interfaces.base_classes.psi_detector_base import ( from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin, CustomDetectorMixin,
@ -82,6 +84,8 @@ class SimMonitorAsyncPrepare(CustomDetectorMixin):
self._stream_ttl = 1800 self._stream_ttl = 1800
self._random_send_interval = None self._random_send_interval = None
self._counter = 0 self._counter = 0
self._thread_trigger = None
self._thread_complete = None
self.prep_random_interval() self.prep_random_interval()
self.parent.current_trigger.subscribe(self._progress_update, run=False) self.parent.current_trigger.subscribe(self._progress_update, run=False)
@ -103,8 +107,25 @@ class SimMonitorAsyncPrepare(CustomDetectorMixin):
def on_complete(self): def on_complete(self):
"""Prepare the device for completion.""" """Prepare the device for completion."""
if self.parent.data_buffer["value"]: status = DeviceStatus(self.parent)
self._send_data_to_bec()
def on_complete_call(status: DeviceStatus) -> None:
exception = None
try:
if self.parent.data_buffer["value"]:
self._send_data_to_bec()
# pylint: disable=broad-except
except Exception as exc:
exception = exc
finally:
if exception:
status.set_exception(exception)
else:
status.set_finished()
self._thread_complete = Thread(target=on_complete_call, args=(status,))
self._thread_complete.start()
return status
def _send_data_to_bec(self) -> None: def _send_data_to_bec(self) -> None:
"""Sends bundled data to BEC""" """Sends bundled data to BEC"""
@ -128,16 +149,34 @@ class SimMonitorAsyncPrepare(CustomDetectorMixin):
def on_trigger(self): def on_trigger(self):
"""Prepare the device for triggering.""" """Prepare the device for triggering."""
self.parent.data_buffer["value"].append(self.parent.readback.get()) status = DeviceStatus(self.parent)
self.parent.data_buffer["timestamp"].append(self.parent.readback.timestamp)
self._counter += 1 def on_trigger_call(status: DeviceStatus) -> None:
self.parent.current_trigger.set(self._counter).wait() exception = None
if self._counter % self._random_send_interval == 0: try:
self._send_data_to_bec() self.parent.data_buffer["value"].append(self.parent.readback.get())
self.parent.data_buffer["timestamp"].append(self.parent.readback.timestamp)
self._counter += 1
self.parent.current_trigger.set(self._counter).wait()
if self._counter % self._random_send_interval == 0:
self._send_data_to_bec()
# pylint: disable=broad-except
except Exception as exc:
exception = exc
finally:
if exception:
status.set_exception(exception)
else:
status.set_finished()
self._thread_trigger = Thread(target=on_trigger_call, args=(status,))
self._thread_trigger.start()
return status
def _progress_update(self, value: int, **kwargs): def _progress_update(self, value: int, **kwargs):
"""Update the progress of the device.""" """Update the progress of the device."""
max_value = self.parent.scaninfo.num_points max_value = self.parent.scaninfo.num_points
# pylint: disable=protected-access
self.parent._run_subs( self.parent._run_subs(
sub_type=self.parent.SUB_PROGRESS, sub_type=self.parent.SUB_PROGRESS,
value=value, value=value,
@ -145,6 +184,15 @@ class SimMonitorAsyncPrepare(CustomDetectorMixin):
done=bool(max_value == value), done=bool(max_value == value),
) )
def on_stop(self):
"""Stop the device."""
if self._thread_trigger:
self._thread_trigger.join()
if self._thread_complete:
self._thread_complete.join()
self._thread_trigger = None
self._thread_complete = None
class SimMonitorAsync(PSIDetectorBase): class SimMonitorAsync(PSIDetectorBase):
""" """