refactor(trigger): Review trigger class integration
This commit is contained in:
@@ -9,8 +9,11 @@ logger = bec_logger.logger
|
||||
import enum
|
||||
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
from ophyd_devices import CompareStatus, TransitionStatus
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
|
||||
from superxas_bec.devices.falcon import Falcon, FalconAcquiringStatus
|
||||
|
||||
|
||||
class ContinuousSamplingMode(int, enum.Enum):
|
||||
"""Options for start_csmpl signal"""
|
||||
@@ -36,14 +39,22 @@ class TriggerControl(Device):
|
||||
doc="Number of cycles (multiplies by 0.2s)",
|
||||
)
|
||||
start_csmpl = Cpt(
|
||||
EpicsSignal, suffix="START-CSMPL", kind=Kind.config, doc="Continous sampling mode on/off"
|
||||
EpicsSignal,
|
||||
suffix="START-CSMPL",
|
||||
kind=Kind.config,
|
||||
doc="Continous sampling mode on/off",
|
||||
put_complete=True,
|
||||
)
|
||||
smpl = Cpt(
|
||||
EpicsSignal, suffix="SMPL", kind=Kind.config, doc="Sampling Trigger if cont mode is off"
|
||||
)
|
||||
smpl_done = Cpt(
|
||||
EpicsSignalRO, suffix="SMPL-DONE", kind=Kind.config, doc="Done status of trigger"
|
||||
)
|
||||
EpicsSignalRO,
|
||||
suffix="SMPL-DONE",
|
||||
kind=Kind.config,
|
||||
auto_monitor=True,
|
||||
doc="Done status of trigger",
|
||||
) # Status PV updates need to be monitored as we wait and react for changes
|
||||
|
||||
|
||||
class Trigger(PSIDeviceBase, TriggerControl):
|
||||
@@ -60,6 +71,7 @@ class Trigger(PSIDeviceBase, TriggerControl):
|
||||
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
|
||||
self.device_manager = device_manager
|
||||
self._pv_timeout = 1
|
||||
self._falcon_ready_timeout = 5 # seconds
|
||||
|
||||
########################################
|
||||
# Beamline Specific Implementations #
|
||||
@@ -87,41 +99,40 @@ class Trigger(PSIDeviceBase, TriggerControl):
|
||||
"""
|
||||
self.start_csmpl.set(ContinuousSamplingMode.OFF).wait(timeout=self._pv_timeout)
|
||||
exp_time = self.scan_info.msg.scan_parameters["exp_time"]
|
||||
if self.scan_info.msg.scan_name != "exafs_scan":
|
||||
self.set_exposure_time(exp_time).wait()
|
||||
if self.scan_info.msg.scan_name != "exafs_scan": # TODO
|
||||
self.set_exposure_time(exp_time).wait(self._pv_timeout)
|
||||
|
||||
def on_unstage(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called while unstaging the device."""
|
||||
status = self.start_csmpl.set(ContinuousSamplingMode.ON)
|
||||
status = self.start_csmpl.set(ContinuousSamplingMode.ON).wait(timeout=self._pv_timeout)
|
||||
return status
|
||||
|
||||
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called right before the scan starts on all devices automatically."""
|
||||
|
||||
def on_trigger(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called when the device is triggered."""
|
||||
"""
|
||||
Called when the device is triggered.
|
||||
If Falcon is present in the device manager, it will also wait for the Falcon to be ready for acquiring,
|
||||
and then wait for the Falcon to be done with acquiring. This is to ensure that data of PVs is updated
|
||||
and the sampling is done before data is being read from the device.
|
||||
"""
|
||||
|
||||
falcon = self.device_manager.devices.get("falcon", None)
|
||||
|
||||
if falcon is not None:
|
||||
falcon: Falcon
|
||||
# pylint: disable=protected-access
|
||||
status = falcon._stop_erase_and_wait_for_acquiring()
|
||||
status.wait()
|
||||
|
||||
started = False
|
||||
|
||||
def _sampling_done():
|
||||
nonlocal started
|
||||
if not started and self.smpl_done.get() == SamplingDone.RUNNING:
|
||||
started = True
|
||||
return False
|
||||
if started and self.smpl_done.get() == SamplingDone.DONE:
|
||||
return True
|
||||
|
||||
return self.smpl_done.get() == SamplingDone.DONE
|
||||
CompareStatus(falcon.acquiring, FalconAcquiringStatus.ACQUIRING).wait(
|
||||
timeout=self._falcon_ready_timeout
|
||||
)
|
||||
|
||||
status_smpl = TransitionStatus(self.smpl_done, [SamplingDone.RUNNING, SamplingDone.DONE])
|
||||
self.smpl.put(1)
|
||||
status = self.task_handler.submit_task(_sampling_done, run=True)
|
||||
return status
|
||||
# We also need to wait for sampling to be done for the falcon if it's present.
|
||||
# if falcon is not None:
|
||||
# status_done = falcon.done_with_acquiring()
|
||||
# status_done.wait(timeout=self._falcon_ready_timeout)
|
||||
return status_smpl
|
||||
|
||||
def on_complete(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called to inquire if a device has completed a scans."""
|
||||
|
||||
Reference in New Issue
Block a user