feat: add trigger card integration
This commit is contained in:
122
superxas_bec/devices/trigger.py
Normal file
122
superxas_bec/devices/trigger.py
Normal file
@@ -0,0 +1,122 @@
|
||||
from ophyd import Device, Kind, Component as Cpt
|
||||
from ophyd import EpicsSignal, EpicsSignalRO, DeviceStatus, StatusBase
|
||||
from ophyd.status import SubscriptionStatus
|
||||
from bec_lib.logger import bec_logger
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
|
||||
import enum
|
||||
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
|
||||
class ContinuousSamplingMode(int, enum.Enum):
|
||||
""" Options for start_csmpl signal"""
|
||||
OFF = 0
|
||||
ON = 1
|
||||
|
||||
class SamplingDone(int, enum.Enum):
|
||||
""" Status of sampling """
|
||||
RUNNING = 0
|
||||
DONE = 1
|
||||
|
||||
class TriggerControl(Device):
|
||||
""" Trigger Device Control PVs at X10DA, prefix: X10DA-ES1: """
|
||||
|
||||
total_cycles = Cpt(EpicsSignal, suffix='TOTAL-CYCLES', kind=Kind.config, doc="Number of cycles (multiplies by 0.2s)")
|
||||
start_csmpl = Cpt(EpicsSignal, suffix='START-CSMPL', kind=Kind.config, doc="Continous sampling mode on/off")
|
||||
smpl = Cpt(EpicsSignal, suffix='SMPL', kind=Kind.config, doc="Sampling Trigger if cont mode is off")
|
||||
smpl_done = Cpt(EpicsSignalRO, suffix='SMPL-DONE', kind=Kind.config, doc="Done status of trigger")
|
||||
|
||||
|
||||
|
||||
class Trigger(PSIDeviceBase, TriggerControl):
|
||||
""" Trigger Device of X10DA (SUPERXAS), prefix: X10DA-ES1: """
|
||||
|
||||
def __init__(self, name: str, prefix:str='',scan_info: ScanInfo | None = None, device_manager=None, **kwargs):
|
||||
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
|
||||
self.device_manager = device_manager
|
||||
self._pv_timeout = 1
|
||||
|
||||
|
||||
########################################
|
||||
# Beamline Specific Implementations #
|
||||
########################################
|
||||
|
||||
def on_init(self) -> None:
|
||||
"""
|
||||
Called when the device is initialized.
|
||||
|
||||
No signals are connected at this point. If you like to
|
||||
set default values on signals, please use on_connected instead.
|
||||
"""
|
||||
|
||||
def on_connected(self) -> None:
|
||||
"""
|
||||
Called after the device is connected and its signals are connected.
|
||||
Default values for signals should be set here.
|
||||
"""
|
||||
|
||||
|
||||
def on_stage(self) -> DeviceStatus | StatusBase | None:
|
||||
"""
|
||||
Called while staging the device.
|
||||
|
||||
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
|
||||
"""
|
||||
self.start_csmpl.set(ContinuousSamplingMode.OFF).wait()
|
||||
exp_time = self.scan_info.msg.scan_parameters['exp_time']
|
||||
if self.scan_info.msg.scan_name != "exafs_scan":
|
||||
self.set_exposure_time(exp_time).wait()
|
||||
|
||||
def on_unstage(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called while unstaging the device."""
|
||||
status = self.start_csmpl.set(ContinuousSamplingMode.ON)
|
||||
return status
|
||||
|
||||
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called right before the scan starts on all devices automatically."""
|
||||
|
||||
def on_trigger(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called when the device is triggered."""
|
||||
falcon = self.device_manager.devices.get("falcon", None)
|
||||
|
||||
if falcon is not None:
|
||||
status = falcon._stop_erase_and_wait_for_acquiring()
|
||||
status.wait()
|
||||
|
||||
started = False
|
||||
|
||||
def _sampling_done():
|
||||
nonlocal started
|
||||
if not started and self.smpl_done.get() == SamplingDone.RUNNING:
|
||||
started = True
|
||||
return False
|
||||
if started and self.smpl_done.get() == SamplingDone.DONE:
|
||||
return True
|
||||
|
||||
return self.smpl_done.get() == SamplingDone.DONE
|
||||
|
||||
self.smpl.put(1)
|
||||
status = self.task_handler.submit_task(_sampling_done,run=True)
|
||||
return status
|
||||
|
||||
|
||||
def on_complete(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called to inquire if a device has completed a scans."""
|
||||
|
||||
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
|
||||
|
||||
def on_stop(self) -> None:
|
||||
"""Called when the device is stopped."""
|
||||
self.task_handler.shutdown()
|
||||
|
||||
def set_exposure_time(self, value:float) -> DeviceStatus:
|
||||
""" Utility method to set exposure time complying to device logic with cycle of min 0.2s."""
|
||||
cycles = max(int(value*5),1)
|
||||
return self.total_cycles.set(cycles)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user