diff --git a/csaxs_bec/device_configs/bec_device_config_sastt.yaml b/csaxs_bec/device_configs/bec_device_config_sastt.yaml index 142794f..11784f1 100644 --- a/csaxs_bec/device_configs/bec_device_config_sastt.yaml +++ b/csaxs_bec/device_configs/bec_device_config_sastt.yaml @@ -53,89 +53,6 @@ eiger9m: enabled: true readoutPriority: async softwareTrigger: false -ddg_detectors: - description: DelayGenerator for detector triggering - deviceClass: csaxs_bec.devices.epics.delay_generator_csaxs.DelayGeneratorcSAXS - deviceConfig: - prefix: 'delaygen:DG1:' - ddg_config: - delay_burst: 40.e-3 - delta_width: 0 - additional_triggers: 0 - polarity: - - 1 # T0 -> DDG MCS - - 0 # eiger - - 1 # falcon - - 1 - - 1 - amplitude: 4.5 - offset: 0 - thres_trig_level: 2.5 - set_high_on_exposure: False - set_high_on_stage: False - deviceTags: - - cSAXS - - ddg_detectors - onFailure: buffer - enabled: true - readoutPriority: async - softwareTrigger: false -ddg_mcs: - description: DelayGenerator for mcs triggering - deviceClass: csaxs_bec.devices.epics.delay_generator_csaxs.DelayGeneratorcSAXS - deviceConfig: - prefix: 'delaygen:DG2:' - ddg_config: - delay_burst: 0 - delta_width: 0 - additional_triggers: 1 - polarity: - - 1 - - 0 - - 1 - - 1 - - 1 - amplitude: 4.5 - offset: 0 - thres_trig_level: 2.5 - set_high_on_exposure: False - set_high_on_stage: False - set_trigger_source: EXT_RISING_EDGE - trigger_width: 3.e-3 - deviceTags: - - cSAXS - - ddg_mcs - onFailure: buffer - enabled: true - readoutPriority: async - softwareTrigger: false -ddg_fsh: - description: DelayGenerator for fast shutter control - deviceClass: csaxs_bec.devices.epics.delay_generator_csaxs.DelayGeneratorcSAXS - deviceConfig: - prefix: 'delaygen:DG3:' - ddg_config: - delay_burst: 0 - delta_width: 80.e-3 - additional_triggers: 0 - polarity: - - 1 - - 1 - - 1 - - 1 - - 1 - amplitude: 4.5 - offset: 0 - thres_trig_level: 2.5 - set_high_on_exposure: True - set_high_on_stage: False - deviceTags: - - cSAXS - - ddg_fsh - onFailure: buffer - enabled: true - readoutPriority: async - softwareTrigger: false falcon: description: Falcon detector x-ray fluoresence deviceClass: csaxs_bec.devices.epics.falcon_csaxs.FalconcSAXS diff --git a/csaxs_bec/device_configs/ddg_test.yaml b/csaxs_bec/device_configs/ddg_test.yaml new file mode 100644 index 0000000..d4748b1 --- /dev/null +++ b/csaxs_bec/device_configs/ddg_test.yaml @@ -0,0 +1,45 @@ +ddg1: + description: Main delay Generator for triggering + deviceClass: csaxs_bec.devices.epics.delay_generator_csaxs.DDG1 + enabled: true + deviceConfig: + prefix: 'X12SA-CPCL-DDG1:' + onFailure: raise + readOnly: false + readoutPriority: baseline + softwareTrigger: true + +ddg2: + description: Detector delay Generator for trigger burst + deviceClass: csaxs_bec.devices.epics.delay_generator_csaxs.DDG2 + enabled: true + deviceConfig: + prefix: 'X12SA-CPCL-DDG2:' + onFailure: raise + readOnly: false + readoutPriority: baseline + softwareTrigger: false + +samx: + readoutPriority: baseline + deviceClass: ophyd_devices.SimPositioner + deviceConfig: + delay: 1 + limits: + - -50 + - 50 + tolerance: 0.01 + update_frequency: 400 + deviceTags: + - user motors + enabled: true + readOnly: false + +bpm4i: + readoutPriority: monitored + deviceClass: ophyd_devices.SimMonitor + deviceConfig: + deviceTags: + - beamline + enabled: true + readOnly: false \ No newline at end of file diff --git a/csaxs_bec/device_configs/epics_devices_config.yaml b/csaxs_bec/device_configs/epics_devices_config.yaml deleted file mode 100644 index 546f085..0000000 --- a/csaxs_bec/device_configs/epics_devices_config.yaml +++ /dev/null @@ -1,50 +0,0 @@ -ddg_detectors: - description: DelayGenerator for detector triggering - deviceClass: csaxs_bec.devices.epics.delay_generator_csaxs.DelayGeneratorcSAXS - deviceConfig: - prefix: 'X12SA-CPCL-DDG3:' - ddg_config: - delay_burst: 40.e-3 - delta_width: 0 - additional_triggers: 0 - polarity: - - 1 # T0 -> DDG MCS - - 0 # eiger - - 1 # falcon - - 1 - - 1 - amplitude: 4.5 - offset: 0 - thres_trig_level: 2.5 - set_high_on_exposure: False - set_high_on_stage: False - deviceTags: - - cSAXS - - ddg_detectors - onFailure: buffer - enabled: true - readoutPriority: async - softwareTrigger: True -bpm4i: - readoutPriority: monitored - deviceClass: ophyd_devices.SimMonitor - deviceConfig: - deviceTags: - - beamline - enabled: true - readOnly: false -samx: - readoutPriority: baseline - deviceClass: ophyd_devices.SimPositioner - deviceConfig: - delay: 1 - limits: - - -50 - - 50 - tolerance: 0.01 - update_frequency: 400 - deviceTags: - - user motors - enabled: true - readOnly: false - \ No newline at end of file diff --git a/csaxs_bec/devices/epics/delay_generator_csaxs.py b/csaxs_bec/devices/epics/delay_generator_csaxs.py deleted file mode 100644 index 1094857..0000000 --- a/csaxs_bec/devices/epics/delay_generator_csaxs.py +++ /dev/null @@ -1,274 +0,0 @@ -from bec_lib import bec_logger -from ophyd import Component, DeviceStatus, Kind -from ophyd_devices.devices.delay_generator_645 import DelayGenerator, TriggerSource -from ophyd_devices.interfaces.base_classes.bec_device_base import BECDeviceBase, CustomPrepare -from ophyd_devices.sim.sim_signals import SetableSignal -from ophyd_devices.utils import bec_utils - -logger = bec_logger.logger - - -class DelayGeneratorcSAXSError(Exception): - """Exception raised for errors.""" - - -class DDGSetup(CustomPrepare["DelayGeneratorcSAXS"]): - """ - Custom Prepare class with hooks for beamline specific logic for the DG645 at CSAXS - """ - - def on_wait_for_connection(self) -> None: - """Init default parameter after the all signals are connected""" - for ii, channel in enumerate(self.parent.all_channels): - self.parent.set_channels("polarity", self.parent.polarity.get()[ii], [channel]) - - self.parent.set_channels("amplitude", self.parent.amplitude.get()) - self.parent.set_channels("offset", self.parent.offset.get()) - # Setup reference - self.parent.set_channels( - "reference", 0, [f"channel{pair}.ch1" for pair in self.parent.all_delay_pairs] - ) - self.parent.set_channels( - "reference", 0, [f"channel{pair}.ch2" for pair in self.parent.all_delay_pairs] - ) - self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get())) - # Set threshold level for ext. pulses - self.parent.level.put(self.parent.thres_trig_level.get()) - - def on_stage(self) -> None: - "Hook execute before the scan starts" - if self.parent.scaninfo.scan_type == "step": - exp_time = self.parent.scaninfo.exp_time - delay = 0 - self.parent.burst_disable() - self.parent.set_trigger(TriggerSource.SINGLE_SHOT) - self.parent.set_channels(signal="width", value=exp_time) - self.parent.set_channels(signal="delay", value=delay) - return - scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "") - if scan_name == "jjf_test": - # TODO implement the logic for JJF triggering - exp_time = 480e-6 # self.parent.scaninfo.exp_time - readout = 20e-6 # self.parent.scaninfo.readout_time - total_exposure = exp_time + readout - num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"] - num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time / total_exposure) - delay = 0 - delay_burst = self.parent.delay_burst.get() - - self.parent.set_trigger(trigger_source=TriggerSource.SINGLE_SHOT) - - self.parent.set_channels(signal="width", value=exp_time) - self.parent.set_channels(signal="delay", value=delay) - self.parent.burst_enable( - count=num_burst_cycle, delay=delay_burst, period=total_exposure, config="first" - ) - logger.info( - f"{self.parent.name}: On stage with n_burst: {num_burst_cycle} and total_exp {total_exposure}" - ) - - def on_trigger(self) -> DeviceStatus: - """Method to be executed upon trigger""" - if self.parent.scaninfo.scan_type == "step": - self.parent.trigger_shot.put(1) - return - scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "") - if scan_name == "jjf_test": - exp_time = 480e-6 # self.parent.scaninfo.exp_time - readout = 20e-6 # self.parent.scaninfo.readout_time - total_exposure = exp_time + readout - num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"] - num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time / total_exposure) - - # Start trigger cycle - self.parent.trigger_burst_readout.put(1) - - # Create status object that will wait for the end of the burst cycle - status = self.wait_with_status( - signal_conditions=[(self.parent.burst_cycle_finished, 1)], - timeout=num_burst_cycle * total_exposure + 1, # add 1s to be sure - check_stopped=True, - exception_on_timeout=DelayGeneratorcSAXSError( - f"{self.parent.name} run into timeout in complete call." - ), - ) - logger.info(f"Return status {self.parent.name}") - return status - - def on_complete(self) -> DeviceStatus: - pass - - def on_pre_scan(self) -> None: - """ - Method called by pre_scan hook in parent class. - - Executes trigger if premove_trigger is Trus. - """ - if self.parent.premove_trigger.get() is True: - self.parent.trigger_shot.put(1) - - -class DelayGeneratorcSAXS(BECDeviceBase, DelayGenerator): - """ - DG645 delay generator at cSAXS (multiple can be in use depending on the setup) - - Default values for setting up DDG. - Note: checks of set calues are not (only partially) included, check manual for details on possible settings. - https://www.thinksrs.com/downloads/pdfs/manuals/DG645m.pdf - - - delay_burst : (float >=0) Delay between trigger and first pulse in burst mode - - delta_width : (float >= 0) Add width to fast shutter signal to make sure its open during acquisition - - additional_triggers : (int) add additional triggers to burst mode (mcs card needs +1 triggers per line) - - polarity : (list of 0/1) polarity for different channels - - amplitude : (float) amplitude voltage of TTLs - - offset : (float) offset for ampltitude - - thres_trig_level : (float) threshold of trigger amplitude - - Custom signals for logic in different DDGs during scans (for custom_prepare.prepare_ddg): - - - set_high_on_exposure : (bool): if True, then TTL signal should go high during the full acquisition time of a scan. - # TODO trigger_width and fixed_ttl could be combined into single list. - - fixed_ttl_width : (list of either 1 or 0), one for each channel. - - trigger_width : (float) if fixed_ttl_width is True, then the width of the TTL pulse is set to this value. - - set_trigger_source : (TriggerSource) specifies the default trigger source for the DDG. - - premove_trigger : (bool) if True, then a trigger should be executed before the scan starts (to be implemented in on_pre_scan). - - set_high_on_stage : (bool) if True, then TTL signal should go high already on stage. - """ - - custom_prepare_cls = DDGSetup - - # Custom signals passed on during the init procedure via BEC - # TODO review whether those should remain here like that - - delay_burst = Component( - bec_utils.ConfigSignal, name="delay_burst", kind="config", config_storage_name="ddg_config" - ) - - delta_width = Component( - bec_utils.ConfigSignal, name="delta_width", kind="config", config_storage_name="ddg_config" - ) - - additional_triggers = Component( - bec_utils.ConfigSignal, - name="additional_triggers", - kind="config", - config_storage_name="ddg_config", - ) - - polarity = Component( - bec_utils.ConfigSignal, name="polarity", kind="config", config_storage_name="ddg_config" - ) - - fixed_ttl_width = Component( - bec_utils.ConfigSignal, - name="fixed_ttl_width", - kind="config", - config_storage_name="ddg_config", - ) - - amplitude = Component( - bec_utils.ConfigSignal, name="amplitude", kind="config", config_storage_name="ddg_config" - ) - - offset = Component( - bec_utils.ConfigSignal, name="offset", kind="config", config_storage_name="ddg_config" - ) - - thres_trig_level = Component( - bec_utils.ConfigSignal, - name="thres_trig_level", - kind="config", - config_storage_name="ddg_config", - ) - - set_high_on_exposure = Component( - bec_utils.ConfigSignal, - name="set_high_on_exposure", - kind="config", - config_storage_name="ddg_config", - ) - - set_high_on_stage = Component( - bec_utils.ConfigSignal, - name="set_high_on_stage", - kind="config", - config_storage_name="ddg_config", - ) - - set_trigger_source = Component( - bec_utils.ConfigSignal, - name="set_trigger_source", - kind="config", - config_storage_name="ddg_config", - ) - - trigger_width = Component( - bec_utils.ConfigSignal, - name="trigger_width", - kind="config", - config_storage_name="ddg_config", - ) - premove_trigger = Component( - bec_utils.ConfigSignal, - name="premove_trigger", - kind="config", - config_storage_name="ddg_config", - ) - - def __init__( - self, - name: str, - prefix: str = "", - kind: Kind = None, - ddg_config: dict = None, - parent=None, - device_manager=None, - **kwargs, - ): - """ - Args: - prefix (str, optional): Prefix of the device. Defaults to "". - name (str): Name of the device. - kind (str, optional): Kind of the device. Defaults to None. - read_attrs (list, optional): List of attributes to read. Defaults to None. - configuration_attrs (list, optional): List of attributes to configure. Defaults to None. - parent (Device, optional): Parent device. Defaults to None. - device_manager (DeviceManagerBase, optional): DeviceManagerBase object. Defaults to None. - sim_mode (bool, optional): Simulation mode flag. Defaults to False. - ddg_config (dict, optional): Dictionary of ddg_config signals. Defaults to None. - - """ - - # Default values for ddg_config signals - self.ddg_config = { - # Setup default values - f"{name}_delay_burst": 0, - f"{name}_delta_width": 0, - f"{name}_additional_triggers": 0, - f"{name}_polarity": [1, 1, 1, 1, 1], - f"{name}_amplitude": 4.5, - f"{name}_offset": 0, - f"{name}_thres_trig_level": 2.5, - # Values for different behaviour during scans - f"{name}_fixed_ttl_width": [0, 0, 0, 0, 0], - f"{name}_trigger_width": None, - f"{name}_set_high_on_exposure": False, - f"{name}_set_high_on_stage": False, - f"{name}_set_trigger_source": "SINGLE_SHOT", - f"{name}_premove_trigger": False, - } - if ddg_config is not None: - # pylint: disable=expression-not-assigned - [self.ddg_config.update({f"{name}_{key}": value}) for key, value in ddg_config.items()] - super().__init__( - prefix=prefix, - name=name, - kind=kind, - parent=parent, - device_manager=device_manager, - **kwargs, - ) - - -# if __name__ == "__main__": -# dgen = DelayGeneratorcSAXS("X12SA-CPCL-DDG3:", name="ddg3") diff --git a/csaxs_bec/devices/epics/delay_generator_csaxs/__init__.py b/csaxs_bec/devices/epics/delay_generator_csaxs/__init__.py new file mode 100644 index 0000000..beab8e9 --- /dev/null +++ b/csaxs_bec/devices/epics/delay_generator_csaxs/__init__.py @@ -0,0 +1,14 @@ +from .ddg_1 import DDG1 +from .ddg_2 import DDG2 +from .delay_generator_csaxs import ( + BURSTCONFIG, + CHANNELREFERENCE, + OUTPUTPOLARITY, + STATUSBITS, + TRIGGERINHIBIT, + TRIGGERSOURCE, + AllChannelNames, + ChannelConfig, + DelayChannelNames, +) +from .error_registry import ERROR_CODES diff --git a/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py new file mode 100644 index 0000000..b0bcadd --- /dev/null +++ b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py @@ -0,0 +1,169 @@ +""" +DDG1 delay generator + +This module implements the DDG1 delay generator logic for the CSAXS beamline. +The attached PDF trigger_scheme_ddg1_ddg2.pdf provides a more detailed overview of +the trigger scheme. If the logic changes in the future, it is highly recommended to +update the PDF accordingly. + +The DDG1 is the main trigger delay generator for the CSAXS beamline. It will +receive either a soft trigger from BEC (depending on the scan type) or a hardware trigger +from a beamline device (e.g. the Galil stages). It is responsible for opening the shutter +and sending a trigger to the Delay Generator CSAXS (DDG2), which in turn will +send the trigger to the detectors. DDG1 will not be witout burst mode, but rather in standard +mode creating delays for the channels ab, cd, ef, gh. + +A brief summary of the DDG1 logic: +DELAY PAIRS: +- DelayPair ab is connected to the EXT/EN of DDG2. +- DelayPair cd is connected to the SHUTTER. +- DelayPair ef is connected to an OR gate together with the detector + PULSE train for the MCS card. The MCS card needs one extra pulse to forward points. + +DELAY CHANNELS: +- a = t0 + 2ms (2ms delay to allow the shutter to open) +- b = a + 1us (short pulse) +- c = t0 +- d = a + exp_time * burst_count + 1ms (to allow the shutter to close) +- e = d +- f = e + 1us (short pulse to OR gate for MCS triggering) +""" + +import time + +from bec_lib.logger import bec_logger +from ophyd import DeviceStatus, StatusBase +from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase + +from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import ( + CHANNELREFERENCE, + OUTPUTPOLARITY, + STATUSBITS, + TRIGGERSOURCE, + AllChannelNames, + ChannelConfig, + DelayGeneratorCSAXS, +) + +logger = bec_logger.logger + +_DEFAULT_CHANNEL_CONFIG: ChannelConfig = { + "amplitude": 5.0, + "offset": 0.0, + "polarity": OUTPUTPOLARITY.POSITIVE, + "mode": "ttl", +} + +DEFAULT_IO_CONFIG: dict[AllChannelNames, ChannelConfig] = { + "t0": _DEFAULT_CHANNEL_CONFIG, + "ab": _DEFAULT_CHANNEL_CONFIG, + "cd": _DEFAULT_CHANNEL_CONFIG, + "ef": _DEFAULT_CHANNEL_CONFIG, + "gh": _DEFAULT_CHANNEL_CONFIG, +} +DEFAULT_TRIGGER_SOURCE: TRIGGERSOURCE = TRIGGERSOURCE.SINGLE_SHOT +DEFAULT_READOUT_TIMES = {"ab": 2e-4, "cd": 2e-4, "ef": 2e-4, "gh": 2e-4} # 0.2 ms 5kHz + +DEFAULT_REFERENCES: list[tuple[AllChannelNames, CHANNELREFERENCE]] = [ + ("A", CHANNELREFERENCE.T0), # T0 + 2ms delay + ("B", CHANNELREFERENCE.A), + ("C", CHANNELREFERENCE.T0), # T0 + ("D", CHANNELREFERENCE.C), + ("E", CHANNELREFERENCE.D), # D One extra pulse once shutter closes for MCS + ("F", CHANNELREFERENCE.E), # E + 1mu s + ("G", CHANNELREFERENCE.T0), + ("H", CHANNELREFERENCE.G), +] + + +class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): + """ + Implementation of DelayGeneratorCSAXS for the CSAXS master trigger delay generator at X12SA-CPCL-DDG1. + It will be triggered by a soft trigger from BEC or a hardware trigger from a beamline device (e.g. the Galil stages). + It is operated in standard mode, not burst mode and will trigger the EXT/EN of DDG2 (channel ab). + It is responsible for opening the shutter (channel cd) and sending an extra trigger to an or gate for the MCS card (channel ef). + """ + + # pylint: disable=attribute-defined-outside-init + def on_connected(self) -> None: + """ + Set the default values on the device - intended to overwrite everything to a usable default state. + Sets DEFAULT_IO_CONFIG into each channel, sets the trigger source to DEFAULT_TRIGGER_SOURCE, + and turns off burst mode. + """ + self.burst_disable() # it is possible to miss setting settings if burst is enabled + for channel, config in DEFAULT_IO_CONFIG.items(): + self.set_io_values(channel, **config) + self.set_trigger(DEFAULT_TRIGGER_SOURCE) + self.set_references_for_channels(DEFAULT_REFERENCES) + + def on_stage(self) -> DeviceStatus | StatusBase | None: + """ + Stage logic for the DDG1 device, being th main trigger delay generator for CSAXS. + For standard scans, it will be triggered by a soft trigger from BEC. + It also has a hardware trigger feeded into the EXT/EN for fly-scanning, i.e. Galil stages. + + This DDG is always not in burst mode. + """ + self.burst_disable() + exp_time = self.scan_info.msg.scan_parameters["exp_time"] + frames_per_trigger = self.scan_info.msg.scan_parameters["frames_per_trigger"] + # Trigger DDG2 + # a = t0 + 2ms, b = a + 1us + # a has reference to t0, b has reference to a + self.set_delay_pairs(channel="ab", delay=2e-3, width=1e-6) + # Trigger shutter + shutter_width = 2e-3 + exp_time * frames_per_trigger + 1e-3 + # d = c/t0 + 2ms + exp_time * burst_count + 1ms + # c has reference to t0, d has reference to c + self.set_delay_pairs(channel="cd", delay=0, width=shutter_width) + # Trigger extra pulse for MCS OR gate + # f = e + 1us + # e has refernce to d, f has reference to e + self.set_delay_pairs(channel="ef", delay=0, width=1e-6) + + def on_trigger(self) -> DeviceStatus | StatusBase | None: + """Note, we need to add a delay to the StatusBits callback on the event_status. + If we don't then subsequent triggers may reach the DDG too early, and will be ignored. To + avoid this, we've added the option to specify a delay via add_delay, default here is 50ms. + """ + st = StatusBase() + self.cancel_on_stop(st) + self.trigger_shot.put(1, use_complete=True) + time.sleep(self.scan_info.msg.scan_parameters["exp_time"]) + self.cancel_on_stop(st) + status = self.wait_for_status(status=st, bit_event=STATUSBITS.END_OF_DELAY, timeout=2) + return status + + def wait_for_status( + self, status: StatusBase, bit_event: STATUSBITS, timeout: float = 2 + ) -> None: + """Wait for a event status bit to be set. + + Args: + status (StatusBase): The status object to update. + bit_event (STATUSBITS): The event status bit to wait for. + timeout (float): Maximum time to wait for the event status bit to be set. + """ + current_time = time.time() + while not status.done: + self.state.proc_status.put(1, use_complete=True) + event_status = self.state.event_status.get() + if (STATUSBITS(event_status) & bit_event) == bit_event: + status.set_finished() + if time.time() - current_time > timeout: + status.set_exception(TimeoutError(f"Timeout waiting for status {status}")) + break + time.sleep(0.1) + time.sleep(0.05) # Give time for the IOC to be ready again + return status + + def on_stop(self) -> None: + """Stop the delay generator by setting the burst mode to 0""" + self.stop_ddg() + + +if __name__ == "__main__": + ddg = DDG1(name="ddg1", prefix="X12SA-CPCL-DDG1:") + ddg.wait_for_connection(all_signals=True, timeout=30) + ddg.summary() diff --git a/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_2.py b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_2.py new file mode 100644 index 0000000..9b9c76b --- /dev/null +++ b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_2.py @@ -0,0 +1,151 @@ +""" +DDG2 delay generator + +This module implements the DDG2 delay generator logic for the CSAXS beamline. +Please check also the code for DDG1, aswell as the attached PDF trigger_scheme_ddg1_ddg2.pdf + +The DDG2 is responsible for creating a burst of triggers for all relevant detectors. +It will receive a be triggered from the DDG1 through the EXT/EN channel. + +A brief summary of the DDG2 logic: +DELAY PAIRS: +- EXT/EN is connected to the DDG1 delay pair ab. +- DelayPair ab is connected to a multiplexer, multiplexing the trigger to the detectors. + +DELAY CHANNELS: +- a = t0 +- b = a + (exp_time - READOUT_TIMES) + +Burst mode is enabled: +- Burst count is set to the number of frames per trigger. +- Burst delay is set to 0. +- Burst period is set to the exposure time. +""" + +import time + +from bec_lib.logger import bec_logger +from ophyd import DeviceStatus, StatusBase +from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase + +from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import ( + CHANNELREFERENCE, + OUTPUTPOLARITY, + STATUSBITS, + TRIGGERSOURCE, + AllChannelNames, + ChannelConfig, + DelayGeneratorCSAXS, +) + +logger = bec_logger.logger + +_DEFAULT_CHANNEL_CONFIG: ChannelConfig = { + "amplitude": 5.0, + "offset": 0.0, + "polarity": OUTPUTPOLARITY.POSITIVE, + "mode": "ttl", +} + +DEFAULT_IO_CONFIG: dict[AllChannelNames, ChannelConfig] = { + "t0": _DEFAULT_CHANNEL_CONFIG, + "ab": _DEFAULT_CHANNEL_CONFIG, + "cd": _DEFAULT_CHANNEL_CONFIG, + "ef": _DEFAULT_CHANNEL_CONFIG, + "gh": _DEFAULT_CHANNEL_CONFIG, +} +DEFAULT_TRIGGER_SOURCE: TRIGGERSOURCE = TRIGGERSOURCE.EXT_RISING_EDGE +DEFAULT_READOUT_TIMES = {"ab": 2e-4, "cd": 2e-4, "ef": 2e-4, "gh": 2e-4} # 0.2 ms 5kHz + +DEFAULT_REFERENCES: list[tuple[AllChannelNames, CHANNELREFERENCE]] = [ + ("A", CHANNELREFERENCE.T0), + ("B", CHANNELREFERENCE.A), + ("C", CHANNELREFERENCE.T0), + ("D", CHANNELREFERENCE.C), + ("E", CHANNELREFERENCE.T0), + ("F", CHANNELREFERENCE.E), + ("G", CHANNELREFERENCE.T0), + ("H", CHANNELREFERENCE.G), +] + + +class DDG2(PSIDeviceBase, DelayGeneratorCSAXS): + """ + Implementation of DelayGeneratorCSAXS for the CSAXS master trigger delay generator at X12SA-CPCL-DDG2. + This device is responsible for creating triggers in burst mode and is connected to a multiplexer that + distributes the trigger to the detectors. The DDG2 is triggered by the DDG1 through the EXT/EN channel. + """ + + # pylint: disable=attribute-defined-outside-init + def on_connected(self) -> None: + """ + Set the default values on the device - intended to overwrite everything to a usable default state. + Sets DEFAULT_IO_CONFIG into each channel, sets the trigger source to DEFAULT_TRIGGER_SOURCE. + """ + self.burst_disable() # it is possible to miss setting settings if burst is enabled + for channel, config in DEFAULT_IO_CONFIG.items(): + self.set_io_values(channel, **config) + self.set_trigger(DEFAULT_TRIGGER_SOURCE) + self.set_references_for_channels(DEFAULT_REFERENCES) + + def on_stage(self) -> DeviceStatus | StatusBase | None: + """ + Stage logic for the DDG1 device, being th main trigger delay generator for CSAXS. + For standard scans, it will be triggered by a soft trigger from BEC. + It also has a hardware trigger feeded into the EXT/EN for fly-scanning, i.e. Galil stages. + + This DDG is always not in burst mode. + """ + exp_time = self.scan_info.msg.scan_parameters["exp_time"] + frames_per_trigger = self.scan_info.msg.scan_parameters["frames_per_trigger"] + # a = t0 + # a has reference to t0, b has reference to a + burst_pulse_width = exp_time - DEFAULT_READOUT_TIMES["ab"] + self.set_delay_pairs(channel="ab", delay=0, width=burst_pulse_width) + self.burst_enable(count=frames_per_trigger, delay=0, period=exp_time) + + def on_pre_scan(self): + """ + The delay generator occasionally needs a bit extra time to process all + commands from stage. Therefore, we introduce here a short sleep + """ + # Delay Generator occasionaly needs a bit extra time to process all commands, sleep 50ms + time.sleep(0.05) + + def on_trigger(self) -> DeviceStatus | StatusBase | None: + """ + DDG2 will not receive a trigger from BEC, but will be triggered by the DDG1 through the EXT/EN channel. + """ + + def wait_for_status( + self, status: StatusBase, bit_event: STATUSBITS, timeout: float = 2 + ) -> None: + """Wait for a event status bit to be set. + + Args: + status (StatusBase): The status object to update. + bit_event (STATUSBITS): The event status bit to wait for. + timeout (float): Maximum time to wait for the event status bit to be set. + """ + current_time = time.time() + while not status.done: + self.state.proc_status.put(1, use_complete=True) + event_status = self.state.event_status.get() + if (STATUSBITS(event_status) & bit_event) == bit_event: + status.set_finished() + if time.time() - current_time > timeout: + status.set_exception(TimeoutError(f"Timeout waiting for status {status}")) + break + time.sleep(0.1) + time.sleep(0.05) # Give time for the IOC to be ready again + return status + + def on_stop(self) -> None: + """Stop the delay generator by setting the burst mode to 0""" + self.stop_ddg() + + +if __name__ == "__main__": + ddg = DDG2(name="ddg2", prefix="X12SA-CPCL-DDG2:") + ddg.wait_for_connection(all_signals=True, timeout=30) + ddg.summary() diff --git a/csaxs_bec/devices/epics/delay_generator_csaxs/delay_generator_csaxs.py b/csaxs_bec/devices/epics/delay_generator_csaxs/delay_generator_csaxs.py new file mode 100644 index 0000000..981f09c --- /dev/null +++ b/csaxs_bec/devices/epics/delay_generator_csaxs/delay_generator_csaxs.py @@ -0,0 +1,717 @@ +""" +Delay generator implementation for CSAXS. + +Detailed information can be found in the manual: +https://www.thinksrs.com/downloads/pdfs/manuals/DG645m.pdf +""" + +import enum +from typing import Literal, TypedDict +import time + +from bec_lib.logger import bec_logger +from ophyd import Component as Cpt +from ophyd import Device, EpicsSignal, EpicsSignalRO, Kind, Signal +from ophyd_devices import StatusBase, SubscriptionStatus +from typeguard import typechecked + +from csaxs_bec.devices.epics.delay_generator_csaxs.error_registry import ERROR_CODES + +logger = bec_logger.logger +DelayChannelNames = Literal["ab", "cd", "ef", "gh"] +AllChannelNames = Literal["t0", "ab", "cd", "ef", "gh"] +LiteralChannels = Literal["A", "B", "C", "D", "E", "F", "G", "H"] + + +class CHANNELREFERENCE(enum.Enum): + T0 = 0 + A = 1 + B = 2 + C = 3 + D = 4 + E = 5 + F = 6 + G = 7 + H = 8 + + +class BURSTCONFIG(enum.Enum): + """Enum option for burst_config signal of the delay generator. + + ALL_CYCLES: T0 triggere for all cycles. + FIRST_CYCLE: T0 only triggered for the first cycle. + """ + + ALL_CYCLES = 0 + FIRST_CYCLE = 1 + + +class TRIGGERSOURCE(enum.Enum): + """Enum options for the trigger_source signal of the delay generator.""" + + INTERNAL = 0 + EXT_RISING_EDGE = 1 + EXT_FALLING_EDGE = 2 + SS_EXT_RISING_EDGE = 3 + SS_EXT_FALLING_EDGE = 4 + SINGLE_SHOT = 5 + LINE = 6 + + +class TRIGGERINHIBIT(enum.Enum): + """Enum options for the trigger_inhibit signal of the delay generator.""" + + OFF = 0 + TRIGGERS = 1 + AB = 2 + AB_CD = 3 + AB_CD_EF = 4 + AB_CD_EF_GH = 5 + + +class OUTPUTPOLARITY(enum.Enum): + """Enum options for the polarity signal of the static pair.""" + + NEGATIVE = 0 + POSITIVE = 1 + + +class STATUSBITS(enum.IntFlag): + """Bit flags for the status signal of the delay generator.""" + + TRIG = 1 << 0 # Got a trigger. + RATE = 1 << 1 # Got a trigger while a delay or burst was in progress. + END_OF_DELAY = 1 << 2 # A delay cycle has completed. + END_OF_BURST = 1 << 3 # A burst cycle has completed. + INHIBIT = 1 << 4 # A trigger or output delay cycle was inhibited. + ABORT_DELAY = 1 << 5 # A delay cycle was aborted early. + PLL_UNLOCK = 1 << 6 # The 100 MHz PLL came unlocked. + RB_UNLOCK = 1 << 7 # The installed Rb oscillator is unlocked. + + def describe(self) -> dict: + """Return a description of the status bits.""" + descriptions = { + STATUSBITS.TRIG: "Got a trigger.", + STATUSBITS.RATE: "Got a trigger while a delay or burst was in progress.", + STATUSBITS.END_OF_DELAY: "A delay cycle has completed.", + STATUSBITS.END_OF_BURST: "A burst cycle has completed.", + STATUSBITS.INHIBIT: "A trigger or output delay cycle was inhibited.", + STATUSBITS.ABORT_DELAY: "A delay cycle was aborted early.", + STATUSBITS.PLL_UNLOCK: "The 100 MHz PLL came unlocked.", + STATUSBITS.RB_UNLOCK: "The installed Rb oscillator is unlocked.", + } + return {flag.name: descriptions[flag] for flag in STATUSBITS if flag in self} + + +class StatusBitsCompareStatus(SubscriptionStatus): + """Compare status for STATUSBITS comparison.""" + + def __init__( + self, + signal: EpicsSignalRO, + value: STATUSBITS, + raise_states: list[STATUSBITS] | None = None, + *args, + event_type=None, + timeout: float | None = None, + add_delay:float|None = None, + settle_time: float = 0, + run: bool = True, + **kwargs, + ): + """Initialize the compare status with a signal.""" + self._signal = signal + self._value = value + self._add_delay = add_delay or 0 + self._raise_states = raise_states or [] + super().__init__( + device=signal, + callback=self._compare_callback, + timeout=timeout, + settle_time=settle_time, + event_type=event_type, + run=run, + ) + + def _compare_callback(self, value, **kwargs) -> bool: + """Callback for subscription status""" + obj = kwargs.get("obj", None) + if obj is None: + name = 'no object received' + else: + name=obj.name + if any((STATUSBITS(value) & state) == state for state in self._raise_states): + self.set_exception( + ValueError( + f"Status bits {STATUSBITS(value).describe()} raised an exception: {self._raise_states}" + ) + ) + return False + if self._add_delay !=0: + time.sleep(self._add_delay) + + return (STATUSBITS(value) & self._value) == self._value + + +class ChannelConfig(TypedDict): + amplitude: float | None + offset: float | None + polarity: OUTPUTPOLARITY | Literal[0, 1] | None + mode: Literal["ttl", "nim"] | None + + +class StaticPair(Device): + """ + Class to represent a static pair (T0, aswell as all AB, CB, EF, GH channels). + It allows setting the logic levels, but the timing is fixed. + The signal is high after receiving the trigger until the end of the holdoff period. + """ + + ttl_mode = Cpt( + EpicsSignal, + "OutputModeTtlSS.PROC", + kind=Kind.omitted, + auto_monitor=True, + doc="Set the output mode to TTL", + ) + nim_mode = Cpt( + EpicsSignal, + "OutputModeNimSS.PROC", + kind=Kind.omitted, + auto_monitor=True, + doc="Set the output mode to NIM", + ) + polarity = Cpt( + EpicsSignal, + "OutputPolarityBI", + write_pv="OutputPolarityBO", + name="polarity", + kind=Kind.omitted, + auto_monitor=True, + doc="Control the polarity of the output signal. POS 1 or NEG 0", + ) + + amplitude = Cpt( + EpicsSignal, + "OutputAmpAI", + write_pv="OutputAmpAO", + name="amplitude", + kind=Kind.omitted, + auto_monitor=True, + doc="Amplitude of the output signal in volts.", + ) + + offset = Cpt( + EpicsSignal, + "OutputOffsetAI", + write_pv="OutputOffsetAO", + name="offset", + kind=Kind.omitted, + auto_monitor=True, + doc="Offset of the output signal in volts.", + ) + + +class Channel(Device): + """ + Represents a single channel A, B, C, ... of the delay generator. + """ + + setpoint = Cpt( + EpicsSignal, + write_pv="DelayAO", + read_pv="DelayAI", + put_complete=True, + auto_monitor=True, + kind=Kind.omitted, + doc="Setpoint value for the delay of the channel", + ) + reference = Cpt( + EpicsSignal, + "ReferenceMO", + put_complete=True, + kind=Kind.omitted, + auto_monitor=True, + doc="Reference channel T0,A,B,.. for the delay of the setpoint", + ) + + def __init__(self, *args, **kwargs): + """ + Initialize the channel with a setpoint and reference signal. + """ + # The read PV in EpicsSignal does not receive the prefix.. so we need to add it manually. + self.__class__.__dict__["setpoint"].kwargs["read_pv"] = args[0] + "DelayAI" + super().__init__(*args, **kwargs) + + +class WidthSignal(Signal): + """A signal that represents the width of a channel.""" + + def get(self, **kwargs) -> float: + """ + Get the width of the channel. + + Returns: + float: The width of the channel in seconds. + """ + parent: _DelayPairBase = self._parent # type: ignore + return parent.ch2.setpoint.get() - parent.ch1.setpoint.get() # type: ignore + + def check_value(self, value: float) -> float: + """Check if the value is larger equal to 0""" + if value >= 0: + return value + else: + raise ValueError(f"Width must be larger ot equal 0, got {value} seconds.") + + def put(self, value: float, **kwargs): + """ + Set the width of the channel. + + Args: + value (float): The width to set in seconds. + """ + self.check_value(value) + parent: _DelayPairBase = self._parent # type: ignore + ch1_setpoint: float = parent.ch1.setpoint.get() # type: ignore + parent.ch2.setpoint.put(ch1_setpoint + value, **kwargs) + + def set(self, value: float, **kwargs): + """ + Set the width of the channel. + + Args: + value (float): The width to set in seconds. + """ + status = StatusBase() + self.put(value, **kwargs) + status.set_finished() + return status + + +class DelaySignal(Signal): + """A signal that represents the delay of a channel.""" + + def get(self, **kwargs): + """ + Get the delay of the channel. + + Returns: + float: The delay of the channel in seconds. + """ + parent: _DelayPairBase = self._parent # type: ignore + return parent.ch1.setpoint.get() + + def put(self, value: float, **kwargs): + """ + Set the delay of the channel. + + Args: + value (float): The delay to set in seconds. + """ + parent: _DelayPairBase = self._parent # type: ignore + parent.ch1.setpoint.put(value, **kwargs) + parent.ch2.setpoint.put(value + parent.width.get(), **kwargs) + + def set(self, value: float, **kwargs): + """ + Set the width of the channel. + + Args: + value (float): The width to set in seconds. + """ + status = StatusBase() + self.put(value, **kwargs) + status.set_finished() + return status + + +class _DelayPairBase(Device): + """Base class for delay pairs. Children have to implement ch1,ch2 for + the respective delay channels. The class attributes have to be called + ch1, ch2 for width and delay signals to work.""" + + ch1: Cpt[Channel] + ch2: Cpt[Channel] + io: Cpt[StaticPair] + width = Cpt( + WidthSignal, name="width", kind=Kind.config, doc="Width of TTL pulse for delay pair" + ) + delay = Cpt( + DelaySignal, name="delay", kind=Kind.config, doc="Delay of TTL pulse for delay pair" + ) + + +class DelayPairAB(_DelayPairBase): + + ch1 = Cpt(Channel, "A", name="A", kind=Kind.omitted, doc="Channel A") + ch2 = Cpt(Channel, "B", name="B", kind=Kind.omitted, doc="Channel B") + io = Cpt(StaticPair, "AB", name="io", kind=Kind.omitted, doc="IO for delay pair AB") + + +class DelayPairCD(_DelayPairBase): + + ch1 = Cpt(Channel, "C", name="C", kind=Kind.omitted, doc="Channel C") + ch2 = Cpt(Channel, "D", name="D", kind=Kind.omitted, doc="Channel D") + io = Cpt(StaticPair, "CD", name="io", kind=Kind.omitted, doc="IO for delay pair CD") + + +class DelayPairEF(_DelayPairBase): + + ch1 = Cpt(Channel, "E", name="E", kind=Kind.omitted, doc="Channel E") + ch2 = Cpt(Channel, "F", name="F", kind=Kind.omitted, doc="Channel F") + io = Cpt(StaticPair, "EF", name="io", kind=Kind.omitted, doc="IO for delay pair EF") + + +class DelayPairGH(_DelayPairBase): + + ch1 = Cpt(Channel, "G", name="G", kind=Kind.omitted, doc="Channel G") + ch2 = Cpt(Channel, "H", name="H", kind=Kind.omitted, doc="Channel H") + io = Cpt(StaticPair, "GH", name="io", kind=Kind.omitted, doc="IO for delay pair GH") + + +class DelayGeneratorEventStatus(Device): + """Subdevice to represent the event state of the delay generator.""" + + event_status = Cpt( + EpicsSignalRO, + "EventStatusLI", + name="event_status", + kind=Kind.omitted, + auto_monitor=True, + doc="Event status register for the delay generator", + ) + proc_status = Cpt( + EpicsSignal, + "EventStatusLI.PROC", + name="proc_status", + kind=Kind.omitted, + doc="Poll and flush the latest event status register entry from the HW to the event_status signal", + ) + + +class DelayGeneratorCSAXS(Device): + """ + Delay Generator Stanford Research DG645. This implements an interface for the DG645 delay generator. + + Detailed information can be found in the manual: + https://www.thinksrs.com/downloads/pdfs/manuals/DG645m.pdf + + The DG645 has 8 channels, each with a delay and pulse width. The channels are implemented as DelayPair objects (AB etc.). + + Each pair has a TTL pulse width, delay and a reference signal to which they are being triggered. + In addition, the io layer allows setting amplitude, offset and polarity for each pair. + """ + + _pv_timeout: float = 1.5 # Default timeout for PV operations in seconds + + # Front Panel + t0 = Cpt(StaticPair, "T0", name="t0", doc="T0 static pair") + ab = Cpt(DelayPairAB, "", name="ab", doc="Delay pair AB") + cd = Cpt(DelayPairCD, "", name="cd", doc="Delay pair CD") + ef = Cpt(DelayPairEF, "", name="ef", doc="Delay pair EF") + gh = Cpt(DelayPairGH, "", name="gh", doc="Delay pair GH") + state = Cpt(DelayGeneratorEventStatus, "", name="state", doc="Subdevice for event status") + status_msg = Cpt( + EpicsSignalRO, + "StatusSI", + name="status_msg", + kind=Kind.omitted, + auto_monitor=True, + doc="Status message from the delay generator", + ) + status_msg_clear = Cpt( + EpicsSignal, + "StatusClearBO", + name="status_msg_clear", + kind=Kind.omitted, + doc="Clear the status message", + ) + + trigger_holdoff = Cpt( + EpicsSignal, + "TriggerHoldoffAI", + write_pv="TriggerHoldoffAO", + name="trigger_holdoff", + kind=Kind.config, + ) + trigger_inhibit = Cpt( + EpicsSignal, + "TriggerInhibitMI", + write_pv="TriggerInhibitMO", + name="trigger_inhibit", + kind=Kind.omitted, + ) + trigger_source = Cpt( + EpicsSignal, + "TriggerSourceMI", + write_pv="TriggerSourceMO", + name="trigger_source", + kind=Kind.omitted, + doc="Trigger Source for the DDG, options in TRIGGERSOURCE", + ) + trigger_level = Cpt( + EpicsSignal, + "TriggerLevelAI", + write_pv="TriggerLevelAO", + name="trigger_level", + kind=Kind.omitted, + ) + trigger_rate = Cpt( + EpicsSignal, + "TriggerRateAI", + write_pv="TriggerRateAO", + name="trigger_rate", + kind=Kind.omitted, + ) + trigger_shot = Cpt( + EpicsSignal, + "TriggerDelayBO", + name="trigger_shot", + kind=Kind.omitted, + doc="Software trigger, needs to be in correct mode to work", + ) + burst_mode = Cpt( + EpicsSignal, + "BurstModeBI", + write_pv="BurstModeBO", + name="burst_mode", + kind=Kind.omitted, + auto_monitor=True, + doc="Enable or disable burst mode. 1 = enabled, 0 = disabled.", + ) + burst_config = Cpt( + EpicsSignal, + "BurstConfigBI", + write_pv="BurstConfigBO", + name="burst_config", + kind=Kind.omitted, + doc="Configuration of T0 during burst. Can be ALL_CYCLES (0) or FIRST_CYCLE (1) .", + ) + burst_count = Cpt( + EpicsSignal, + "BurstCountLI", + write_pv="BurstCountLO", + name="burst_count", + kind=Kind.omitted, + doc="Number of bursts to trigger in burst mode. Must be >0.", + ) + burst_delay = Cpt( + EpicsSignal, + "BurstDelayAI", + write_pv="BurstDelayAO", + name="burst_delay", + kind=Kind.omitted, + doc="Delay before bursts start in seconds. Must be >=0.", + ) + burst_period = Cpt( + EpicsSignal, + "BurstPeriodAI", + write_pv="BurstPeriodAO", + name="burst_period", + kind=Kind.omitted, + doc="Period of the bursts in seconds. Must be >0.", + ) + + def proc_event_status(self) -> None: + """The reading must be manually triggered to update the event status.""" + self.state.proc_status.put(1) + + def wait_for_event_status( + self, value: STATUSBITS, timeout: float | None = None + ) -> StatusBitsCompareStatus: + """ + Wait for a specific event status. + + Args: + value (STATUSBITS): The status bits to wait for. + timeout (float): The maximum time to wait in seconds. + """ + return StatusBitsCompareStatus( + signal=self.state.event_status, value=value, timeout=timeout, run=True + ) + + def set_trigger(self, source: TRIGGERSOURCE | int) -> None: + """ + Set the trigger source. + + Args: + source (TriggerSource | int): The trigger source + INTERNAL = 0 + EXT_RISING_EDGE = 1 + EXT_FALLING_EDGE = 2 + SS_EXT_RISING_EDGE = 3 + SS_EXT_FALLING_EDGE = 4 + SINGLE_SHOT = 5 + LINE = 6 + """ + if isinstance(source, TRIGGERSOURCE): + self.trigger_source.set(source.value).wait(self._pv_timeout) + else: + self.trigger_source.set(int(source)).wait(self._pv_timeout) + + @typechecked + def burst_enable( + self, + count: int, + delay: float, + period: float, + config: Literal["all", "first"] | BURSTCONFIG = "first", + ) -> None: + """Enable burst mode with valid parameters. + + Args: + count (int): Number of bursts >0 + delay (float): Delay before bursts start in seconds >=0 + period (float): Period of the bursts in seconds >0 + config (str): Configuration of T0 duiring burst. + In addition, to simplify triggering of other instruments synchronously with the burst, + the T0 output may be configured to fire on the first delay cycle of the burst, + rather than for all delay cycles as is normally the case. BURSTCONFIG + """ + + # Check inputs first + if count <= 0: + raise ValueError(f"Count must be >0, provided: {count}") + if delay < 0: + raise ValueError(f"Delay must be >=0, provided: {delay}") + if period <= 0: + raise ValueError(f"Period must be >0, provided: {period}") + + self.burst_mode.set(1).wait(timeout=self._pv_timeout) + self.burst_count.set(count).wait(timeout=self._pv_timeout) + self.burst_delay.set(delay).wait(timeout=self._pv_timeout) + self.burst_period.set(period).wait(timeout=self._pv_timeout) + + if config == "all": + self.burst_config.set(BURSTCONFIG.ALL_CYCLES.value).wait(timeout=self._pv_timeout) + elif config == "first": + self.burst_config.set(BURSTCONFIG.FIRST_CYCLE.value).wait(timeout=self._pv_timeout) + + def burst_disable(self) -> None: + """Disable burst mode""" + self.burst_mode.set(0).wait(timeout=self._pv_timeout) + + @typechecked + def set_io_values( + self, + channel: AllChannelNames | list[AllChannelNames], + amplitude: float | None = None, + offset: float | None = None, + polarity: OUTPUTPOLARITY | Literal[0, 1] | None = None, + mode: Literal["ttl", "nim"] | None = None, + ) -> None: + """Set the IO values for the static pair. + + Args: + channel (str | list[str]): Channel(s) to set the IO values for. + Can be "t0", "ab", "cd", "ef", "gh" or a list of these. + If a list is provided, the same values will be set for all channels. + amplitude (float): Amplitude of the output signal in volts. + offset (float): Offset of the output signal in volts. + polarity (OUTPUTPOLARITY | int): Polarity of the output signal. + ttl_mode (bool): If True, set the output to TTL mode. + nim_mode (bool): If True, set the output to NIM mode. + If both ttl_mode and nim_mode are set to True, + a ValueError is raised. + """ + if isinstance(channel, str): + channel = [channel] + for ch in channel: + if ch == "t0": + io_channel = self.t0 + else: + io_channel = getattr(getattr(self, ch), "io") + if amplitude is not None: + io_channel.amplitude.set(amplitude).wait(timeout=self._pv_timeout) + if offset is not None: + io_channel.offset.set(offset).wait(timeout=self._pv_timeout) + if polarity is not None: + if isinstance(polarity, OUTPUTPOLARITY): + io_channel.polarity.set(polarity.value).wait(timeout=self._pv_timeout) + else: + io_channel.polarity.set(int(polarity)).wait(timeout=self._pv_timeout) + if mode == "ttl": + io_channel.ttl_mode.set(1).wait(timeout=self._pv_timeout) + if mode == "nim": + io_channel.nim_mode.set(1).wait(timeout=self._pv_timeout) + + def set_delay_pairs( + self, + channel: DelayChannelNames | list[DelayChannelNames], + delay: float | list[float] | None = None, + width: float | list[float] | None = None, + ) -> None: + """Set the delay and width for a specific channel pair. + + Args: + channel (str): Channel pair to set the delay and width for. + Can be "ab", "cd", "ef", "gh". + delay (float): Delay in seconds to set for the channel pair. + width (float): Width in seconds to set for the channel pair. + """ + if isinstance(channel, str): + channel = [channel] + if isinstance(delay, (float, int)): + delay = [float(delay)] * len(channel) + if isinstance(width, (float, int)): + width = [float(width)] * len(channel) + if delay is not None: + if len(delay) != len(channel): + raise ValueError( + f"Length of delay {len(delay)} must match length of channel {len(channel)}." + ) + for ii, ch in enumerate(channel): + delay_channel = getattr(self, ch) + delay_channel.delay.put(delay[ii]) + if width is not None: + if len(width) != len(channel): + raise ValueError( + f"Length of width {len(width)} must match length of channel {len(channel)}." + ) + for ii, ch in enumerate(channel): + delay_channel = getattr(self, ch) + delay_channel.width.put(width[ii]) + + def _get_literal_channel(self, channel: LiteralChannels) -> Channel: + return { + "A": self.ab.ch1, + "B": self.ab.ch2, + "C": self.cd.ch1, + "D": self.cd.ch2, + "E": self.ef.ch1, + "F": self.ef.ch2, + "G": self.gh.ch1, + "H": self.gh.ch2, + }[channel] + + def set_channel_reference(self, channel: LiteralChannels, reference_channel: CHANNELREFERENCE): + self._get_literal_channel(channel).reference.put(reference_channel.value) + + def set_references_for_channels( + self, channels_and_refs: list[tuple[LiteralChannels, CHANNELREFERENCE]] + ): + for ch, ref in channels_and_refs: + self.set_channel_reference(ch, ref) + + def stop_ddg(self) -> None: + """Stop the delay generator by setting the burst mode to 0""" + self.burst_mode.put(0) + + def reset_error(self) -> None: + """Reset the error status message of the delay generator.""" + self.status_msg_clear.put(1) + + def get_error_msg(self) -> str: + """Get the error message from the delay generator.""" + msg = self.status_msg.get() + if msg in ERROR_CODES: + return ERROR_CODES[msg] + else: + return f"Unknown error code: {msg}" + + +if __name__ == "__main__": + ddg = DelayGeneratorCSAXS(name="ddg", prefix="X12SA-CPCL-DDG1:") + ddg.wait_for_connection(all_signals=True, timeout=30) + ddg.summary() diff --git a/csaxs_bec/devices/epics/delay_generator_csaxs/error_registry.py b/csaxs_bec/devices/epics/delay_generator_csaxs/error_registry.py new file mode 100644 index 0000000..93d3874 --- /dev/null +++ b/csaxs_bec/devices/epics/delay_generator_csaxs/error_registry.py @@ -0,0 +1,73 @@ +ERROR_CODES: dict[str, str] = { + "STATUS OK": "No more errors left in the queue.", # renamed apparently from the IOC for 'No Error' to 'STATUS OK' + "Illegal Value": "A parameter was out of range.", + "Illegal Mode": "The action is illegal in the current mode.", + "Illegal Delay": "The requested delay is out of range.", + "Illegal Link": "The requested delay linkage is illegal.", + "Recall Failed": "Recall of instrument settings failed; settings were invalid.", + "Not Allowed": "Action not allowed: instrument is locked by another interface.", + "Failed Self Test": "The DG645 self test failed.", + "Failed Auto Calibration": "The DG645 auto calibration failed.", + "Lost Data": "Output buffer overflow or data lost due to communication error.", + "No Listener": "No GPIB listeners; pending output discarded.", + "Failed ROM Check": "ROM checksum failed; firmware likely corrupted.", + "Failed Offset T0 Test": "Self test of offset functionality for T0 failed.", + "Failed Offset AB Test": "Self test of offset functionality for AB failed.", + "Failed Offset CD Test": "Self test of offset functionality for CD failed.", + "Failed Offset EF Test": "Self test of offset functionality for EF failed.", + "Failed Offset GH Test": "Self test of offset functionality for GH failed.", + "Failed Amplitude T0 Test": "Self test of amplitude functionality for T0 failed.", + "Failed Amplitude AB Test": "Self test of amplitude functionality for AB failed.", + "Failed Amplitude CD Test": "Self test of amplitude functionality for CD failed.", + "Failed Amplitude EF Test": "Self test of amplitude functionality for EF failed.", + "Failed Amplitude GH Test": "Self test of amplitude functionality for GH failed.", + "Failed FPGA Communications Test": "Self test of FPGA communications failed.", + "Failed GPIB Communications Test": "Self test of GPIB communications failed.", + "Failed DDS Communications Test": "Self test of DDS communications failed.", + "Failed Serial EEPROM Communications Test": "Self test of serial EEPROM failed.", + "Failed Temperature Sensor Communications Test": "Temp sensor communication failed.", + "Failed PLL Communications Test": "PLL communication self test failed.", + "Failed DAC 0 Communications Test": "Self test of DAC 0 failed.", + "Failed DAC 1 Communications Test": "Self test of DAC 1 failed.", + "Failed DAC 2 Communications Test": "Self test of DAC 2 failed.", + "Failed Sample and Hold Operations Test": "Sample and hold self test failed.", + "Failed Vjitter Operations Test": "Vjitter operation self test failed.", + "Failed Channel T0 Analog Delay Test": "Analog delay test for T0 failed.", + "Failed Channel T1 Analog Delay Test": "Analog delay test for T1 failed.", + "Failed Channel A Analog Delay Test": "Analog delay test for A failed.", + "Failed Channel B Analog Delay Test": "Analog delay test for B failed.", + "Failed Channel C Analog Delay Test": "Analog delay test for C failed.", + "Failed Channel D Analog Delay Test": "Analog delay test for D failed.", + "Failed Channel E Analog Delay Test": "Analog delay test for E failed.", + "Failed Channel F Analog Delay Test": "Analog delay test for F failed.", + "Failed Channel G Analog Delay Test": "Analog delay test for G failed.", + "Failed Channel H Analog Delay Test": "Analog delay test for H failed.", + "Failed Sample and Hold Calibration": "Auto calibration of sample and hold failed.", + "Failed T0 Calibration": "Auto calibration of channel T0 failed.", + "Failed T1 Calibration": "Auto calibration of channel T1 failed.", + "Failed A Calibration": "Auto calibration of channel A failed.", + "Failed B Calibration": "Auto calibration of channel B failed.", + "Failed C Calibration": "Auto calibration of channel C failed.", + "Failed D Calibration": "Auto calibration of channel D failed.", + "Failed E Calibration": "Auto calibration of channel E failed.", + "Failed F Calibration": "Auto calibration of channel F failed.", + "Failed G Calibration": "Auto calibration of channel G failed.", + "Failed H Calibration": "Auto calibration of channel H failed.", + "Failed Vjitter Calibration": "Auto calibration of Vjitter failed.", + "Illegal Command": "The command syntax used was illegal.", + "Undefined Command": "The specified command does not exist.", + "Illegal Query": "The specified command does not permit queries.", + "Illegal Set": "The specified command can only be queried.", + "Null Parameter": "The parser detected an empty parameter.", + "Extra Parameters": "Too many parameters were provided.", + "Missing Parameters": "Some required parameters are missing.", + "Parameter Overflow": "Buffer overflow while parsing parameters.", + "Invalid Floating Point Number": "Expected a float but couldn't parse it.", + "Invalid Integer": "Expected an integer but couldn't parse it.", + "Integer Overflow": "Parsed integer is too large.", + "Invalid Hexadecimal": "Failed to parse expected hexadecimal input.", + "Syntax Error": "The parser detected a syntax error.", + "Communication Error": "Framing or parity error detected.", + "Over run": "Input buffer overflowed.", + "Too Many Errors": "Error buffer is full; some errors dropped.", +} diff --git a/csaxs_bec/devices/epics/delay_generator_csaxs/trigger_scheme_ddg1_ddg2.pdf b/csaxs_bec/devices/epics/delay_generator_csaxs/trigger_scheme_ddg1_ddg2.pdf new file mode 100644 index 0000000..02c2784 Binary files /dev/null and b/csaxs_bec/devices/epics/delay_generator_csaxs/trigger_scheme_ddg1_ddg2.pdf differ diff --git a/tests/tests_devices/test_delay_generator_csaxs.py b/tests/tests_devices/test_delay_generator_csaxs.py index 10005f1..0d6e5b4 100644 --- a/tests/tests_devices/test_delay_generator_csaxs.py +++ b/tests/tests_devices/test_delay_generator_csaxs.py @@ -1,276 +1,289 @@ # pylint: skip-file +import threading +from typing import Generator from unittest import mock +import numpy as np +import ophyd import pytest -from ophyd_devices.devices.delay_generator_645 import TriggerSource +from ophyd_devices.tests.utils import MockPV, patch_dual_pvs -from csaxs_bec.devices.epics.delay_generator_csaxs import DDGSetup +from csaxs_bec.devices.epics.delay_generator_csaxs import DDG1, DDG2 +from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import ( + BURSTCONFIG, + CHANNELREFERENCE, + STATUSBITS, + TRIGGERSOURCE, + DelayGeneratorCSAXS, +) @pytest.fixture(scope="function") -def mock_DDGSetup(): - mock_ddg = mock.MagicMock() - yield DDGSetup(parent=mock_ddg) +def mock_ddg1() -> Generator[DDG1, DDG1, DDG1]: + """Fixture to mock the DDG1 device.""" + name = "ddg1" + prefix = "test_ddg1:" + with mock.patch.object(ophyd, "cl") as mock_cl: + mock_cl.get_pv = MockPV + mock_cl.thread_class = threading.Thread + dev = DDG1(name=name, prefix=prefix) + patch_dual_pvs(dev) + yield dev -# Fixture for scaninfo -@pytest.fixture( - params=[ - { - "scan_id": "1234", - "scan_type": "step", - "num_points": 500, - "frames_per_trigger": 1, - "exp_time": 0.1, - "readout_time": 0.1, - }, - { - "scan_id": "1234", - "scan_type": "step", - "num_points": 500, - "frames_per_trigger": 5, - "exp_time": 0.01, - "readout_time": 0, - }, - { - "scan_id": "1234", - "scan_type": "fly", - "num_points": 500, - "frames_per_trigger": 1, - "exp_time": 1, - "readout_time": 0.2, - }, - { - "scan_id": "1234", - "scan_type": "fly", - "num_points": 500, - "frames_per_trigger": 5, - "exp_time": 0.1, - "readout_time": 0.4, - }, - ] -) -def scaninfo(request): - return request.param +@pytest.fixture(scope="function") +def mock_ddg2() -> Generator[DDG2, DDG2, DDG2]: + """Fixture to mock the DDG1 device.""" + name = "ddg2" + prefix = "test_ddg2:" + with mock.patch.object(ophyd, "cl") as mock_cl: + mock_cl.get_pv = MockPV + mock_cl.thread_class = threading.Thread + dev = DDG2(name=name, prefix=prefix) + patch_dual_pvs(dev) + yield dev -# Fixture for DDG config default values -@pytest.fixture( - params=[ - { - "delay_burst": 0.0, - "delta_width": 0.0, - "additional_triggers": 0, - "polarity": [0, 0, 0, 0, 0], - "amplitude": 0.0, - "offset": 0.0, - "thres_trig_level": 0.0, - }, - { - "delay_burst": 0.1, - "delta_width": 0.1, - "additional_triggers": 1, - "polarity": [0, 0, 1, 0, 0], - "amplitude": 5, - "offset": 0.0, - "thres_trig_level": 2.5, - }, - ] -) -def ddg_config_defaults(request): - return request.param +@pytest.fixture(scope="function") +def mock_ddg() -> Generator[DelayGeneratorCSAXS, DelayGeneratorCSAXS, DelayGeneratorCSAXS]: + """Fixture to mock the camera device.""" + name = "ddg" + prefix = "test:" + with mock.patch.object(ophyd, "cl") as mock_cl: + mock_cl.get_pv = MockPV + mock_cl.thread_class = threading.Thread + dev = DelayGeneratorCSAXS(name=name, prefix=prefix) + patch_dual_pvs(dev) + yield dev -# Fixture for DDG config scan values -@pytest.fixture( - params=[ - { - "fixed_ttl_width": [0, 0, 0, 0, 0], - "trigger_width": None, - "set_high_on_exposure": False, - "set_high_on_stage": False, - "set_trigger_source": "SINGLE_SHOT", - "premove_trigger": False, - }, - { - "fixed_ttl_width": [0, 0, 0, 0, 0], - "trigger_width": 0.1, - "set_high_on_exposure": True, - "set_high_on_stage": False, - "set_trigger_source": "SINGLE_SHOT", - "premove_trigger": True, - }, - { - "fixed_ttl_width": [0, 0, 0, 0, 0], - "trigger_width": 0.1, - "set_high_on_exposure": False, - "set_high_on_stage": False, - "set_trigger_source": "EXT_RISING_EDGE", - "premove_trigger": False, - }, - ] -) -def ddg_config_scan(request): - return request.param +def test_ddg_init(mock_ddg): + """Test the proc event status method.""" + assert mock_ddg.name == "ddg" + assert mock_ddg.prefix == "test:" -# Fixture for delay pairs -@pytest.fixture( - params=[ - {"all_channels": ["channelAB", "channelCD"], "all_delay_pairs": ["AB", "CD"]}, - {"all_channels": [], "all_delay_pairs": []}, - {"all_channels": ["channelT0", "channelAB", "channelCD"], "all_delay_pairs": ["AB", "CD"]}, - ] -) -def channel_pairs(request): - return request.param +def test_ddg_proc_event_status(mock_ddg): + """Test the proc event status method.""" + mock_ddg.state.proc_status.put(0) + mock_ddg.proc_event_status() + assert mock_ddg.state.proc_status.get() == 1 -def test_on_pre_scan(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan): - """Test the check_scan_id method.""" - # Set first attributes of parent class - for k, v in scaninfo.items(): - setattr(mock_DDGSetup.parent.scaninfo, k, v) - for k, v in ddg_config_defaults.items(): - getattr(mock_DDGSetup.parent, k).get.return_value = v - for k, v in ddg_config_scan.items(): - getattr(mock_DDGSetup.parent, k).get.return_value = v - # Call the function you want to test - mock_DDGSetup.on_pre_scan() - if ddg_config_scan["premove_trigger"]: - mock_DDGSetup.parent.trigger_shot.put.assert_called_once_with(1) +def test_ddg_set_trigger(mock_ddg): + """Test setting the trigger.""" + for trigger in TRIGGERSOURCE: + mock_ddg.set_trigger(trigger) + assert mock_ddg.trigger_source.get() == trigger.value -# TODO put back once the logic is implemented -# @pytest.mark.parametrize("source", ["SINGLE_SHOT", "EXT_RISING_EDGE"]) -# def test_on_trigger(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan, source): -# """Test the on_trigger method.""" -# # Set first attributes of parent class -# for k, v in scaninfo.items(): -# setattr(mock_DDGSetup.parent.scaninfo, k, v) -# for k, v in ddg_config_defaults.items(): -# getattr(mock_DDGSetup.parent, k).get.return_value = v -# for k, v in ddg_config_scan.items(): -# getattr(mock_DDGSetup.parent, k).get.return_value = v -# # Call the function you want to test -# mock_DDGSetup.parent.source.name = "source" -# mock_DDGSetup.parent.source.read.return_value = { -# mock_DDGSetup.parent.source.name: {"value": getattr(TriggerSource, source)} -# } -# mock_DDGSetup.on_trigger() -# if source == "SINGLE_SHOT": -# mock_DDGSetup.parent.trigger_shot.put.assert_called_once_with(1) +def test_ddg_burst_enable(mock_ddg): + """Test enabling burst mode.""" + mock_ddg.burst_enable(count=100, delay=0.1, period=0.02, config=BURSTCONFIG.ALL_CYCLES) + mock_ddg.burst_mode.get() == 1 + assert mock_ddg.burst_count.get() == 100 + assert mock_ddg.burst_delay.get() == 0.1 + assert mock_ddg.burst_period.get() == 0.02 + assert mock_ddg.burst_config.get() == BURSTCONFIG.ALL_CYCLES.value + assert mock_ddg.burst_mode.get() == 1 + # Count is 0 + with pytest.raises(ValueError): + mock_ddg.burst_enable(count=0, delay=0.1, period=0.02, config=BURSTCONFIG.ALL_CYCLES) + # delay is negative + with pytest.raises(ValueError): + mock_ddg.burst_enable(count=100, delay=-0.1, period=0.02, config=BURSTCONFIG.ALL_CYCLES) + # period is zero + with pytest.raises(ValueError): + mock_ddg.burst_enable(count=100, delay=0.1, period=0, config=BURSTCONFIG.ALL_CYCLES) + + # Works with default config + mock_ddg.burst_enable(count=100, delay=0.1, period=0.02) + mock_ddg.burst_mode.get() == BURSTCONFIG.FIRST_CYCLE.value -def test_on_wait_for_connection( - mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan, channel_pairs -): - """Test the initialize_default_parameter method.""" - # Set first attributes of parent class - for k, v in scaninfo.items(): - setattr(mock_DDGSetup.parent.scaninfo, k, v) - for k, v in ddg_config_defaults.items(): - getattr(mock_DDGSetup.parent, k).get.return_value = v - for k, v in ddg_config_scan.items(): - getattr(mock_DDGSetup.parent, k).get.return_value = v - # Call the function you want to test - mock_DDGSetup.parent.all_channels = channel_pairs["all_channels"] - mock_DDGSetup.parent.all_delay_pairs = channel_pairs["all_delay_pairs"] - calls = [] - calls.extend( - [ - mock.call("polarity", ddg_config_defaults["polarity"][ii], [channel]) - for ii, channel in enumerate(channel_pairs["all_channels"]) - ] +def test_ddg_wait_for_event_status(mock_ddg): + """Test setting wait for event status.""" + mock_ddg: DelayGeneratorCSAXS + mock_ddg.state.event_status._read_pv.mock_data = 0 + status = mock_ddg.wait_for_event_status(value=STATUSBITS.END_OF_BURST) # 8 + assert status.done is False + mock_ddg.state.event_status._read_pv.mock_data = 1 + assert status.done is False + mock_ddg.state.event_status._read_pv.mock_data = 4 + assert status.done is False + # TODO enable once callback for MockPV is implemented + # mock_ddg.state.event_status._read_pv.mock_data = 13 # 8 + 4 + 1 + # status.wait(timeout=1) # Wait for the status to be done + # assert status.done is True + + +def test_ddg_set_io_values(mock_ddg): + """Test setting IO values.""" + mock_ddg.set_io_values(channel="ab", amplitude=3, offset=2, polarity=1, mode="ttl") + assert mock_ddg.ab.io.amplitude.get() == 3 + assert mock_ddg.ab.io.offset.get() == 2 + assert mock_ddg.ab.io.polarity.get() == 1 + assert mock_ddg.ab.io.ttl_mode.get() == 1 + # List of channels + channels = ["ab", "cd", "t0"] + mock_ddg.set_io_values(channel=channels, amplitude=3, offset=2, polarity=1, mode="nim") + for channel in channels: + if channel == "t0": + attr = getattr(mock_ddg, channel) + else: + attr = getattr(mock_ddg, channel).io + assert attr.amplitude.get() == 3 + assert attr.offset.get() == 2 + assert attr.polarity.get() == 1 + assert attr.nim_mode.get() == 1 + + +def test_ddg_set_delay_pairs(mock_ddg): + """Test setting delay pairs.""" + mock_ddg.set_delay_pairs(channel="ab", delay=0.1, width=0.2) + assert np.isclose(mock_ddg.ab.delay.get(), 0.1) + assert np.isclose(mock_ddg.ab.width.get(), 0.2) + assert np.isclose(mock_ddg.ab.ch1.setpoint.get(), 0.1) + assert np.isclose(mock_ddg.ab.ch2.setpoint.get(), 0.3) + # List of channels + channels = ["ab", "cd", "ef", "gh"] + delays = [0.1, 0.2, 0.4, 0.5] + mock_ddg.set_delay_pairs(channel=channels, delay=delays, width=0.2) + for delay, channel in zip(delays, channels): + assert np.isclose(getattr(mock_ddg, channel).delay.get(), delay) + assert np.isclose(getattr(mock_ddg, channel).width.get(), 0.2) + assert np.isclose(getattr(mock_ddg, channel).ch1.setpoint.get(), delay) + assert np.isclose(getattr(mock_ddg, channel).ch2.setpoint.get(), delay + 0.2) + + +def test_ddg1_on_connected(mock_ddg1): + """Test the on_connected method of DDG1.""" + mock_ddg1.on_connected() + # IO defaults + assert mock_ddg1.burst_mode.get() == 0 + assert mock_ddg1.ab.io.amplitude.get() == 5.0 + assert mock_ddg1.cd.io.offset.get() == 0.0 + assert mock_ddg1.ef.io.polarity.get() == 1 + assert mock_ddg1.gh.io.ttl_mode.get() == 1 + + # reference defaults + assert mock_ddg1.ab.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value + assert mock_ddg1.ab.ch2.reference.get() == 1 # CHANNELREFERENCE.A.value + assert mock_ddg1.cd.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value + assert mock_ddg1.cd.ch2.reference.get() == 3 # CHANNELREFERENCE.C.value + assert mock_ddg1.ef.ch1.reference.get() == 4 # CHANNELREFERENCE.D.value + assert mock_ddg1.ef.ch2.reference.get() == 5 # CHANNELREFERENCE.E.value + assert mock_ddg1.gh.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value + assert mock_ddg1.gh.ch2.reference.get() == 7 # CHANNELREFERENCE.G.value + + # Default trigger source + assert mock_ddg1.trigger_source.get() == 5 # TRIGGERSOURCE.SINGLE_SHOT.value + + +def test_ddg1_stage(mock_ddg1): + """Test the on_stage method of DDG1.""" + exp_time = 0.1 + frames_per_trigger = 10 + + mock_ddg1.burst_mode.put(1) + mock_ddg1.scan_info.msg.scan_parameters["exp_time"] = exp_time + mock_ddg1.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger + + mock_ddg1.stage() + + assert np.isclose(mock_ddg1.burst_mode.get(), 0) # Burst mode is disabled + # Trigger DDG2 through EXT/EN + + assert np.isclose(mock_ddg1.ab.delay.get(), 2e-3) + assert np.isclose(mock_ddg1.ab.width.get(), 1e-6) + # Shutter channel cd + assert np.isclose(mock_ddg1.cd.delay.get(), 0) + assert np.isclose(mock_ddg1.cd.width.get(), 2e-3 + exp_time * frames_per_trigger + 1e-3) + # MCS channel ef or gate + assert np.isclose(mock_ddg1.ef.delay.get(), 0) + assert np.isclose(mock_ddg1.ef.width.get(), 1e-6) + + assert mock_ddg1.staged == ophyd.Staged.yes + + +def test_ddg1_trigger(mock_ddg1): + """Test the on_trigger method of DDG1.""" + mock_ddg1.state.event_status._read_pv.mock_data = ( + 5 # STATUSBITS.END_OF_DELAY.value + STATUSBITS.TRIG.value ) - calls.extend([mock.call("amplitude", ddg_config_defaults["amplitude"])]) - calls.extend([mock.call("offset", ddg_config_defaults["offset"])]) - calls.extend( - [ - mock.call( - "reference", 0, [f"channel{pair}.ch1" for pair in channel_pairs["all_delay_pairs"]] - ) - ] - ) - calls.extend( - [ - mock.call( - "reference", 0, [f"channel{pair}.ch2" for pair in channel_pairs["all_delay_pairs"]] - ) - ] - ) - mock_DDGSetup.on_wait_for_connection() - mock_DDGSetup.parent.set_channels.assert_has_calls(calls) + status = mock_ddg1.trigger() + assert status.done is True + assert status.success is True + assert mock_ddg1.trigger_shot.get() == 1 -# TODO put back once the logic is implemented -# def test_on_stage(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan, channel_pairs): -# """Test the prepare_ddg method.""" -# # Set first attributes of parent class -# for k, v in scaninfo.items(): -# setattr(mock_DDGSetup.parent.scaninfo, k, v) -# for k, v in ddg_config_defaults.items(): -# getattr(mock_DDGSetup.parent, k).get.return_value = v -# for k, v in ddg_config_scan.items(): -# getattr(mock_DDGSetup.parent, k).get.return_value = v -# # Call the function you want to test -# mock_DDGSetup.parent.all_channels = channel_pairs["all_channels"] -# mock_DDGSetup.parent.all_delay_pairs = channel_pairs["all_delay_pairs"] +def test_ddg1_stop(mock_ddg1): + """Test the on_stop method of DDG1.""" + mock_ddg1.burst_mode.put(1) # Enable burst mode + mock_ddg1.stop() + assert mock_ddg1.burst_mode.get() == 0 # Burst mode is disabled -# mock_DDGSetup.prepare_ddg() -# mock_DDGSetup.parent.set_trigger.assert_called_once_with( -# getattr(TriggerSource, ddg_config_scan["set_trigger_source"]) -# ) -# if scaninfo["scan_type"] == "step": -# if ddg_config_scan["set_high_on_exposure"]: -# num_burst_cycle = 1 + ddg_config_defaults["additional_triggers"] -# exp_time = ddg_config_defaults["delta_width"] + scaninfo["frames_per_trigger"] * ( -# scaninfo["exp_time"] + scaninfo["readout_time"] -# ) -# total_exposure = exp_time -# delay_burst = ddg_config_defaults["delay_burst"] -# else: -# exp_time = ddg_config_defaults["delta_width"] + scaninfo["exp_time"] -# total_exposure = exp_time + scaninfo["readout_time"] -# delay_burst = ddg_config_defaults["delay_burst"] -# num_burst_cycle = ( -# scaninfo["frames_per_trigger"] + ddg_config_defaults["additional_triggers"] -# ) -# elif scaninfo["scan_type"] == "fly": -# if ddg_config_scan["set_high_on_exposure"]: -# num_burst_cycle = 1 + ddg_config_defaults["additional_triggers"] -# exp_time = ( -# ddg_config_defaults["delta_width"] -# + scaninfo["num_points"] * scaninfo["exp_time"] -# + (scaninfo["num_points"] - 1) * scaninfo["readout_time"] -# ) -# total_exposure = exp_time -# delay_burst = ddg_config_defaults["delay_burst"] -# else: -# exp_time = ddg_config_defaults["delta_width"] + scaninfo["exp_time"] -# total_exposure = exp_time + scaninfo["readout_time"] -# delay_burst = ddg_config_defaults["delay_burst"] -# num_burst_cycle = scaninfo["num_points"] + ddg_config_defaults["additional_triggers"] -# # mock_DDGSetup.parent.burst_enable.assert_called_once_with( -# # mock.call(num_burst_cycle, delay_burst, total_exposure, config="first") -# # ) -# mock_DDGSetup.parent.burst_enable.assert_called_once_with( -# num_burst_cycle, delay_burst, total_exposure, config="first" -# ) -# if not ddg_config_scan["trigger_width"]: -# call = mock.call("width", exp_time) -# assert call in mock_DDGSetup.parent.set_channels.mock_calls -# else: -# call = mock.call("width", ddg_config_scan["trigger_width"]) -# assert call in mock_DDGSetup.parent.set_channels.mock_calls -# if ddg_config_scan["set_high_on_exposure"]: -# calls = [ -# mock.call("width", value, channels=[channel]) -# for value, channel in zip( -# ddg_config_scan["fixed_ttl_width"], channel_pairs["all_channels"] -# ) -# if value != 0 -# ] -# if calls: -# assert all(calls in mock_DDGSetup.parent.set_channels.mock_calls) +def test_ddg2_on_connected(mock_ddg2): + """Test on connected method of DDG2.""" + mock_ddg2.on_connected() + # IO defaults + assert mock_ddg2.burst_mode.get() == 0 + assert mock_ddg2.ab.io.amplitude.get() == 5.0 + assert mock_ddg2.cd.io.offset.get() == 0.0 + assert mock_ddg2.ef.io.polarity.get() == 1 + assert mock_ddg2.gh.io.ttl_mode.get() == 1 + + # reference defaults + assert mock_ddg2.ab.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value + assert mock_ddg2.ab.ch2.reference.get() == 1 # CHANNELREFERENCE.A.value + assert mock_ddg2.cd.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value + assert mock_ddg2.cd.ch2.reference.get() == 3 # CHANNELREFERENCE.C.value + assert mock_ddg2.ef.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value + assert mock_ddg2.ef.ch2.reference.get() == 5 # CHANNELREFERENCE.E.value + assert mock_ddg2.gh.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value + assert mock_ddg2.gh.ch2.reference.get() == 7 # CHANNELREFERENCE.G.value + + # Default trigger source + assert mock_ddg2.trigger_source.get() == 1 # TRIGGERSOURCE.EXT_RISING_EDGE.value + + +def test_ddg2_stage(mock_ddg2): + """Test the on_stage method of DDG2.""" + exp_time = 0.1 + frames_per_trigger = 10 + mock_ddg2.on_connected() + + mock_ddg2.burst_mode.put(0) + mock_ddg2.scan_info.msg.scan_parameters["exp_time"] = exp_time + mock_ddg2.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger + + mock_ddg2.stage() + + assert np.isclose(mock_ddg2.burst_mode.get(), 1) # Burst mode is enabled + assert np.isclose(mock_ddg2.ab.delay.get(), 0) + assert np.isclose(mock_ddg2.ab.width.get(), exp_time - 2e-4) # DEFAULT_READOUT_TIMES["ab"]) + assert mock_ddg2.burst_count.get() == frames_per_trigger + assert np.isclose(mock_ddg2.burst_delay.get(), 0) + assert np.isclose(mock_ddg2.burst_period.get(), exp_time) + + assert mock_ddg2.trigger_source.get() == TRIGGERSOURCE.EXT_RISING_EDGE.value + + assert mock_ddg2.staged == ophyd.Staged.yes + + +def test_ddg2_trigger(mock_ddg2): + """Test the on_trigger method of DDG2.""" + mock_ddg2.trigger_shot.put(0) + status = mock_ddg2.trigger() + assert mock_ddg2.trigger_shot.get() == 0 # Should not trigger DDG2 via soft trigger + status.wait() + assert status.done is True + assert status.success is True + + +def test_ddg2_stop(mock_ddg2): + """Test the on_stop method of DDG2.""" + mock_ddg2.burst_mode.put(1) # Enable burst mode + mock_ddg2.stop() + assert mock_ddg2.burst_mode.get() == 0 # Burst mode is disabled