diff --git a/csaxs_bec/device_configs/ddg_test_config.yaml b/csaxs_bec/device_configs/ddg_test_config.yaml new file mode 100644 index 0000000..8e9ad23 --- /dev/null +++ b/csaxs_bec/device_configs/ddg_test_config.yaml @@ -0,0 +1,34 @@ +ddg: + description: 'CSAXS master delay generator' + deviceClass: csaxs_bec.devices.epics.delay_generator_csaxs.ddg_master.DDGMaster + deviceConfig: + prefix: "X12SA-CPCL-DDG1:" + enabled: true + readOnly: false + onFailure: raise + readoutPriority: baseline + softwareTrigger: true + +samx: + readoutPriority: baseline + deviceClass: ophyd_devices.SimPositioner + deviceConfig: + delay: 1 + limits: + - -50 + - 50 + tolerance: 0.01 + update_frequency: 400 + deviceTags: + - user motors + enabled: true + readOnly: false + +bpm4i: + readoutPriority: monitored + deviceClass: ophyd_devices.SimMonitor + deviceConfig: + deviceTags: + - beamline + enabled: true + readOnly: false \ No newline at end of file diff --git a/csaxs_bec/devices/epics/delay_generator_csaxs.py b/csaxs_bec/devices/epics/delay_generator_csaxs.py deleted file mode 100644 index 1094857..0000000 --- a/csaxs_bec/devices/epics/delay_generator_csaxs.py +++ /dev/null @@ -1,274 +0,0 @@ -from bec_lib import bec_logger -from ophyd import Component, DeviceStatus, Kind -from ophyd_devices.devices.delay_generator_645 import DelayGenerator, TriggerSource -from ophyd_devices.interfaces.base_classes.bec_device_base import BECDeviceBase, CustomPrepare -from ophyd_devices.sim.sim_signals import SetableSignal -from ophyd_devices.utils import bec_utils - -logger = bec_logger.logger - - -class DelayGeneratorcSAXSError(Exception): - """Exception raised for errors.""" - - -class DDGSetup(CustomPrepare["DelayGeneratorcSAXS"]): - """ - Custom Prepare class with hooks for beamline specific logic for the DG645 at CSAXS - """ - - def on_wait_for_connection(self) -> None: - """Init default parameter after the all signals are connected""" - for ii, channel in enumerate(self.parent.all_channels): - self.parent.set_channels("polarity", self.parent.polarity.get()[ii], [channel]) - - self.parent.set_channels("amplitude", self.parent.amplitude.get()) - self.parent.set_channels("offset", self.parent.offset.get()) - # Setup reference - self.parent.set_channels( - "reference", 0, [f"channel{pair}.ch1" for pair in self.parent.all_delay_pairs] - ) - self.parent.set_channels( - "reference", 0, [f"channel{pair}.ch2" for pair in self.parent.all_delay_pairs] - ) - self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get())) - # Set threshold level for ext. pulses - self.parent.level.put(self.parent.thres_trig_level.get()) - - def on_stage(self) -> None: - "Hook execute before the scan starts" - if self.parent.scaninfo.scan_type == "step": - exp_time = self.parent.scaninfo.exp_time - delay = 0 - self.parent.burst_disable() - self.parent.set_trigger(TriggerSource.SINGLE_SHOT) - self.parent.set_channels(signal="width", value=exp_time) - self.parent.set_channels(signal="delay", value=delay) - return - scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "") - if scan_name == "jjf_test": - # TODO implement the logic for JJF triggering - exp_time = 480e-6 # self.parent.scaninfo.exp_time - readout = 20e-6 # self.parent.scaninfo.readout_time - total_exposure = exp_time + readout - num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"] - num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time / total_exposure) - delay = 0 - delay_burst = self.parent.delay_burst.get() - - self.parent.set_trigger(trigger_source=TriggerSource.SINGLE_SHOT) - - self.parent.set_channels(signal="width", value=exp_time) - self.parent.set_channels(signal="delay", value=delay) - self.parent.burst_enable( - count=num_burst_cycle, delay=delay_burst, period=total_exposure, config="first" - ) - logger.info( - f"{self.parent.name}: On stage with n_burst: {num_burst_cycle} and total_exp {total_exposure}" - ) - - def on_trigger(self) -> DeviceStatus: - """Method to be executed upon trigger""" - if self.parent.scaninfo.scan_type == "step": - self.parent.trigger_shot.put(1) - return - scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "") - if scan_name == "jjf_test": - exp_time = 480e-6 # self.parent.scaninfo.exp_time - readout = 20e-6 # self.parent.scaninfo.readout_time - total_exposure = exp_time + readout - num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"] - num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time / total_exposure) - - # Start trigger cycle - self.parent.trigger_burst_readout.put(1) - - # Create status object that will wait for the end of the burst cycle - status = self.wait_with_status( - signal_conditions=[(self.parent.burst_cycle_finished, 1)], - timeout=num_burst_cycle * total_exposure + 1, # add 1s to be sure - check_stopped=True, - exception_on_timeout=DelayGeneratorcSAXSError( - f"{self.parent.name} run into timeout in complete call." - ), - ) - logger.info(f"Return status {self.parent.name}") - return status - - def on_complete(self) -> DeviceStatus: - pass - - def on_pre_scan(self) -> None: - """ - Method called by pre_scan hook in parent class. - - Executes trigger if premove_trigger is Trus. - """ - if self.parent.premove_trigger.get() is True: - self.parent.trigger_shot.put(1) - - -class DelayGeneratorcSAXS(BECDeviceBase, DelayGenerator): - """ - DG645 delay generator at cSAXS (multiple can be in use depending on the setup) - - Default values for setting up DDG. - Note: checks of set calues are not (only partially) included, check manual for details on possible settings. - https://www.thinksrs.com/downloads/pdfs/manuals/DG645m.pdf - - - delay_burst : (float >=0) Delay between trigger and first pulse in burst mode - - delta_width : (float >= 0) Add width to fast shutter signal to make sure its open during acquisition - - additional_triggers : (int) add additional triggers to burst mode (mcs card needs +1 triggers per line) - - polarity : (list of 0/1) polarity for different channels - - amplitude : (float) amplitude voltage of TTLs - - offset : (float) offset for ampltitude - - thres_trig_level : (float) threshold of trigger amplitude - - Custom signals for logic in different DDGs during scans (for custom_prepare.prepare_ddg): - - - set_high_on_exposure : (bool): if True, then TTL signal should go high during the full acquisition time of a scan. - # TODO trigger_width and fixed_ttl could be combined into single list. - - fixed_ttl_width : (list of either 1 or 0), one for each channel. - - trigger_width : (float) if fixed_ttl_width is True, then the width of the TTL pulse is set to this value. - - set_trigger_source : (TriggerSource) specifies the default trigger source for the DDG. - - premove_trigger : (bool) if True, then a trigger should be executed before the scan starts (to be implemented in on_pre_scan). - - set_high_on_stage : (bool) if True, then TTL signal should go high already on stage. - """ - - custom_prepare_cls = DDGSetup - - # Custom signals passed on during the init procedure via BEC - # TODO review whether those should remain here like that - - delay_burst = Component( - bec_utils.ConfigSignal, name="delay_burst", kind="config", config_storage_name="ddg_config" - ) - - delta_width = Component( - bec_utils.ConfigSignal, name="delta_width", kind="config", config_storage_name="ddg_config" - ) - - additional_triggers = Component( - bec_utils.ConfigSignal, - name="additional_triggers", - kind="config", - config_storage_name="ddg_config", - ) - - polarity = Component( - bec_utils.ConfigSignal, name="polarity", kind="config", config_storage_name="ddg_config" - ) - - fixed_ttl_width = Component( - bec_utils.ConfigSignal, - name="fixed_ttl_width", - kind="config", - config_storage_name="ddg_config", - ) - - amplitude = Component( - bec_utils.ConfigSignal, name="amplitude", kind="config", config_storage_name="ddg_config" - ) - - offset = Component( - bec_utils.ConfigSignal, name="offset", kind="config", config_storage_name="ddg_config" - ) - - thres_trig_level = Component( - bec_utils.ConfigSignal, - name="thres_trig_level", - kind="config", - config_storage_name="ddg_config", - ) - - set_high_on_exposure = Component( - bec_utils.ConfigSignal, - name="set_high_on_exposure", - kind="config", - config_storage_name="ddg_config", - ) - - set_high_on_stage = Component( - bec_utils.ConfigSignal, - name="set_high_on_stage", - kind="config", - config_storage_name="ddg_config", - ) - - set_trigger_source = Component( - bec_utils.ConfigSignal, - name="set_trigger_source", - kind="config", - config_storage_name="ddg_config", - ) - - trigger_width = Component( - bec_utils.ConfigSignal, - name="trigger_width", - kind="config", - config_storage_name="ddg_config", - ) - premove_trigger = Component( - bec_utils.ConfigSignal, - name="premove_trigger", - kind="config", - config_storage_name="ddg_config", - ) - - def __init__( - self, - name: str, - prefix: str = "", - kind: Kind = None, - ddg_config: dict = None, - parent=None, - device_manager=None, - **kwargs, - ): - """ - Args: - prefix (str, optional): Prefix of the device. Defaults to "". - name (str): Name of the device. - kind (str, optional): Kind of the device. Defaults to None. - read_attrs (list, optional): List of attributes to read. Defaults to None. - configuration_attrs (list, optional): List of attributes to configure. Defaults to None. - parent (Device, optional): Parent device. Defaults to None. - device_manager (DeviceManagerBase, optional): DeviceManagerBase object. Defaults to None. - sim_mode (bool, optional): Simulation mode flag. Defaults to False. - ddg_config (dict, optional): Dictionary of ddg_config signals. Defaults to None. - - """ - - # Default values for ddg_config signals - self.ddg_config = { - # Setup default values - f"{name}_delay_burst": 0, - f"{name}_delta_width": 0, - f"{name}_additional_triggers": 0, - f"{name}_polarity": [1, 1, 1, 1, 1], - f"{name}_amplitude": 4.5, - f"{name}_offset": 0, - f"{name}_thres_trig_level": 2.5, - # Values for different behaviour during scans - f"{name}_fixed_ttl_width": [0, 0, 0, 0, 0], - f"{name}_trigger_width": None, - f"{name}_set_high_on_exposure": False, - f"{name}_set_high_on_stage": False, - f"{name}_set_trigger_source": "SINGLE_SHOT", - f"{name}_premove_trigger": False, - } - if ddg_config is not None: - # pylint: disable=expression-not-assigned - [self.ddg_config.update({f"{name}_{key}": value}) for key, value in ddg_config.items()] - super().__init__( - prefix=prefix, - name=name, - kind=kind, - parent=parent, - device_manager=device_manager, - **kwargs, - ) - - -# if __name__ == "__main__": -# dgen = DelayGeneratorcSAXS("X12SA-CPCL-DDG3:", name="ddg3") diff --git a/csaxs_bec/devices/epics/delay_generator_csaxs/__init__.py b/csaxs_bec/devices/epics/delay_generator_csaxs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_master.py b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_master.py new file mode 100644 index 0000000..b7dd6dd --- /dev/null +++ b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_master.py @@ -0,0 +1,106 @@ +from threading import Event, Thread +from time import sleep + +from ophyd import DeviceStatus, StatusBase +from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import DelayGeneratorCSAXS, ChannelConfig, AllChannelNames, OUTPUTPOLARITY, TRIGGERSOURCE, StatusBitsCompareStatus, STATUSBITS, CHANNELREFERENCE +from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase +from bec_lib.logger import bec_logger +import atexit + +logger = bec_logger.logger + +_DEFAULT_CHANNEL_CONFIG: ChannelConfig = { + "amplitude":5.0, + "offset":0.0, + "polarity": OUTPUTPOLARITY.POSITIVE, + "mode":"ttl", + } + +DEFAULT_IO_CONFIG: dict[AllChannelNames, ChannelConfig] = { + "t0":_DEFAULT_CHANNEL_CONFIG, + "ab":_DEFAULT_CHANNEL_CONFIG, + "cd":_DEFAULT_CHANNEL_CONFIG, + "ef":_DEFAULT_CHANNEL_CONFIG, + "gh":_DEFAULT_CHANNEL_CONFIG, +} +DEFAULT_TRIGGER_SOURCE: TRIGGERSOURCE = TRIGGERSOURCE.SINGLE_SHOT +DEFAULT_DEAD_TIMES_S = { + "ab":1e-3, + "cd":1e-3, + "ef":1e-3, + "gh":1e-3, +} + + +class DDGMaster(PSIDeviceBase, DelayGeneratorCSAXS): + """ + Implementation of DelayGeneratorCSAXS for the CSAXS master trigger delay generator at X12SA-CPCL-DDG1 + """ + + def on_connected(self) -> None: + """Set the default values on the device - intended to overwrite everything to a usable default state. + Sets DEFAULT_IO_CONFIG into each channel, + sets the trigger source to DEFAULT_TRIGGER_SOURCE, + and turns off burst mode""" + self.burst_disable() # it is possible to miss setting settings if burst is enabled + for channel, config in DEFAULT_IO_CONFIG.items(): + self.set_io_values(channel, **config) + self.set_trigger(DEFAULT_TRIGGER_SOURCE) + self.set_references_for_channels( + [("A", CHANNELREFERENCE.T0), + ("B", CHANNELREFERENCE.A), + ("C", CHANNELREFERENCE.T0), + ("D", CHANNELREFERENCE.C), + ("E", CHANNELREFERENCE.T0), + ("F", CHANNELREFERENCE.E), + ("G", CHANNELREFERENCE.T0), + ("H", CHANNELREFERENCE.G)] + ) + # Start background thread to poll status register + self._status_polling_stop_event = Event() + self._status_polling_thread = Thread(target=self._poll_status) # TODO if exit is called, ca_context from pyepics seems unset.. Hook to kill thread? + self._status_polling_thread.start() + atexit.register(self.on_destroy) + + def _poll_status(self): + """The status register has to be actively pulled, triggering the proc_status results in + event_status being updated, which in turn allows the StatusBitsCompareStatus from on_trigger + to be updated and eventually resolve.""" + dispatcher = self.state.proc_status.cl.get_dispatcher() + event = dispatcher.stop_event + while not (self._status_polling_stop_event.is_set() or event.is_set()): + self.state.proc_status.put(1) + sleep(1/5) # poll the status at 5 Hz + + + def on_stage(self) -> DeviceStatus | StatusBase | None: + exp_time = self.scan_info.msg.scan_parameters["exp_time"] + frames_per_trigger = self.scan_info.msg.scan_parameters["frames_per_trigger"] + readout_time = self.scan_info.msg.scan_parameters["readout_time"] + + if readout_time is not None and readout_time != 0: + # If we are given a single readout time from BEC, use it for all 4 channels + pulse_widths = [exp_time - readout_time]*len(DEFAULT_DEAD_TIMES_S) + else: + # Otherwise, derive the pulse widths from the default dead times defined above + pulse_widths = [exp_time - DEFAULT_DEAD_TIMES_S[ch] for ch in DEFAULT_DEAD_TIMES_S.keys()] + logger.info(f"setting pulse widths to {pulse_widths}") + self.set_delay_pairs(["ab", "cd", "ef", "gh"], delay=0, width=pulse_widths) + self.burst_enable(count=frames_per_trigger, delay=0, period=exp_time) + + def on_trigger(self) -> DeviceStatus | StatusBase | None: + st = StatusBitsCompareStatus(self.state.event_status, STATUSBITS.END_OF_BURST, run=False) + self.cancel_on_stop(st) + self.trigger_shot.put(1) + return st + + def on_destroy(self) -> None: + if getattr(self, "_status_polling_stop_event", None) is not None: + self._status_polling_stop_event.set() + if getattr(self, "_status_polling_thread", None) is not None: + self._status_polling_thread.join(timeout=3) + +if __name__ == "__main__": + ddg = DDGMaster(name="ddg", prefix="X12SA-CPCL-DDG1:") + ddg.wait_for_connection(all_signals=True, timeout=30) + ddg.summary() diff --git a/csaxs_bec/devices/epics/delay_generator_csaxs/delay_generator_csaxs.py b/csaxs_bec/devices/epics/delay_generator_csaxs/delay_generator_csaxs.py new file mode 100644 index 0000000..1910c4c --- /dev/null +++ b/csaxs_bec/devices/epics/delay_generator_csaxs/delay_generator_csaxs.py @@ -0,0 +1,617 @@ +""" +Delay generator implementation for CSAXS. + +Detailed information can be found in the manual: +https://www.thinksrs.com/downloads/pdfs/manuals/DG645m.pdf +""" + +import enum +from typing import Literal, TypedDict + +from ophyd import Component as Cpt +from ophyd import Device, EpicsSignal, EpicsSignalRO, Kind, Signal +from ophyd_devices import StatusBase, SubscriptionStatus +from typeguard import typechecked +from bec_lib.logger import bec_logger + +logger = bec_logger.logger +DelayChannelNames = Literal["ab", "cd", "ef", "gh"] +AllChannelNames = Literal["t0", "ab", "cd", "ef", "gh"] +LiteralChannels = Literal["A","B","C","D","E","F","G","H"] + +class CHANNELREFERENCE(enum.Enum): + T0 = 0 + A = 1 + B = 2 + C = 3 + D = 4 + E = 5 + F = 6 + G = 7 + H = 8 + +class BURSTCONFIG(enum.Enum): + """Enum option for burst_config signal of the delay generator. + + ALL_CYCLES: T0 triggere for all cycles. + FIRST_CYCLE: T0 only triggered for the first cycle. + """ + + ALL_CYCLES = 0 + FIRST_CYCLE = 1 + + +class TRIGGERSOURCE(enum.Enum): + """Enum options for the trigger_source signal of the delay generator.""" + + INTERNAL = 0 + EXT_RISING_EDGE = 1 + EXT_FALLING_EDGE = 2 + SS_EXT_RISING_EDGE = 3 + SS_EXT_FALLING_EDGE = 4 + SINGLE_SHOT = 5 + LINE = 6 + + +class TRIGGERINHIBIT(enum.Enum): + """Enum options for the trigger_inhibit signal of the delay generator.""" + + OFF = 0 + TRIGGERS = 1 + AB = 2 + AB_CD = 3 + AB_CD_EF = 4 + AB_CD_EF_GH = 5 + + +class OUTPUTPOLARITY(enum.Enum): + """Enum options for the polarity signal of the static pair.""" + + NEGATIVE = 0 + POSITIVE = 1 + + +class STATUSBITS(enum.IntFlag): + """Bit flags for the status signal of the delay generator.""" + + TRIG = 1 << 0 # Got a trigger. + RATE = 1 << 1 # Got a trigger while a delay or burst was in progress. + END_OF_DELAY = 1 << 2 # A delay cycle has completed. + END_OF_BURST = 1 << 3 # A burst cycle has completed. + INHIBIT = 1 << 4 # A trigger or output delay cycle was inhibited. + ABORT_DELAY = 1 << 5 # A delay cycle was aborted early. + PLL_UNLOCK = 1 << 6 # The 100 MHz PLL came unlocked. + RB_UNLOCK = 1 << 7 # The installed Rb oscillator is unlocked. + + def describe(self) -> dict: + """Return a description of the status bits.""" + descriptions = { + STATUSBITS.TRIG: "Got a trigger.", + STATUSBITS.RATE: "Got a trigger while a delay or burst was in progress.", + STATUSBITS.END_OF_DELAY: "A delay cycle has completed.", + STATUSBITS.END_OF_BURST: "A burst cycle has completed.", + STATUSBITS.INHIBIT: "A trigger or output delay cycle was inhibited.", + STATUSBITS.ABORT_DELAY: "A delay cycle was aborted early.", + STATUSBITS.PLL_UNLOCK: "The 100 MHz PLL came unlocked.", + STATUSBITS.RB_UNLOCK: "The installed Rb oscillator is unlocked.", + } + return {flag.name: descriptions[flag] for flag in STATUSBITS if flag in self} + + +class StatusBitsCompareStatus(SubscriptionStatus): + """Compare status for STATUSBITS comparison.""" + + def __init__( + self, + signal: EpicsSignalRO, + value: STATUSBITS, + *args, + event_type=None, + timeout: float|None = None, + settle_time: float = 0, + run: bool = True, + **kwargs, + ): + """Initialize the compare status with a signal.""" + self._signal = signal + self._value = value + super().__init__( + device=signal, + callback=self._compare_callback, + timeout=timeout, + settle_time=settle_time, + event_type=event_type, + run=run, + ) + + def _compare_callback(self, value, **kwargs) -> bool: + """Callback for subscription status""" + return (STATUSBITS(value) & self._value) == self._value + + + +class ChannelConfig(TypedDict): + amplitude: float | None + offset: float | None + polarity: OUTPUTPOLARITY | Literal[0, 1] | None + mode: Literal["ttl", "nim"] | None + +class StaticPair(Device): + """ + Class to represent a static pair. + It allows setting the logic levels, but the timing is fixed. + The signal is high after receiving the trigger until the end of the holdoff period. + """ + + ttl_mode = Cpt( + EpicsSignal, "OutputModeTtlSS.PROC", kind=Kind.omitted, auto_monitor=True + ) + nim_mode = Cpt( + EpicsSignal, "OutputModeNimSS.PROC", kind=Kind.omitted, auto_monitor=True + ) + polarity = Cpt( + EpicsSignal, + "OutputPolarityBI", + write_pv="OutputPolarityBO", + name="polarity", + kind=Kind.omitted, + auto_monitor=True, + ) + + amplitude = Cpt( + EpicsSignal, + "OutputAmpAI", + write_pv="OutputAmpAO", + name="amplitude", + kind=Kind.omitted, + auto_monitor=True, + ) + + offset = Cpt( + EpicsSignal, + "OutputOffsetAI", + write_pv="OutputOffsetAO", + name="offset", + kind=Kind.omitted, + auto_monitor=True, + ) + + +class Channel(Device): + """ + Represents a single channel A, B, etc. of the delay generator. + """ + + setpoint = Cpt( + EpicsSignal, + write_pv="DelayAO", + read_pv="DelayAI", + put_complete=True, + auto_monitor=True, + kind=Kind.omitted, + doc="Value for the channel", + ) + reference = Cpt( + EpicsSignal, + "ReferenceMO", + put_complete=True, + kind=Kind.omitted, + auto_monitor=True, + doc="Reference channel for the channel", # Check defaults, possible should be T0,A,B,... + ) + + def __init__(self, *args, **kwargs): + """ + Initialize the channel with a setpoint and reference signal. + """ + # The read PV in EpicsSignal does not receive the prefix.. so we need to add it manually. + self.__class__.__dict__["setpoint"].kwargs["read_pv"] = args[0] + "DelayAI" + super().__init__(*args, **kwargs) + + +class WidthSignal(Signal): + """A signal that represents the width of a channel.""" + + def get(self, **kwargs) -> float: + """ + Get the width of the channel. + + Returns: + float: The width of the channel in seconds. + """ + parent: _DelayPairBase = self._parent # type: ignore + return parent.ch2.setpoint.get() - parent.ch1.setpoint.get() # type: ignore + + def check_value(self, value: float) -> float: + """Check if the value is larger equal to 0""" + if value >= 0: + return value + else: + raise ValueError(f"Width must be larger ot equal 0, got {value} seconds.") + + def put(self, value: float, **kwargs): + """ + Set the width of the channel. + + Args: + value (float): The width to set in seconds. + """ + self.check_value(value) + parent: _DelayPairBase = self._parent # type: ignore + ch1_setpoint: float = parent.ch1.setpoint.get()# type: ignore + parent.ch2.setpoint.put(ch1_setpoint + value, **kwargs) + + def set(self, value: float, **kwargs): + """ + Set the width of the channel. + + Args: + value (float): The width to set in seconds. + """ + status = StatusBase() + self.put(value, **kwargs) + status.set_finished() + return status + + +class DelaySignal(Signal): + """A signal that represents the delay of a channel.""" + + def get(self, **kwargs): + """ + Get the delay of the channel. + + Returns: + float: The delay of the channel in seconds. + """ + parent: _DelayPairBase = self._parent # type: ignore + return parent.ch1.setpoint.get() + + def put(self, value: float, **kwargs): + """ + Set the delay of the channel. + + Args: + value (float): The delay to set in seconds. + """ + parent: _DelayPairBase = self._parent # type: ignore + parent.ch1.setpoint.put(value, **kwargs) + parent.ch2.setpoint.put(value + parent.width.get(), **kwargs) + + def set(self, value: float, **kwargs): + """ + Set the width of the channel. + + Args: + value (float): The width to set in seconds. + """ + status = StatusBase() + self.put(value, **kwargs) + status.set_finished() + return status + +class _DelayPairBase(Device): + """ Base class for delay pairs. Children have to implement ch1,ch2 for + the respective delay channels. The class attributes have to be called + ch1, ch2 for width and delay signals to work.""" + + ch1: Cpt[Channel] + ch2: Cpt[Channel] + io: Cpt[StaticPair] + width = Cpt(WidthSignal, name="width", kind=Kind.config, doc="Width of TTL pulse") + delay = Cpt(DelaySignal, name="delay", kind=Kind.config, doc="Delay of TTL pulse") + +class DelayPairAB(_DelayPairBase): + + ch1 = Cpt(Channel, "A", name="A", kind=Kind.omitted) + ch2 = Cpt(Channel, "B", name="B", kind=Kind.omitted) + io = Cpt(StaticPair, "AB", name="io", kind=Kind.omitted) + +class DelayPairCD(_DelayPairBase): + + ch1 = Cpt(Channel, "C", name="C", kind=Kind.omitted) + ch2 = Cpt(Channel, "D", name="D", kind=Kind.omitted) + io = Cpt(StaticPair, "CD", name="io", kind=Kind.omitted) + +class DelayPairEF(_DelayPairBase): + + ch1 = Cpt(Channel, "E", name="E", kind=Kind.omitted) + ch2 = Cpt(Channel, "F", name="F", kind=Kind.omitted) + io = Cpt(StaticPair, "EF", name="io", kind=Kind.omitted) + +class DelayPairGH(_DelayPairBase): + + ch1 = Cpt(Channel, "G", name="G", kind=Kind.omitted) + ch2 = Cpt(Channel, "H", name="H", kind=Kind.omitted) + io = Cpt(StaticPair, "GH", name="io", kind=Kind.omitted) + + +class DelayGeneratorEventStatus(Device): + """Subdevice to represent the event state of the delay generator.""" + + event_status = Cpt( + EpicsSignalRO, "EventStatusLI", name="event_status", kind=Kind.omitted, auto_monitor=True + ) + proc_status = Cpt( + EpicsSignal, "EventStatusLI.PROC", name="proc_status", kind=Kind.omitted + ) + + +class DelayGeneratorCSAXS(Device): + """ + Delay Generator Stanford Research DG645. This implements an interface for the DG645 delay generator. + + Detailed information can be found in the manual: + https://www.thinksrs.com/downloads/pdfs/manuals/DG645m.pdf + + The DG645 has 8 channels, each with a delay and pulse width. The channels are implemented as DelayPair objects (AB etc.). + + Each pair has a TTL pulse width, delay and a reference signal to which they are being triggered. + In addition, the io layer allows setting amplitude, offset and polarity for each pair. + """ + + _pv_timeout: float = 1.5 # Default timeout for PV operations in seconds + + # Front Panel + t0 = Cpt(StaticPair, "T0", name="t0") + ab = Cpt(DelayPairAB, "", name="ab") + cd = Cpt(DelayPairCD, "", name="cd") + ef = Cpt(DelayPairEF, "", name="ef") + gh = Cpt(DelayPairGH, "", name="gh") + state = Cpt(DelayGeneratorEventStatus, "", name="state") + status_msg = Cpt( + EpicsSignalRO, "StatusSI", name="status_msg", kind=Kind.omitted, auto_monitor=True + ) + status_msg_clear = Cpt( + EpicsSignal, "StatusClearBO", name="status_msg_clear", kind=Kind.omitted + ) + + trigger_holdoff = Cpt( + EpicsSignal, + "TriggerHoldoffAI", + write_pv="TriggerHoldoffAO", + name="trigger_holdoff", + kind=Kind.config, + ) + trigger_inhibit = Cpt( + EpicsSignal, + "TriggerInhibitMI", + write_pv="TriggerInhibitMO", + name="trigger_inhibit", + kind=Kind.omitted, + ) + trigger_source = Cpt( + EpicsSignal, + "TriggerSourceMI", + write_pv="TriggerSourceMO", + name="trigger_source", + kind=Kind.omitted, + doc="Trigger Source for the DDG, options in TRIGGERSOURCE" + ) + trigger_level = Cpt( + EpicsSignal, + "TriggerLevelAI", + write_pv="TriggerLevelAO", + name="trigger_level", + kind=Kind.omitted, + ) + trigger_rate = Cpt( + EpicsSignal, + "TriggerRateAI", + write_pv="TriggerRateAO", + name="trigger_rate", + kind=Kind.omitted, + ) + trigger_shot = Cpt( + EpicsSignal, "TriggerDelayBO", name="trigger_shot", kind=Kind.omitted + ) + burst_mode = Cpt( + EpicsSignal, "BurstModeBI", write_pv="BurstModeBO", name="burst_mode", kind=Kind.omitted + ) + burst_config = Cpt( + EpicsSignal, + "BurstConfigBI", + write_pv="BurstConfigBO", + name="burst_config", + kind=Kind.omitted, + ) + burst_count = Cpt( + EpicsSignal, "BurstCountLI", write_pv="BurstCountLO", name="burst_count", kind=Kind.omitted + ) + burst_delay = Cpt( + EpicsSignal, "BurstDelayAI", write_pv="BurstDelayAO", name="burst_delay", kind=Kind.omitted + ) + burst_period = Cpt( + EpicsSignal, + "BurstPeriodAI", + write_pv="BurstPeriodAO", + name="burst_period", + kind=Kind.omitted, + ) + + def proc_event_status(self) -> None: + """The reading must be manually triggered to update the event status.""" + self.state.proc_status.put(1) + + def wait_for_event_status( + self, value: STATUSBITS, timeout: float | None = None + ) -> StatusBitsCompareStatus: + """ + Wait for a specific event status. + + Args: + value (STATUSBITS): The status bits to wait for. + timeout (float): The maximum time to wait in seconds. + """ + return StatusBitsCompareStatus( + signal=self.state.event_status, value=value, timeout=timeout, run=True + ) + + def set_trigger(self, source: TRIGGERSOURCE | int) -> None: + """ + Set the trigger source. + + Args: + source (TriggerSource | int): The trigger source + INTERNAL = 0 + EXT_RISING_EDGE = 1 + EXT_FALLING_EDGE = 2 + SS_EXT_RISING_EDGE = 3 + SS_EXT_FALLING_EDGE = 4 + SINGLE_SHOT = 5 + LINE = 6 + """ + if isinstance(source, TRIGGERSOURCE): + self.trigger_source.set(source.value).wait(self._pv_timeout) + else: + self.trigger_source.set(int(source)).wait(self._pv_timeout) + + @typechecked + def burst_enable( + self, + count: int, + delay: float, + period: float, + config: Literal["all", "first"] | BURSTCONFIG = "first", + ) -> None: + """Enable burst mode with valid parameters. + + Args: + count (int): Number of bursts >0 + delay (float): Delay before bursts start in seconds >=0 + period (float): Period of the bursts in seconds >0 + config (str): Configuration of T0 duiring burst. + In addition, to simplify triggering of other instruments synchronously with the burst, + the T0 output may be configured to fire on the first delay cycle of the burst, + rather than for all delay cycles as is normally the case. BURSTCONFIG + """ + + # Check inputs first + if count <= 0: + raise ValueError(f"Count must be >0, provided: {count}") + if delay < 0: + raise ValueError(f"Delay must be >=0, provided: {delay}") + if period <= 0: + raise ValueError(f"Period must be >0, provided: {period}") + + self.burst_mode.set(1).wait(timeout=self._pv_timeout) + self.burst_count.set(count).wait(timeout=self._pv_timeout) + self.burst_delay.set(delay).wait(timeout=self._pv_timeout) + self.burst_period.set(period).wait(timeout=self._pv_timeout) + + if config == "all": + self.burst_config.set(BURSTCONFIG.ALL_CYCLES.value).wait(timeout=self._pv_timeout) + elif config == "first": + self.burst_config.set(BURSTCONFIG.FIRST_CYCLE.value).wait(timeout=self._pv_timeout) + + def burst_disable(self) -> None: + """Disable burst mode""" + self.burst_mode.set(0).wait(timeout=self._pv_timeout) + + @typechecked + def set_io_values( + self, + channel: ( + AllChannelNames | list[AllChannelNames] + ), + amplitude: float | None = None, + offset: float | None = None, + polarity: OUTPUTPOLARITY | Literal[0, 1] | None = None, + mode: Literal["ttl", "nim"] | None = None, + ) -> None: + """Set the IO values for the static pair. + + Args: + channel (str | list[str]): Channel(s) to set the IO values for. + Can be "t0", "ab", "cd", "ef", "gh" or a list of these. + If a list is provided, the same values will be set for all channels. + amplitude (float): Amplitude of the output signal in volts. + offset (float): Offset of the output signal in volts. + polarity (OUTPUTPOLARITY | int): Polarity of the output signal. + ttl_mode (bool): If True, set the output to TTL mode. + nim_mode (bool): If True, set the output to NIM mode. + If both ttl_mode and nim_mode are set to True, + a ValueError is raised. + """ + if isinstance(channel, str): + channel = [channel] + for ch in channel: + if ch == "t0": + io_channel = self.t0 + else: + io_channel = getattr(getattr(self, ch), "io") + if amplitude is not None: + io_channel.amplitude.set(amplitude).wait(timeout=self._pv_timeout) + if offset is not None: + io_channel.offset.set(offset).wait(timeout=self._pv_timeout) + if polarity is not None: + if isinstance(polarity, OUTPUTPOLARITY): + io_channel.polarity.set(polarity.value).wait(timeout=self._pv_timeout) + else: + io_channel.polarity.set(int(polarity)).wait(timeout=self._pv_timeout) + if mode == "ttl": + io_channel.ttl_mode.set(1).wait(timeout=self._pv_timeout) + if mode == "nim": + io_channel.nim_mode.set(1).wait(timeout=self._pv_timeout) + + def set_delay_pairs( + self, + channel: DelayChannelNames | list[DelayChannelNames], + delay: float | list[float] | None = None, + width: float | list[float] | None = None, + ) -> None: + """Set the delay and width for a specific channel pair. + + Args: + channel (str): Channel pair to set the delay and width for. + Can be "ab", "cd", "ef", "gh". + delay (float): Delay in seconds to set for the channel pair. + width (float): Width in seconds to set for the channel pair. + """ + if isinstance(channel, str): + channel = [channel] + if isinstance(delay, (float, int)): + delay = [float(delay)] * len(channel) + if isinstance(width, (float,int)): + width = [float(width)] * len(channel) + if delay is not None: + if len(delay) != len(channel): + raise ValueError( + f"Length of delay {len(delay)} must match length of channel {len(channel)}." + ) + for ii, ch in enumerate(channel): + delay_channel=getattr(self, ch) + delay_channel.delay.put(delay[ii]) + if width is not None: + if len(width) != len(channel): + raise ValueError( + f"Length of width {len(width)} must match length of channel {len(channel)}." + ) + logger.info(f"setting widths of channels {channel} to {width}") + for ii, ch in enumerate(channel): + delay_channel= getattr(self, ch) + delay_channel.width.put(width[ii]) + + def _get_literal_channel(self, channel: LiteralChannels) -> Channel: + return { + "A": self.ab.ch1, + "B": self.ab.ch2, + "C": self.cd.ch1, + "D": self.cd.ch2, + "E": self.ef.ch1, + "F": self.ef.ch2, + "G": self.gh.ch1, + "H": self.gh.ch2, + }[channel] + + def set_channel_reference(self, channel: LiteralChannels, reference_channel: CHANNELREFERENCE): + self._get_literal_channel(channel).reference.put(reference_channel.value) + + def set_references_for_channels(self, channels_and_refs: list[tuple[LiteralChannels, CHANNELREFERENCE]]): + for ch, ref in channels_and_refs: + self.set_channel_reference(ch, ref) + +if __name__ == "__main__": + ddg = DelayGeneratorCSAXS(name="ddg", prefix="X12SA-CPCL-DDG1:") + ddg.wait_for_connection(all_signals=True, timeout=30) + ddg.summary() diff --git a/tests/tests_devices/test_delay_generator_csaxs.py b/tests/tests_devices/test_delay_generator_csaxs.py index 10005f1..b36e998 100644 --- a/tests/tests_devices/test_delay_generator_csaxs.py +++ b/tests/tests_devices/test_delay_generator_csaxs.py @@ -1,276 +1,128 @@ # pylint: skip-file +import threading +from typing import Generator from unittest import mock +import numpy as np +import ophyd import pytest -from ophyd_devices.devices.delay_generator_645 import TriggerSource +from ophyd_devices.tests.utils import MockPV, patch_dual_pvs -from csaxs_bec.devices.epics.delay_generator_csaxs import DDGSetup +from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import ( + BURSTCONFIG, + STATUSBITS, + TRIGGERSOURCE, + DelayGeneratorCSAXS, +) @pytest.fixture(scope="function") -def mock_DDGSetup(): - mock_ddg = mock.MagicMock() - yield DDGSetup(parent=mock_ddg) +def mock_ddg() -> Generator[DelayGeneratorCSAXS, DelayGeneratorCSAXS, DelayGeneratorCSAXS]: + """Fixture to mock the camera device.""" + name = "ddg" + prefix = "test:" + with mock.patch.object(ophyd, "cl") as mock_cl: + mock_cl.get_pv = MockPV + mock_cl.thread_class = threading.Thread + dev = DelayGeneratorCSAXS(name=name, prefix=prefix) + patch_dual_pvs(dev) + yield dev -# Fixture for scaninfo -@pytest.fixture( - params=[ - { - "scan_id": "1234", - "scan_type": "step", - "num_points": 500, - "frames_per_trigger": 1, - "exp_time": 0.1, - "readout_time": 0.1, - }, - { - "scan_id": "1234", - "scan_type": "step", - "num_points": 500, - "frames_per_trigger": 5, - "exp_time": 0.01, - "readout_time": 0, - }, - { - "scan_id": "1234", - "scan_type": "fly", - "num_points": 500, - "frames_per_trigger": 1, - "exp_time": 1, - "readout_time": 0.2, - }, - { - "scan_id": "1234", - "scan_type": "fly", - "num_points": 500, - "frames_per_trigger": 5, - "exp_time": 0.1, - "readout_time": 0.4, - }, - ] -) -def scaninfo(request): - return request.param +def test_ddg_init(mock_ddg): + """Test the proc event status method.""" + assert mock_ddg.name == "ddg" + assert mock_ddg.prefix == "test:" -# Fixture for DDG config default values -@pytest.fixture( - params=[ - { - "delay_burst": 0.0, - "delta_width": 0.0, - "additional_triggers": 0, - "polarity": [0, 0, 0, 0, 0], - "amplitude": 0.0, - "offset": 0.0, - "thres_trig_level": 0.0, - }, - { - "delay_burst": 0.1, - "delta_width": 0.1, - "additional_triggers": 1, - "polarity": [0, 0, 1, 0, 0], - "amplitude": 5, - "offset": 0.0, - "thres_trig_level": 2.5, - }, - ] -) -def ddg_config_defaults(request): - return request.param +def test_ddg_proc_event_status(mock_ddg): + """Test the proc event status method.""" + mock_ddg.state.proc_status.put(0) + mock_ddg.proc_event_status() + assert mock_ddg.state.proc_status.get() == 1 -# Fixture for DDG config scan values -@pytest.fixture( - params=[ - { - "fixed_ttl_width": [0, 0, 0, 0, 0], - "trigger_width": None, - "set_high_on_exposure": False, - "set_high_on_stage": False, - "set_trigger_source": "SINGLE_SHOT", - "premove_trigger": False, - }, - { - "fixed_ttl_width": [0, 0, 0, 0, 0], - "trigger_width": 0.1, - "set_high_on_exposure": True, - "set_high_on_stage": False, - "set_trigger_source": "SINGLE_SHOT", - "premove_trigger": True, - }, - { - "fixed_ttl_width": [0, 0, 0, 0, 0], - "trigger_width": 0.1, - "set_high_on_exposure": False, - "set_high_on_stage": False, - "set_trigger_source": "EXT_RISING_EDGE", - "premove_trigger": False, - }, - ] -) -def ddg_config_scan(request): - return request.param +def test_ddg_set_trigger(mock_ddg): + """Test setting the trigger.""" + for trigger in TRIGGERSOURCE: + mock_ddg.set_trigger(trigger) + assert mock_ddg.trigger_source.get() == trigger.value -# Fixture for delay pairs -@pytest.fixture( - params=[ - {"all_channels": ["channelAB", "channelCD"], "all_delay_pairs": ["AB", "CD"]}, - {"all_channels": [], "all_delay_pairs": []}, - {"all_channels": ["channelT0", "channelAB", "channelCD"], "all_delay_pairs": ["AB", "CD"]}, - ] -) -def channel_pairs(request): - return request.param +def test_ddg_burst_enable(mock_ddg): + """Test enabling burst mode.""" + mock_ddg.burst_enable(count=100, delay=0.1, period=0.02, config=BURSTCONFIG.ALL_CYCLES) + mock_ddg.burst_mode.get() == 1 + assert mock_ddg.burst_count.get() == 100 + assert mock_ddg.burst_delay.get() == 0.1 + assert mock_ddg.burst_period.get() == 0.02 + assert mock_ddg.burst_config.get() == BURSTCONFIG.ALL_CYCLES.value + assert mock_ddg.burst_mode.get() == 1 + # Count is 0 + with pytest.raises(ValueError): + mock_ddg.burst_enable(count=0, delay=0.1, period=0.02, config=BURSTCONFIG.ALL_CYCLES) + # delay is negative + with pytest.raises(ValueError): + mock_ddg.burst_enable(count=100, delay=-0.1, period=0.02, config=BURSTCONFIG.ALL_CYCLES) + # period is zero + with pytest.raises(ValueError): + mock_ddg.burst_enable(count=100, delay=0.1, period=0, config=BURSTCONFIG.ALL_CYCLES) + + # Works with default config + mock_ddg.burst_enable(count=100, delay=0.1, period=0.02) + mock_ddg.burst_mode.get() == BURSTCONFIG.FIRST_CYCLE.value -def test_on_pre_scan(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan): - """Test the check_scan_id method.""" - # Set first attributes of parent class - for k, v in scaninfo.items(): - setattr(mock_DDGSetup.parent.scaninfo, k, v) - for k, v in ddg_config_defaults.items(): - getattr(mock_DDGSetup.parent, k).get.return_value = v - for k, v in ddg_config_scan.items(): - getattr(mock_DDGSetup.parent, k).get.return_value = v - # Call the function you want to test - mock_DDGSetup.on_pre_scan() - if ddg_config_scan["premove_trigger"]: - mock_DDGSetup.parent.trigger_shot.put.assert_called_once_with(1) +def test_ddg_wait_for_event_status(mock_ddg): + """Test setting wait for event status.""" + mock_ddg: DelayGeneratorCSAXS + mock_ddg.state.event_status._read_pv.mock_data = 0 + status = mock_ddg.wait_for_event_status(value=STATUSBITS.END_OF_BURST) # 8 + assert status.done is False + mock_ddg.state.event_status._read_pv.mock_data = 1 + assert status.done is False + mock_ddg.state.event_status._read_pv.mock_data = 4 + assert status.done is False + # TODO enable once callback for MockPV is implemented + # mock_ddg.state.event_status._read_pv.mock_data = 13 # 8 + 4 + 1 + # status.wait(timeout=1) # Wait for the status to be done + # assert status.done is True -# TODO put back once the logic is implemented -# @pytest.mark.parametrize("source", ["SINGLE_SHOT", "EXT_RISING_EDGE"]) -# def test_on_trigger(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan, source): -# """Test the on_trigger method.""" -# # Set first attributes of parent class -# for k, v in scaninfo.items(): -# setattr(mock_DDGSetup.parent.scaninfo, k, v) -# for k, v in ddg_config_defaults.items(): -# getattr(mock_DDGSetup.parent, k).get.return_value = v -# for k, v in ddg_config_scan.items(): -# getattr(mock_DDGSetup.parent, k).get.return_value = v -# # Call the function you want to test -# mock_DDGSetup.parent.source.name = "source" -# mock_DDGSetup.parent.source.read.return_value = { -# mock_DDGSetup.parent.source.name: {"value": getattr(TriggerSource, source)} -# } -# mock_DDGSetup.on_trigger() -# if source == "SINGLE_SHOT": -# mock_DDGSetup.parent.trigger_shot.put.assert_called_once_with(1) +def test_ddg_set_io_values(mock_ddg): + """Test setting IO values.""" + mock_ddg.set_io_values(channel="AB", amplitude=3, offset=2, polarity=1, mode="ttl") + assert mock_ddg.AB.io.amplitude.get() == 3 + assert mock_ddg.AB.io.offset.get() == 2 + assert mock_ddg.AB.io.polarity.get() == 1 + assert mock_ddg.AB.io.ttl_mode.get() == 1 + # List of channels + channels = ["AB", "CD", "T0"] + mock_ddg.set_io_values(channel=channels, amplitude=3, offset=2, polarity=1, mode="nim") + for channel in channels: + if channel == "T0": + attr = getattr(mock_ddg, channel) + else: + attr = getattr(mock_ddg, channel).io + assert attr.amplitude.get() == 3 + assert attr.offset.get() == 2 + assert attr.polarity.get() == 1 + assert attr.nim_mode.get() == 1 -def test_on_wait_for_connection( - mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan, channel_pairs -): - """Test the initialize_default_parameter method.""" - # Set first attributes of parent class - for k, v in scaninfo.items(): - setattr(mock_DDGSetup.parent.scaninfo, k, v) - for k, v in ddg_config_defaults.items(): - getattr(mock_DDGSetup.parent, k).get.return_value = v - for k, v in ddg_config_scan.items(): - getattr(mock_DDGSetup.parent, k).get.return_value = v - # Call the function you want to test - mock_DDGSetup.parent.all_channels = channel_pairs["all_channels"] - mock_DDGSetup.parent.all_delay_pairs = channel_pairs["all_delay_pairs"] - calls = [] - calls.extend( - [ - mock.call("polarity", ddg_config_defaults["polarity"][ii], [channel]) - for ii, channel in enumerate(channel_pairs["all_channels"]) - ] - ) - calls.extend([mock.call("amplitude", ddg_config_defaults["amplitude"])]) - calls.extend([mock.call("offset", ddg_config_defaults["offset"])]) - calls.extend( - [ - mock.call( - "reference", 0, [f"channel{pair}.ch1" for pair in channel_pairs["all_delay_pairs"]] - ) - ] - ) - calls.extend( - [ - mock.call( - "reference", 0, [f"channel{pair}.ch2" for pair in channel_pairs["all_delay_pairs"]] - ) - ] - ) - mock_DDGSetup.on_wait_for_connection() - mock_DDGSetup.parent.set_channels.assert_has_calls(calls) - - -# TODO put back once the logic is implemented -# def test_on_stage(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan, channel_pairs): -# """Test the prepare_ddg method.""" -# # Set first attributes of parent class -# for k, v in scaninfo.items(): -# setattr(mock_DDGSetup.parent.scaninfo, k, v) -# for k, v in ddg_config_defaults.items(): -# getattr(mock_DDGSetup.parent, k).get.return_value = v -# for k, v in ddg_config_scan.items(): -# getattr(mock_DDGSetup.parent, k).get.return_value = v -# # Call the function you want to test -# mock_DDGSetup.parent.all_channels = channel_pairs["all_channels"] -# mock_DDGSetup.parent.all_delay_pairs = channel_pairs["all_delay_pairs"] - -# mock_DDGSetup.prepare_ddg() -# mock_DDGSetup.parent.set_trigger.assert_called_once_with( -# getattr(TriggerSource, ddg_config_scan["set_trigger_source"]) -# ) -# if scaninfo["scan_type"] == "step": -# if ddg_config_scan["set_high_on_exposure"]: -# num_burst_cycle = 1 + ddg_config_defaults["additional_triggers"] -# exp_time = ddg_config_defaults["delta_width"] + scaninfo["frames_per_trigger"] * ( -# scaninfo["exp_time"] + scaninfo["readout_time"] -# ) -# total_exposure = exp_time -# delay_burst = ddg_config_defaults["delay_burst"] -# else: -# exp_time = ddg_config_defaults["delta_width"] + scaninfo["exp_time"] -# total_exposure = exp_time + scaninfo["readout_time"] -# delay_burst = ddg_config_defaults["delay_burst"] -# num_burst_cycle = ( -# scaninfo["frames_per_trigger"] + ddg_config_defaults["additional_triggers"] -# ) -# elif scaninfo["scan_type"] == "fly": -# if ddg_config_scan["set_high_on_exposure"]: -# num_burst_cycle = 1 + ddg_config_defaults["additional_triggers"] -# exp_time = ( -# ddg_config_defaults["delta_width"] -# + scaninfo["num_points"] * scaninfo["exp_time"] -# + (scaninfo["num_points"] - 1) * scaninfo["readout_time"] -# ) -# total_exposure = exp_time -# delay_burst = ddg_config_defaults["delay_burst"] -# else: -# exp_time = ddg_config_defaults["delta_width"] + scaninfo["exp_time"] -# total_exposure = exp_time + scaninfo["readout_time"] -# delay_burst = ddg_config_defaults["delay_burst"] -# num_burst_cycle = scaninfo["num_points"] + ddg_config_defaults["additional_triggers"] - -# # mock_DDGSetup.parent.burst_enable.assert_called_once_with( -# # mock.call(num_burst_cycle, delay_burst, total_exposure, config="first") -# # ) -# mock_DDGSetup.parent.burst_enable.assert_called_once_with( -# num_burst_cycle, delay_burst, total_exposure, config="first" -# ) -# if not ddg_config_scan["trigger_width"]: -# call = mock.call("width", exp_time) -# assert call in mock_DDGSetup.parent.set_channels.mock_calls -# else: -# call = mock.call("width", ddg_config_scan["trigger_width"]) -# assert call in mock_DDGSetup.parent.set_channels.mock_calls -# if ddg_config_scan["set_high_on_exposure"]: -# calls = [ -# mock.call("width", value, channels=[channel]) -# for value, channel in zip( -# ddg_config_scan["fixed_ttl_width"], channel_pairs["all_channels"] -# ) -# if value != 0 -# ] -# if calls: -# assert all(calls in mock_DDGSetup.parent.set_channels.mock_calls) +def test_ddg_set_delay_pairs(mock_ddg): + """Test setting delay pairs.""" + mock_ddg.set_delay_pairs(channel="AB", delay=0.1, width=0.2) + assert np.isclose(mock_ddg.AB.delay.get(), 0.1) + assert np.isclose(mock_ddg.AB.width.get(), 0.2) + assert np.isclose(mock_ddg.AB.ch1.setpoint.get(), 0.1) + assert np.isclose(mock_ddg.AB.ch2.setpoint.get(), 0.3) + # List of channels + channels = ["AB", "CD", "EF", "GH"] + delays = [0.1, 0.2, 0.4, 0.5] + mock_ddg.set_delay_pairs(channel=channels, delay=delays, width=0.2) + for delay, channel in zip(delays, channels): + assert np.isclose(getattr(mock_ddg, channel).delay.get(), delay) + assert np.isclose(getattr(mock_ddg, channel).width.get(), 0.2) + assert np.isclose(getattr(mock_ddg, channel).ch1.setpoint.get(), delay) + assert np.isclose(getattr(mock_ddg, channel).ch2.setpoint.get(), delay + 0.2)