diff --git a/phoenix_bec/devices/phoenix_trigger.py b/phoenix_bec/devices/phoenix_trigger.py index 06783cd..d4fe1c7 100644 --- a/phoenix_bec/devices/phoenix_trigger.py +++ b/phoenix_bec/devices/phoenix_trigger.py @@ -1,18 +1,18 @@ -import time +""" Module for the PhoenixTrigger class to connect to the ADC card +that creates TTL signals to trigger cameras and detectors at Phoenix. """ import enum -import numpy as np +import time +import numpy as np +from bec_lib import bec_logger from ophyd import Component as Cpt from ophyd import DeviceStatus, EpicsSignal, EpicsSignalRO, Kind - from ophyd_devices.interfaces.base_classes.psi_detector_base import ( - PSIDetectorBase, CustomDetectorMixin, + PSIDetectorBase, ) -from bec_lib import bec_logger - logger = bec_logger.logger DETECTOR_TIMEOUT = 5 @@ -22,7 +22,7 @@ class PhoenixTriggerError(Exception): """PhoenixTrigger specific error""" -class SAMPLINGDONE(int, enum.Enum): +class SAMPLING(int, enum.Enum): """Sampling Done PV""" RUNNING = 0 @@ -31,13 +31,11 @@ class SAMPLINGDONE(int, enum.Enum): class PhoenixTriggerSetup(CustomDetectorMixin): """ - This defines the PHOENIX trigger setup. - - + Mixin Class to setup the PhoenixTrigger device """ def on_stage(self) -> None: - """Actions to take place on stage""" + """On stage actions which are executed upon staging the device""" if self.parent.scaninfo.scan_type == "step": self.parent.start_csmpl.set(0) self.parent.total_cycles.set(1) @@ -47,19 +45,21 @@ class PhoenixTriggerSetup(CustomDetectorMixin): logger.info(f"Device {self.parent.name} was staged for step scan") def on_unstage(self) -> None: - """Actions to take place on unstage""" + """On unstage actions which are executed upon unstaging the device""" self.on_stop() def on_trigger(self) -> DeviceStatus: - """Actions to be performed upon receiving a software trigger""" + """On trigger actions which are executed upon triggering the device""" + # TODO Test the proper check for the falcon state # Check first that falcon is set to acquiring falcon = self.parent.device_manager.devices.get("falcon_nohdf5", None) + timeout = 1 if falcon is not None: - if self.wait_for_signals([(falcon.state.get, 1)], timeout=1): + # TODO Check that falcon.state.get() == 1 is the correct check. --> When is the falcon acquiring, this assumes 1? + if not self.wait_for_signals([(falcon.state.get, 1)], timeout=timeout): raise PhoenixTriggerError( - f"Falcon not ready to take trigger after 1s timeout in trigger" + f"Device {self.parent.name} is not ready to take trigger, timeout due to waiting for Falcon to get ready. Timeout after {timeout}s" ) - falcon.state.get() == 1 # Acquiring if self.parent.scaninfo.scan_type == "step": time.sleep(0.2) self.parent.smpl.put(1) @@ -67,7 +67,7 @@ class PhoenixTriggerSetup(CustomDetectorMixin): time.sleep(0.2) # Trigger function from ophyd.Device returns a DeviceStatus. This function # starts a process that creates a DeviceStatus, and waits for the signal_conditions - # self.parent.smpl_done.get to change to the value SAMPLINGDONE.DONE + # self.parent.smpl_done.get to change to the value SAMPLING.DONE # Once this takes place, the DeviceStatus.done flag will be set to True. # When BEC calls trigger() on the devices, this method will be called assuming that # the devices config softwareTrigger=True is set. @@ -75,7 +75,7 @@ class PhoenixTriggerSetup(CustomDetectorMixin): # self.stubs.wait(wait_type="trigger", group="trigger", wait_time=self.exp_time) # which ensures that the DeviceStatus object resolves before continuing, i.e. DeviceStatus.done = True status = self.wait_with_status( - signal_conditions=[(self.parent.smpl_done.get, SAMPLINGDONE.DONE)], + signal_conditions=[(self.parent.smpl_done.get, SAMPLING.DONE)], timeout=5 * self.parent.scaninfo.exp_time, # Check if timeout is appropriate check_stopped=True, ) @@ -83,24 +83,23 @@ class PhoenixTriggerSetup(CustomDetectorMixin): def on_stop(self) -> None: """Actions to stop the Device""" - # Put the Device in cont mode + # Put the Device again in continous acquisition mode self.parent.total_cycles.set(5) self.parent.start_csmpl.set(1) self.parent.smpl.put(1) time.sleep(0.5) self.parent.smpl.put(1) time.sleep(0.2) - if self.parent.smpl_done.get() == SAMPLINGDONE.RUNNING: + if self.parent.smpl_done.get() == SAMPLING.RUNNING: return self.parent.smpl.put(1) class PhoenixTrigger(PSIDetectorBase): """ - Docstring: - - Class for PHOENIX TTL hardware trigger (X07MB-OP2:) + Class for PHOENIX TTL hardware trigger: 'X07MB-OP2:' + This device is used to trigger communicate with an ADC card that creates TTL signals to trigger cameras and detectors at Phoenix. """ custom_prepare_cls = PhoenixTriggerSetup @@ -123,15 +122,13 @@ class PhoenixTrigger(PSIDetectorBase): if __name__ == "__main__": + # Test the PhoenixTrigger class trigger = PhoenixTrigger(name="trigger", prefix="X07MB-OP2:") trigger.wait_for_connection(all_signals=True) trigger.read() trigger.read_configuration() trigger.stage() - status = trigger.trigger() - while status.done is False: - print(f" Waiting for status, flag is {status.done}") - time.sleep(0.2) - + device_status = trigger.trigger() + device_status.wait() trigger.unstage()