diff --git a/superxas_bec/devices/trigger.py b/superxas_bec/devices/trigger.py new file mode 100644 index 0000000..0ed09d2 --- /dev/null +++ b/superxas_bec/devices/trigger.py @@ -0,0 +1,122 @@ +from ophyd import Device, Kind, Component as Cpt +from ophyd import EpicsSignal, EpicsSignalRO, DeviceStatus, StatusBase +from ophyd.status import SubscriptionStatus +from bec_lib.logger import bec_logger + +logger = bec_logger.logger + +from bec_lib.devicemanager import ScanInfo + +import enum + +from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase + +class ContinuousSamplingMode(int, enum.Enum): + """ Options for start_csmpl signal""" + OFF = 0 + ON = 1 + +class SamplingDone(int, enum.Enum): + """ Status of sampling """ + RUNNING = 0 + DONE = 1 + +class TriggerControl(Device): + """ Trigger Device Control PVs at X10DA, prefix: X10DA-ES1: """ + + total_cycles = Cpt(EpicsSignal, suffix='TOTAL-CYCLES', kind=Kind.config, doc="Number of cycles (multiplies by 0.2s)") + start_csmpl = Cpt(EpicsSignal, suffix='START-CSMPL', kind=Kind.config, doc="Continous sampling mode on/off") + smpl = Cpt(EpicsSignal, suffix='SMPL', kind=Kind.config, doc="Sampling Trigger if cont mode is off") + smpl_done = Cpt(EpicsSignalRO, suffix='SMPL-DONE', kind=Kind.config, doc="Done status of trigger") + + + +class Trigger(PSIDeviceBase, TriggerControl): + """ Trigger Device of X10DA (SUPERXAS), prefix: X10DA-ES1: """ + + def __init__(self, name: str, prefix:str='',scan_info: ScanInfo | None = None, device_manager=None, **kwargs): + super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs) + self.device_manager = device_manager + self._pv_timeout = 1 + + + ######################################## + # Beamline Specific Implementations # + ######################################## + + def on_init(self) -> None: + """ + Called when the device is initialized. + + No signals are connected at this point. If you like to + set default values on signals, please use on_connected instead. + """ + + def on_connected(self) -> None: + """ + Called after the device is connected and its signals are connected. + Default values for signals should be set here. + """ + + + def on_stage(self) -> DeviceStatus | StatusBase | None: + """ + Called while staging the device. + + Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object. + """ + self.start_csmpl.set(ContinuousSamplingMode.OFF).wait() + exp_time = self.scan_info.msg.scan_parameters['exp_time'] + if self.scan_info.msg.scan_name != "exafs_scan": + self.set_exposure_time(exp_time).wait() + + def on_unstage(self) -> DeviceStatus | StatusBase | None: + """Called while unstaging the device.""" + status = self.start_csmpl.set(ContinuousSamplingMode.ON) + return status + + def on_pre_scan(self) -> DeviceStatus | StatusBase | None: + """Called right before the scan starts on all devices automatically.""" + + def on_trigger(self) -> DeviceStatus | StatusBase | None: + """Called when the device is triggered.""" + falcon = self.device_manager.devices.get("falcon", None) + + if falcon is not None: + status = falcon._stop_erase_and_wait_for_acquiring() + status.wait() + + started = False + + def _sampling_done(): + nonlocal started + if not started and self.smpl_done.get() == SamplingDone.RUNNING: + started = True + return False + if started and self.smpl_done.get() == SamplingDone.DONE: + return True + + return self.smpl_done.get() == SamplingDone.DONE + + self.smpl.put(1) + status = self.task_handler.submit_task(_sampling_done,run=True) + return status + + + def on_complete(self) -> DeviceStatus | StatusBase | None: + """Called to inquire if a device has completed a scans.""" + + def on_kickoff(self) -> DeviceStatus | StatusBase | None: + """Called to kickoff a device for a fly scan. Has to be called explicitly.""" + + def on_stop(self) -> None: + """Called when the device is stopped.""" + self.task_handler.shutdown() + + def set_exposure_time(self, value:float) -> DeviceStatus: + """ Utility method to set exposure time complying to device logic with cycle of min 0.2s.""" + cycles = max(int(value*5),1) + return self.total_cycles.set(cycles) + + +