feat: add async monitor, add on_complete to psi_det_base and rm duplicated mocks, closes #67

This commit is contained in:
2024-05-29 18:42:34 +02:00
parent f9b126c60c
commit 1aece61a3b
3 changed files with 174 additions and 89 deletions

View File

@@ -8,10 +8,9 @@ import os
import time
from bec_lib import messages
from bec_lib.device import DeviceStatus
from bec_lib.endpoints import MessageEndpoints
from bec_lib.file_utils import FileWriter
from ophyd import Device
from ophyd import Device, DeviceStatus
from ophyd.device import Staged
from ophyd_devices.utils import bec_utils
@@ -88,6 +87,13 @@ class CustomDetectorMixin:
Only use if needed, and it is recommended to keep this function as short/fast as possible.
"""
def on_complete(self) -> None:
"""
Specify actions to be executed when the scan is complete.
This can for instance be to check with the detector and backend if all data is written succsessfully.
"""
# TODO add configurable file_path instead of hardcoding self.parent.filepath
def publish_file_location(
self, done: bool = False, successful: bool = None, metadata: dict = {}
@@ -274,6 +280,19 @@ class PSIDetectorBase(Device):
self.custom_prepare.on_trigger()
return super().trigger()
def complete(self) -> None:
"""Complete the acquisition, called from BEC.
This function is called after the scan is complete, just before unstage.
We can check here with the data backend and detector if the acquisition successfully finished.
Actions are implemented in custom_prepare.on_complete since they are beamline specific.
"""
status = DeviceStatus(self)
self.custom_prepare.on_complete()
status.set_finished()
return status
def unstage(self) -> list[object]:
"""
Unstage device after a scan.