refactor: cleanup phoenix_trigger device

This commit is contained in:
2024-08-28 16:01:56 +02:00
parent bfa06fe519
commit b752a79c41

View File

@ -1,18 +1,18 @@
import time """ Module for the PhoenixTrigger class to connect to the ADC card
that creates TTL signals to trigger cameras and detectors at Phoenix. """
import enum import enum
import numpy as np import time
import numpy as np
from bec_lib import bec_logger
from ophyd import Component as Cpt from ophyd import Component as Cpt
from ophyd import DeviceStatus, EpicsSignal, EpicsSignalRO, Kind from ophyd import DeviceStatus, EpicsSignal, EpicsSignalRO, Kind
from ophyd_devices.interfaces.base_classes.psi_detector_base import ( from ophyd_devices.interfaces.base_classes.psi_detector_base import (
PSIDetectorBase,
CustomDetectorMixin, CustomDetectorMixin,
PSIDetectorBase,
) )
from bec_lib import bec_logger
logger = bec_logger.logger logger = bec_logger.logger
DETECTOR_TIMEOUT = 5 DETECTOR_TIMEOUT = 5
@ -22,7 +22,7 @@ class PhoenixTriggerError(Exception):
"""PhoenixTrigger specific error""" """PhoenixTrigger specific error"""
class SAMPLINGDONE(int, enum.Enum): class SAMPLING(int, enum.Enum):
"""Sampling Done PV""" """Sampling Done PV"""
RUNNING = 0 RUNNING = 0
@ -31,13 +31,11 @@ class SAMPLINGDONE(int, enum.Enum):
class PhoenixTriggerSetup(CustomDetectorMixin): class PhoenixTriggerSetup(CustomDetectorMixin):
""" """
This defines the PHOENIX trigger setup. Mixin Class to setup the PhoenixTrigger device
""" """
def on_stage(self) -> None: def on_stage(self) -> None:
"""Actions to take place on stage""" """On stage actions which are executed upon staging the device"""
if self.parent.scaninfo.scan_type == "step": if self.parent.scaninfo.scan_type == "step":
self.parent.start_csmpl.set(0) self.parent.start_csmpl.set(0)
self.parent.total_cycles.set(1) self.parent.total_cycles.set(1)
@ -47,19 +45,21 @@ class PhoenixTriggerSetup(CustomDetectorMixin):
logger.info(f"Device {self.parent.name} was staged for step scan") logger.info(f"Device {self.parent.name} was staged for step scan")
def on_unstage(self) -> None: def on_unstage(self) -> None:
"""Actions to take place on unstage""" """On unstage actions which are executed upon unstaging the device"""
self.on_stop() self.on_stop()
def on_trigger(self) -> DeviceStatus: def on_trigger(self) -> DeviceStatus:
"""Actions to be performed upon receiving a software trigger""" """On trigger actions which are executed upon triggering the device"""
# TODO Test the proper check for the falcon state
# Check first that falcon is set to acquiring # Check first that falcon is set to acquiring
falcon = self.parent.device_manager.devices.get("falcon_nohdf5", None) falcon = self.parent.device_manager.devices.get("falcon_nohdf5", None)
timeout = 1
if falcon is not None: if falcon is not None:
if self.wait_for_signals([(falcon.state.get, 1)], timeout=1): # TODO Check that falcon.state.get() == 1 is the correct check. --> When is the falcon acquiring, this assumes 1?
if not self.wait_for_signals([(falcon.state.get, 1)], timeout=timeout):
raise PhoenixTriggerError( raise PhoenixTriggerError(
f"Falcon not ready to take trigger after 1s timeout in trigger" f"Device {self.parent.name} is not ready to take trigger, timeout due to waiting for Falcon to get ready. Timeout after {timeout}s"
) )
falcon.state.get() == 1 # Acquiring
if self.parent.scaninfo.scan_type == "step": if self.parent.scaninfo.scan_type == "step":
time.sleep(0.2) time.sleep(0.2)
self.parent.smpl.put(1) self.parent.smpl.put(1)
@ -67,7 +67,7 @@ class PhoenixTriggerSetup(CustomDetectorMixin):
time.sleep(0.2) time.sleep(0.2)
# Trigger function from ophyd.Device returns a DeviceStatus. This function # Trigger function from ophyd.Device returns a DeviceStatus. This function
# starts a process that creates a DeviceStatus, and waits for the signal_conditions # starts a process that creates a DeviceStatus, and waits for the signal_conditions
# self.parent.smpl_done.get to change to the value SAMPLINGDONE.DONE # self.parent.smpl_done.get to change to the value SAMPLING.DONE
# Once this takes place, the DeviceStatus.done flag will be set to True. # Once this takes place, the DeviceStatus.done flag will be set to True.
# When BEC calls trigger() on the devices, this method will be called assuming that # When BEC calls trigger() on the devices, this method will be called assuming that
# the devices config softwareTrigger=True is set. # the devices config softwareTrigger=True is set.
@ -75,7 +75,7 @@ class PhoenixTriggerSetup(CustomDetectorMixin):
# self.stubs.wait(wait_type="trigger", group="trigger", wait_time=self.exp_time) # self.stubs.wait(wait_type="trigger", group="trigger", wait_time=self.exp_time)
# which ensures that the DeviceStatus object resolves before continuing, i.e. DeviceStatus.done = True # which ensures that the DeviceStatus object resolves before continuing, i.e. DeviceStatus.done = True
status = self.wait_with_status( status = self.wait_with_status(
signal_conditions=[(self.parent.smpl_done.get, SAMPLINGDONE.DONE)], signal_conditions=[(self.parent.smpl_done.get, SAMPLING.DONE)],
timeout=5 * self.parent.scaninfo.exp_time, # Check if timeout is appropriate timeout=5 * self.parent.scaninfo.exp_time, # Check if timeout is appropriate
check_stopped=True, check_stopped=True,
) )
@ -83,24 +83,23 @@ class PhoenixTriggerSetup(CustomDetectorMixin):
def on_stop(self) -> None: def on_stop(self) -> None:
"""Actions to stop the Device""" """Actions to stop the Device"""
# Put the Device in cont mode # Put the Device again in continous acquisition mode
self.parent.total_cycles.set(5) self.parent.total_cycles.set(5)
self.parent.start_csmpl.set(1) self.parent.start_csmpl.set(1)
self.parent.smpl.put(1) self.parent.smpl.put(1)
time.sleep(0.5) time.sleep(0.5)
self.parent.smpl.put(1) self.parent.smpl.put(1)
time.sleep(0.2) time.sleep(0.2)
if self.parent.smpl_done.get() == SAMPLINGDONE.RUNNING: if self.parent.smpl_done.get() == SAMPLING.RUNNING:
return return
self.parent.smpl.put(1) self.parent.smpl.put(1)
class PhoenixTrigger(PSIDetectorBase): class PhoenixTrigger(PSIDetectorBase):
""" """
Docstring: Class for PHOENIX TTL hardware trigger: 'X07MB-OP2:'
Class for PHOENIX TTL hardware trigger (X07MB-OP2:)
This device is used to trigger communicate with an ADC card that creates TTL signals to trigger cameras and detectors at Phoenix.
""" """
custom_prepare_cls = PhoenixTriggerSetup custom_prepare_cls = PhoenixTriggerSetup
@ -123,15 +122,13 @@ class PhoenixTrigger(PSIDetectorBase):
if __name__ == "__main__": if __name__ == "__main__":
# Test the PhoenixTrigger class
trigger = PhoenixTrigger(name="trigger", prefix="X07MB-OP2:") trigger = PhoenixTrigger(name="trigger", prefix="X07MB-OP2:")
trigger.wait_for_connection(all_signals=True) trigger.wait_for_connection(all_signals=True)
trigger.read() trigger.read()
trigger.read_configuration() trigger.read_configuration()
trigger.stage() trigger.stage()
status = trigger.trigger() device_status = trigger.trigger()
while status.done is False: device_status.wait()
print(f" Waiting for status, flag is {status.done}")
time.sleep(0.2)
trigger.unstage() trigger.unstage()