refactor(ddg): refactor delay generator for cSAXS #82
@@ -53,89 +53,6 @@ eiger9m:
|
||||
enabled: true
|
||||
readoutPriority: async
|
||||
softwareTrigger: false
|
||||
ddg_detectors:
|
||||
description: DelayGenerator for detector triggering
|
||||
deviceClass: csaxs_bec.devices.epics.delay_generator_csaxs.DelayGeneratorcSAXS
|
||||
deviceConfig:
|
||||
prefix: 'delaygen:DG1:'
|
||||
ddg_config:
|
||||
delay_burst: 40.e-3
|
||||
delta_width: 0
|
||||
additional_triggers: 0
|
||||
polarity:
|
||||
- 1 # T0 -> DDG MCS
|
||||
- 0 # eiger
|
||||
- 1 # falcon
|
||||
- 1
|
||||
- 1
|
||||
amplitude: 4.5
|
||||
offset: 0
|
||||
thres_trig_level: 2.5
|
||||
set_high_on_exposure: False
|
||||
set_high_on_stage: False
|
||||
deviceTags:
|
||||
- cSAXS
|
||||
- ddg_detectors
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readoutPriority: async
|
||||
softwareTrigger: false
|
||||
ddg_mcs:
|
||||
description: DelayGenerator for mcs triggering
|
||||
deviceClass: csaxs_bec.devices.epics.delay_generator_csaxs.DelayGeneratorcSAXS
|
||||
deviceConfig:
|
||||
prefix: 'delaygen:DG2:'
|
||||
ddg_config:
|
||||
delay_burst: 0
|
||||
delta_width: 0
|
||||
additional_triggers: 1
|
||||
polarity:
|
||||
- 1
|
||||
- 0
|
||||
- 1
|
||||
- 1
|
||||
- 1
|
||||
amplitude: 4.5
|
||||
offset: 0
|
||||
thres_trig_level: 2.5
|
||||
set_high_on_exposure: False
|
||||
set_high_on_stage: False
|
||||
set_trigger_source: EXT_RISING_EDGE
|
||||
trigger_width: 3.e-3
|
||||
deviceTags:
|
||||
- cSAXS
|
||||
- ddg_mcs
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readoutPriority: async
|
||||
softwareTrigger: false
|
||||
ddg_fsh:
|
||||
description: DelayGenerator for fast shutter control
|
||||
deviceClass: csaxs_bec.devices.epics.delay_generator_csaxs.DelayGeneratorcSAXS
|
||||
deviceConfig:
|
||||
prefix: 'delaygen:DG3:'
|
||||
ddg_config:
|
||||
delay_burst: 0
|
||||
delta_width: 80.e-3
|
||||
additional_triggers: 0
|
||||
polarity:
|
||||
- 1
|
||||
- 1
|
||||
- 1
|
||||
- 1
|
||||
- 1
|
||||
amplitude: 4.5
|
||||
offset: 0
|
||||
thres_trig_level: 2.5
|
||||
set_high_on_exposure: True
|
||||
set_high_on_stage: False
|
||||
deviceTags:
|
||||
- cSAXS
|
||||
- ddg_fsh
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readoutPriority: async
|
||||
softwareTrigger: false
|
||||
falcon:
|
||||
description: Falcon detector x-ray fluoresence
|
||||
deviceClass: csaxs_bec.devices.epics.falcon_csaxs.FalconcSAXS
|
||||
|
||||
45
csaxs_bec/device_configs/ddg_test.yaml
Normal file
45
csaxs_bec/device_configs/ddg_test.yaml
Normal file
@@ -0,0 +1,45 @@
|
||||
ddg1:
|
||||
description: Main delay Generator for triggering
|
||||
deviceClass: csaxs_bec.devices.epics.delay_generator_csaxs.DDG1
|
||||
enabled: true
|
||||
deviceConfig:
|
||||
prefix: 'X12SA-CPCL-DDG1:'
|
||||
onFailure: raise
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
softwareTrigger: true
|
||||
|
||||
ddg2:
|
||||
description: Detector delay Generator for trigger burst
|
||||
deviceClass: csaxs_bec.devices.epics.delay_generator_csaxs.DDG2
|
||||
enabled: true
|
||||
deviceConfig:
|
||||
prefix: 'X12SA-CPCL-DDG2:'
|
||||
onFailure: raise
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
softwareTrigger: false
|
||||
|
||||
samx:
|
||||
readoutPriority: baseline
|
||||
deviceClass: ophyd_devices.SimPositioner
|
||||
deviceConfig:
|
||||
delay: 1
|
||||
limits:
|
||||
- -50
|
||||
- 50
|
||||
tolerance: 0.01
|
||||
update_frequency: 400
|
||||
deviceTags:
|
||||
- user motors
|
||||
enabled: true
|
||||
readOnly: false
|
||||
|
||||
bpm4i:
|
||||
readoutPriority: monitored
|
||||
deviceClass: ophyd_devices.SimMonitor
|
||||
deviceConfig:
|
||||
deviceTags:
|
||||
- beamline
|
||||
enabled: true
|
||||
readOnly: false
|
||||
@@ -1,50 +0,0 @@
|
||||
ddg_detectors:
|
||||
description: DelayGenerator for detector triggering
|
||||
deviceClass: csaxs_bec.devices.epics.delay_generator_csaxs.DelayGeneratorcSAXS
|
||||
deviceConfig:
|
||||
prefix: 'X12SA-CPCL-DDG3:'
|
||||
ddg_config:
|
||||
delay_burst: 40.e-3
|
||||
delta_width: 0
|
||||
additional_triggers: 0
|
||||
polarity:
|
||||
- 1 # T0 -> DDG MCS
|
||||
- 0 # eiger
|
||||
- 1 # falcon
|
||||
- 1
|
||||
- 1
|
||||
amplitude: 4.5
|
||||
offset: 0
|
||||
thres_trig_level: 2.5
|
||||
set_high_on_exposure: False
|
||||
set_high_on_stage: False
|
||||
deviceTags:
|
||||
- cSAXS
|
||||
- ddg_detectors
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readoutPriority: async
|
||||
softwareTrigger: True
|
||||
bpm4i:
|
||||
readoutPriority: monitored
|
||||
deviceClass: ophyd_devices.SimMonitor
|
||||
deviceConfig:
|
||||
deviceTags:
|
||||
- beamline
|
||||
enabled: true
|
||||
readOnly: false
|
||||
samx:
|
||||
readoutPriority: baseline
|
||||
deviceClass: ophyd_devices.SimPositioner
|
||||
deviceConfig:
|
||||
delay: 1
|
||||
limits:
|
||||
- -50
|
||||
- 50
|
||||
tolerance: 0.01
|
||||
update_frequency: 400
|
||||
deviceTags:
|
||||
- user motors
|
||||
enabled: true
|
||||
readOnly: false
|
||||
|
||||
@@ -1,274 +0,0 @@
|
||||
from bec_lib import bec_logger
|
||||
from ophyd import Component, DeviceStatus, Kind
|
||||
from ophyd_devices.devices.delay_generator_645 import DelayGenerator, TriggerSource
|
||||
from ophyd_devices.interfaces.base_classes.bec_device_base import BECDeviceBase, CustomPrepare
|
||||
from ophyd_devices.sim.sim_signals import SetableSignal
|
||||
from ophyd_devices.utils import bec_utils
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class DelayGeneratorcSAXSError(Exception):
|
||||
"""Exception raised for errors."""
|
||||
|
||||
|
||||
class DDGSetup(CustomPrepare["DelayGeneratorcSAXS"]):
|
||||
"""
|
||||
Custom Prepare class with hooks for beamline specific logic for the DG645 at CSAXS
|
||||
"""
|
||||
|
||||
def on_wait_for_connection(self) -> None:
|
||||
"""Init default parameter after the all signals are connected"""
|
||||
for ii, channel in enumerate(self.parent.all_channels):
|
||||
self.parent.set_channels("polarity", self.parent.polarity.get()[ii], [channel])
|
||||
|
||||
self.parent.set_channels("amplitude", self.parent.amplitude.get())
|
||||
self.parent.set_channels("offset", self.parent.offset.get())
|
||||
# Setup reference
|
||||
self.parent.set_channels(
|
||||
"reference", 0, [f"channel{pair}.ch1" for pair in self.parent.all_delay_pairs]
|
||||
)
|
||||
self.parent.set_channels(
|
||||
"reference", 0, [f"channel{pair}.ch2" for pair in self.parent.all_delay_pairs]
|
||||
)
|
||||
self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get()))
|
||||
# Set threshold level for ext. pulses
|
||||
self.parent.level.put(self.parent.thres_trig_level.get())
|
||||
|
||||
def on_stage(self) -> None:
|
||||
"Hook execute before the scan starts"
|
||||
if self.parent.scaninfo.scan_type == "step":
|
||||
exp_time = self.parent.scaninfo.exp_time
|
||||
delay = 0
|
||||
self.parent.burst_disable()
|
||||
self.parent.set_trigger(TriggerSource.SINGLE_SHOT)
|
||||
self.parent.set_channels(signal="width", value=exp_time)
|
||||
self.parent.set_channels(signal="delay", value=delay)
|
||||
return
|
||||
scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
|
||||
if scan_name == "jjf_test":
|
||||
# TODO implement the logic for JJF triggering
|
||||
exp_time = 480e-6 # self.parent.scaninfo.exp_time
|
||||
readout = 20e-6 # self.parent.scaninfo.readout_time
|
||||
total_exposure = exp_time + readout
|
||||
num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"]
|
||||
num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time / total_exposure)
|
||||
delay = 0
|
||||
delay_burst = self.parent.delay_burst.get()
|
||||
|
||||
self.parent.set_trigger(trigger_source=TriggerSource.SINGLE_SHOT)
|
||||
|
||||
self.parent.set_channels(signal="width", value=exp_time)
|
||||
self.parent.set_channels(signal="delay", value=delay)
|
||||
self.parent.burst_enable(
|
||||
count=num_burst_cycle, delay=delay_burst, period=total_exposure, config="first"
|
||||
)
|
||||
logger.info(
|
||||
f"{self.parent.name}: On stage with n_burst: {num_burst_cycle} and total_exp {total_exposure}"
|
||||
)
|
||||
|
||||
def on_trigger(self) -> DeviceStatus:
|
||||
"""Method to be executed upon trigger"""
|
||||
if self.parent.scaninfo.scan_type == "step":
|
||||
self.parent.trigger_shot.put(1)
|
||||
return
|
||||
scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
|
||||
if scan_name == "jjf_test":
|
||||
exp_time = 480e-6 # self.parent.scaninfo.exp_time
|
||||
readout = 20e-6 # self.parent.scaninfo.readout_time
|
||||
total_exposure = exp_time + readout
|
||||
num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"]
|
||||
num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time / total_exposure)
|
||||
|
||||
# Start trigger cycle
|
||||
self.parent.trigger_burst_readout.put(1)
|
||||
|
||||
# Create status object that will wait for the end of the burst cycle
|
||||
status = self.wait_with_status(
|
||||
signal_conditions=[(self.parent.burst_cycle_finished, 1)],
|
||||
timeout=num_burst_cycle * total_exposure + 1, # add 1s to be sure
|
||||
check_stopped=True,
|
||||
exception_on_timeout=DelayGeneratorcSAXSError(
|
||||
f"{self.parent.name} run into timeout in complete call."
|
||||
),
|
||||
)
|
||||
logger.info(f"Return status {self.parent.name}")
|
||||
return status
|
||||
|
||||
def on_complete(self) -> DeviceStatus:
|
||||
pass
|
||||
|
||||
def on_pre_scan(self) -> None:
|
||||
"""
|
||||
Method called by pre_scan hook in parent class.
|
||||
|
||||
Executes trigger if premove_trigger is Trus.
|
||||
"""
|
||||
if self.parent.premove_trigger.get() is True:
|
||||
self.parent.trigger_shot.put(1)
|
||||
|
||||
|
||||
class DelayGeneratorcSAXS(BECDeviceBase, DelayGenerator):
|
||||
"""
|
||||
DG645 delay generator at cSAXS (multiple can be in use depending on the setup)
|
||||
|
||||
Default values for setting up DDG.
|
||||
Note: checks of set calues are not (only partially) included, check manual for details on possible settings.
|
||||
https://www.thinksrs.com/downloads/pdfs/manuals/DG645m.pdf
|
||||
|
||||
- delay_burst : (float >=0) Delay between trigger and first pulse in burst mode
|
||||
- delta_width : (float >= 0) Add width to fast shutter signal to make sure its open during acquisition
|
||||
- additional_triggers : (int) add additional triggers to burst mode (mcs card needs +1 triggers per line)
|
||||
- polarity : (list of 0/1) polarity for different channels
|
||||
- amplitude : (float) amplitude voltage of TTLs
|
||||
- offset : (float) offset for ampltitude
|
||||
- thres_trig_level : (float) threshold of trigger amplitude
|
||||
|
||||
Custom signals for logic in different DDGs during scans (for custom_prepare.prepare_ddg):
|
||||
|
||||
- set_high_on_exposure : (bool): if True, then TTL signal should go high during the full acquisition time of a scan.
|
||||
# TODO trigger_width and fixed_ttl could be combined into single list.
|
||||
- fixed_ttl_width : (list of either 1 or 0), one for each channel.
|
||||
- trigger_width : (float) if fixed_ttl_width is True, then the width of the TTL pulse is set to this value.
|
||||
- set_trigger_source : (TriggerSource) specifies the default trigger source for the DDG.
|
||||
- premove_trigger : (bool) if True, then a trigger should be executed before the scan starts (to be implemented in on_pre_scan).
|
||||
- set_high_on_stage : (bool) if True, then TTL signal should go high already on stage.
|
||||
"""
|
||||
|
||||
custom_prepare_cls = DDGSetup
|
||||
|
||||
# Custom signals passed on during the init procedure via BEC
|
||||
# TODO review whether those should remain here like that
|
||||
|
||||
delay_burst = Component(
|
||||
bec_utils.ConfigSignal, name="delay_burst", kind="config", config_storage_name="ddg_config"
|
||||
)
|
||||
|
||||
delta_width = Component(
|
||||
bec_utils.ConfigSignal, name="delta_width", kind="config", config_storage_name="ddg_config"
|
||||
)
|
||||
|
||||
additional_triggers = Component(
|
||||
bec_utils.ConfigSignal,
|
||||
name="additional_triggers",
|
||||
kind="config",
|
||||
config_storage_name="ddg_config",
|
||||
)
|
||||
|
||||
polarity = Component(
|
||||
bec_utils.ConfigSignal, name="polarity", kind="config", config_storage_name="ddg_config"
|
||||
)
|
||||
|
||||
fixed_ttl_width = Component(
|
||||
bec_utils.ConfigSignal,
|
||||
name="fixed_ttl_width",
|
||||
kind="config",
|
||||
config_storage_name="ddg_config",
|
||||
)
|
||||
|
||||
amplitude = Component(
|
||||
bec_utils.ConfigSignal, name="amplitude", kind="config", config_storage_name="ddg_config"
|
||||
)
|
||||
|
||||
offset = Component(
|
||||
bec_utils.ConfigSignal, name="offset", kind="config", config_storage_name="ddg_config"
|
||||
)
|
||||
|
||||
thres_trig_level = Component(
|
||||
bec_utils.ConfigSignal,
|
||||
name="thres_trig_level",
|
||||
kind="config",
|
||||
config_storage_name="ddg_config",
|
||||
)
|
||||
|
||||
set_high_on_exposure = Component(
|
||||
bec_utils.ConfigSignal,
|
||||
name="set_high_on_exposure",
|
||||
kind="config",
|
||||
config_storage_name="ddg_config",
|
||||
)
|
||||
|
||||
set_high_on_stage = Component(
|
||||
bec_utils.ConfigSignal,
|
||||
name="set_high_on_stage",
|
||||
kind="config",
|
||||
config_storage_name="ddg_config",
|
||||
)
|
||||
|
||||
set_trigger_source = Component(
|
||||
bec_utils.ConfigSignal,
|
||||
name="set_trigger_source",
|
||||
kind="config",
|
||||
config_storage_name="ddg_config",
|
||||
)
|
||||
|
||||
trigger_width = Component(
|
||||
bec_utils.ConfigSignal,
|
||||
name="trigger_width",
|
||||
kind="config",
|
||||
config_storage_name="ddg_config",
|
||||
)
|
||||
premove_trigger = Component(
|
||||
bec_utils.ConfigSignal,
|
||||
name="premove_trigger",
|
||||
kind="config",
|
||||
config_storage_name="ddg_config",
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
prefix: str = "",
|
||||
kind: Kind = None,
|
||||
ddg_config: dict = None,
|
||||
parent=None,
|
||||
device_manager=None,
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
Args:
|
||||
prefix (str, optional): Prefix of the device. Defaults to "".
|
||||
name (str): Name of the device.
|
||||
kind (str, optional): Kind of the device. Defaults to None.
|
||||
read_attrs (list, optional): List of attributes to read. Defaults to None.
|
||||
configuration_attrs (list, optional): List of attributes to configure. Defaults to None.
|
||||
parent (Device, optional): Parent device. Defaults to None.
|
||||
device_manager (DeviceManagerBase, optional): DeviceManagerBase object. Defaults to None.
|
||||
sim_mode (bool, optional): Simulation mode flag. Defaults to False.
|
||||
ddg_config (dict, optional): Dictionary of ddg_config signals. Defaults to None.
|
||||
|
||||
"""
|
||||
|
||||
# Default values for ddg_config signals
|
||||
self.ddg_config = {
|
||||
# Setup default values
|
||||
f"{name}_delay_burst": 0,
|
||||
f"{name}_delta_width": 0,
|
||||
f"{name}_additional_triggers": 0,
|
||||
f"{name}_polarity": [1, 1, 1, 1, 1],
|
||||
f"{name}_amplitude": 4.5,
|
||||
f"{name}_offset": 0,
|
||||
f"{name}_thres_trig_level": 2.5,
|
||||
# Values for different behaviour during scans
|
||||
f"{name}_fixed_ttl_width": [0, 0, 0, 0, 0],
|
||||
f"{name}_trigger_width": None,
|
||||
f"{name}_set_high_on_exposure": False,
|
||||
f"{name}_set_high_on_stage": False,
|
||||
f"{name}_set_trigger_source": "SINGLE_SHOT",
|
||||
f"{name}_premove_trigger": False,
|
||||
}
|
||||
if ddg_config is not None:
|
||||
# pylint: disable=expression-not-assigned
|
||||
[self.ddg_config.update({f"{name}_{key}": value}) for key, value in ddg_config.items()]
|
||||
super().__init__(
|
||||
prefix=prefix,
|
||||
name=name,
|
||||
kind=kind,
|
||||
parent=parent,
|
||||
device_manager=device_manager,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
|
||||
# if __name__ == "__main__":
|
||||
# dgen = DelayGeneratorcSAXS("X12SA-CPCL-DDG3:", name="ddg3")
|
||||
14
csaxs_bec/devices/epics/delay_generator_csaxs/__init__.py
Normal file
14
csaxs_bec/devices/epics/delay_generator_csaxs/__init__.py
Normal file
@@ -0,0 +1,14 @@
|
||||
from .ddg_1 import DDG1
|
||||
from .ddg_2 import DDG2
|
||||
from .delay_generator_csaxs import (
|
||||
BURSTCONFIG,
|
||||
CHANNELREFERENCE,
|
||||
OUTPUTPOLARITY,
|
||||
STATUSBITS,
|
||||
TRIGGERINHIBIT,
|
||||
TRIGGERSOURCE,
|
||||
AllChannelNames,
|
||||
ChannelConfig,
|
||||
DelayChannelNames,
|
||||
)
|
||||
from .error_registry import ERROR_CODES
|
||||
169
csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py
Normal file
169
csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py
Normal file
@@ -0,0 +1,169 @@
|
||||
"""
|
||||
DDG1 delay generator
|
||||
|
||||
This module implements the DDG1 delay generator logic for the CSAXS beamline.
|
||||
The attached PDF trigger_scheme_ddg1_ddg2.pdf provides a more detailed overview of
|
||||
the trigger scheme. If the logic changes in the future, it is highly recommended to
|
||||
update the PDF accordingly.
|
||||
|
||||
The DDG1 is the main trigger delay generator for the CSAXS beamline. It will
|
||||
receive either a soft trigger from BEC (depending on the scan type) or a hardware trigger
|
||||
from a beamline device (e.g. the Galil stages). It is responsible for opening the shutter
|
||||
and sending a trigger to the Delay Generator CSAXS (DDG2), which in turn will
|
||||
send the trigger to the detectors. DDG1 will not be witout burst mode, but rather in standard
|
||||
mode creating delays for the channels ab, cd, ef, gh.
|
||||
|
||||
A brief summary of the DDG1 logic:
|
||||
DELAY PAIRS:
|
||||
- DelayPair ab is connected to the EXT/EN of DDG2.
|
||||
- DelayPair cd is connected to the SHUTTER.
|
||||
- DelayPair ef is connected to an OR gate together with the detector
|
||||
PULSE train for the MCS card. The MCS card needs one extra pulse to forward points.
|
||||
|
||||
DELAY CHANNELS:
|
||||
- a = t0 + 2ms (2ms delay to allow the shutter to open)
|
||||
- b = a + 1us (short pulse)
|
||||
- c = t0
|
||||
- d = a + exp_time * burst_count + 1ms (to allow the shutter to close)
|
||||
- e = d
|
||||
- f = e + 1us (short pulse to OR gate for MCS triggering)
|
||||
"""
|
||||
|
||||
import time
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import DeviceStatus, StatusBase
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
|
||||
from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import (
|
||||
CHANNELREFERENCE,
|
||||
OUTPUTPOLARITY,
|
||||
STATUSBITS,
|
||||
TRIGGERSOURCE,
|
||||
AllChannelNames,
|
||||
ChannelConfig,
|
||||
DelayGeneratorCSAXS,
|
||||
)
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
_DEFAULT_CHANNEL_CONFIG: ChannelConfig = {
|
||||
"amplitude": 5.0,
|
||||
"offset": 0.0,
|
||||
"polarity": OUTPUTPOLARITY.POSITIVE,
|
||||
"mode": "ttl",
|
||||
}
|
||||
|
||||
DEFAULT_IO_CONFIG: dict[AllChannelNames, ChannelConfig] = {
|
||||
"t0": _DEFAULT_CHANNEL_CONFIG,
|
||||
"ab": _DEFAULT_CHANNEL_CONFIG,
|
||||
"cd": _DEFAULT_CHANNEL_CONFIG,
|
||||
"ef": _DEFAULT_CHANNEL_CONFIG,
|
||||
"gh": _DEFAULT_CHANNEL_CONFIG,
|
||||
}
|
||||
DEFAULT_TRIGGER_SOURCE: TRIGGERSOURCE = TRIGGERSOURCE.SINGLE_SHOT
|
||||
DEFAULT_READOUT_TIMES = {"ab": 2e-4, "cd": 2e-4, "ef": 2e-4, "gh": 2e-4} # 0.2 ms 5kHz
|
||||
|
||||
DEFAULT_REFERENCES: list[tuple[AllChannelNames, CHANNELREFERENCE]] = [
|
||||
("A", CHANNELREFERENCE.T0), # T0 + 2ms delay
|
||||
("B", CHANNELREFERENCE.A),
|
||||
("C", CHANNELREFERENCE.T0), # T0
|
||||
("D", CHANNELREFERENCE.C),
|
||||
("E", CHANNELREFERENCE.D), # D One extra pulse once shutter closes for MCS
|
||||
("F", CHANNELREFERENCE.E), # E + 1mu s
|
||||
("G", CHANNELREFERENCE.T0),
|
||||
("H", CHANNELREFERENCE.G),
|
||||
]
|
||||
|
||||
|
||||
class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
"""
|
||||
Implementation of DelayGeneratorCSAXS for the CSAXS master trigger delay generator at X12SA-CPCL-DDG1.
|
||||
It will be triggered by a soft trigger from BEC or a hardware trigger from a beamline device (e.g. the Galil stages).
|
||||
It is operated in standard mode, not burst mode and will trigger the EXT/EN of DDG2 (channel ab).
|
||||
It is responsible for opening the shutter (channel cd) and sending an extra trigger to an or gate for the MCS card (channel ef).
|
||||
"""
|
||||
|
||||
# pylint: disable=attribute-defined-outside-init
|
||||
def on_connected(self) -> None:
|
||||
"""
|
||||
Set the default values on the device - intended to overwrite everything to a usable default state.
|
||||
Sets DEFAULT_IO_CONFIG into each channel, sets the trigger source to DEFAULT_TRIGGER_SOURCE,
|
||||
and turns off burst mode.
|
||||
"""
|
||||
self.burst_disable() # it is possible to miss setting settings if burst is enabled
|
||||
for channel, config in DEFAULT_IO_CONFIG.items():
|
||||
self.set_io_values(channel, **config)
|
||||
self.set_trigger(DEFAULT_TRIGGER_SOURCE)
|
||||
self.set_references_for_channels(DEFAULT_REFERENCES)
|
||||
|
||||
def on_stage(self) -> DeviceStatus | StatusBase | None:
|
||||
"""
|
||||
Stage logic for the DDG1 device, being th main trigger delay generator for CSAXS.
|
||||
For standard scans, it will be triggered by a soft trigger from BEC.
|
||||
It also has a hardware trigger feeded into the EXT/EN for fly-scanning, i.e. Galil stages.
|
||||
|
||||
This DDG is always not in burst mode.
|
||||
"""
|
||||
self.burst_disable()
|
||||
exp_time = self.scan_info.msg.scan_parameters["exp_time"]
|
||||
frames_per_trigger = self.scan_info.msg.scan_parameters["frames_per_trigger"]
|
||||
# Trigger DDG2
|
||||
# a = t0 + 2ms, b = a + 1us
|
||||
# a has reference to t0, b has reference to a
|
||||
self.set_delay_pairs(channel="ab", delay=2e-3, width=1e-6)
|
||||
# Trigger shutter
|
||||
shutter_width = 2e-3 + exp_time * frames_per_trigger + 1e-3
|
||||
# d = c/t0 + 2ms + exp_time * burst_count + 1ms
|
||||
# c has reference to t0, d has reference to c
|
||||
self.set_delay_pairs(channel="cd", delay=0, width=shutter_width)
|
||||
# Trigger extra pulse for MCS OR gate
|
||||
# f = e + 1us
|
||||
# e has refernce to d, f has reference to e
|
||||
self.set_delay_pairs(channel="ef", delay=0, width=1e-6)
|
||||
|
||||
def on_trigger(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Note, we need to add a delay to the StatusBits callback on the event_status.
|
||||
If we don't then subsequent triggers may reach the DDG too early, and will be ignored. To
|
||||
avoid this, we've added the option to specify a delay via add_delay, default here is 50ms.
|
||||
"""
|
||||
st = StatusBase()
|
||||
self.cancel_on_stop(st)
|
||||
self.trigger_shot.put(1, use_complete=True)
|
||||
time.sleep(self.scan_info.msg.scan_parameters["exp_time"])
|
||||
self.cancel_on_stop(st)
|
||||
status = self.wait_for_status(status=st, bit_event=STATUSBITS.END_OF_DELAY, timeout=2)
|
||||
return status
|
||||
|
||||
def wait_for_status(
|
||||
self, status: StatusBase, bit_event: STATUSBITS, timeout: float = 2
|
||||
) -> None:
|
||||
"""Wait for a event status bit to be set.
|
||||
|
||||
Args:
|
||||
status (StatusBase): The status object to update.
|
||||
bit_event (STATUSBITS): The event status bit to wait for.
|
||||
timeout (float): Maximum time to wait for the event status bit to be set.
|
||||
"""
|
||||
current_time = time.time()
|
||||
while not status.done:
|
||||
self.state.proc_status.put(1, use_complete=True)
|
||||
event_status = self.state.event_status.get()
|
||||
if (STATUSBITS(event_status) & bit_event) == bit_event:
|
||||
status.set_finished()
|
||||
if time.time() - current_time > timeout:
|
||||
status.set_exception(TimeoutError(f"Timeout waiting for status {status}"))
|
||||
break
|
||||
time.sleep(0.1)
|
||||
time.sleep(0.05) # Give time for the IOC to be ready again
|
||||
return status
|
||||
|
||||
def on_stop(self) -> None:
|
||||
"""Stop the delay generator by setting the burst mode to 0"""
|
||||
self.stop_ddg()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
ddg = DDG1(name="ddg1", prefix="X12SA-CPCL-DDG1:")
|
||||
ddg.wait_for_connection(all_signals=True, timeout=30)
|
||||
ddg.summary()
|
||||
151
csaxs_bec/devices/epics/delay_generator_csaxs/ddg_2.py
Normal file
151
csaxs_bec/devices/epics/delay_generator_csaxs/ddg_2.py
Normal file
@@ -0,0 +1,151 @@
|
||||
"""
|
||||
DDG2 delay generator
|
||||
|
||||
This module implements the DDG2 delay generator logic for the CSAXS beamline.
|
||||
Please check also the code for DDG1, aswell as the attached PDF trigger_scheme_ddg1_ddg2.pdf
|
||||
|
||||
The DDG2 is responsible for creating a burst of triggers for all relevant detectors.
|
||||
It will receive a be triggered from the DDG1 through the EXT/EN channel.
|
||||
|
||||
A brief summary of the DDG2 logic:
|
||||
DELAY PAIRS:
|
||||
- EXT/EN is connected to the DDG1 delay pair ab.
|
||||
- DelayPair ab is connected to a multiplexer, multiplexing the trigger to the detectors.
|
||||
|
||||
DELAY CHANNELS:
|
||||
- a = t0
|
||||
- b = a + (exp_time - READOUT_TIMES)
|
||||
|
||||
Burst mode is enabled:
|
||||
- Burst count is set to the number of frames per trigger.
|
||||
- Burst delay is set to 0.
|
||||
- Burst period is set to the exposure time.
|
||||
"""
|
||||
|
||||
import time
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import DeviceStatus, StatusBase
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
|
||||
from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import (
|
||||
CHANNELREFERENCE,
|
||||
OUTPUTPOLARITY,
|
||||
STATUSBITS,
|
||||
TRIGGERSOURCE,
|
||||
AllChannelNames,
|
||||
ChannelConfig,
|
||||
DelayGeneratorCSAXS,
|
||||
)
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
_DEFAULT_CHANNEL_CONFIG: ChannelConfig = {
|
||||
"amplitude": 5.0,
|
||||
"offset": 0.0,
|
||||
"polarity": OUTPUTPOLARITY.POSITIVE,
|
||||
"mode": "ttl",
|
||||
}
|
||||
|
||||
DEFAULT_IO_CONFIG: dict[AllChannelNames, ChannelConfig] = {
|
||||
"t0": _DEFAULT_CHANNEL_CONFIG,
|
||||
"ab": _DEFAULT_CHANNEL_CONFIG,
|
||||
"cd": _DEFAULT_CHANNEL_CONFIG,
|
||||
"ef": _DEFAULT_CHANNEL_CONFIG,
|
||||
"gh": _DEFAULT_CHANNEL_CONFIG,
|
||||
}
|
||||
DEFAULT_TRIGGER_SOURCE: TRIGGERSOURCE = TRIGGERSOURCE.EXT_RISING_EDGE
|
||||
DEFAULT_READOUT_TIMES = {"ab": 2e-4, "cd": 2e-4, "ef": 2e-4, "gh": 2e-4} # 0.2 ms 5kHz
|
||||
|
||||
DEFAULT_REFERENCES: list[tuple[AllChannelNames, CHANNELREFERENCE]] = [
|
||||
("A", CHANNELREFERENCE.T0),
|
||||
("B", CHANNELREFERENCE.A),
|
||||
("C", CHANNELREFERENCE.T0),
|
||||
("D", CHANNELREFERENCE.C),
|
||||
("E", CHANNELREFERENCE.T0),
|
||||
("F", CHANNELREFERENCE.E),
|
||||
("G", CHANNELREFERENCE.T0),
|
||||
("H", CHANNELREFERENCE.G),
|
||||
]
|
||||
|
||||
|
||||
class DDG2(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
"""
|
||||
Implementation of DelayGeneratorCSAXS for the CSAXS master trigger delay generator at X12SA-CPCL-DDG2.
|
||||
This device is responsible for creating triggers in burst mode and is connected to a multiplexer that
|
||||
distributes the trigger to the detectors. The DDG2 is triggered by the DDG1 through the EXT/EN channel.
|
||||
"""
|
||||
|
||||
# pylint: disable=attribute-defined-outside-init
|
||||
def on_connected(self) -> None:
|
||||
"""
|
||||
Set the default values on the device - intended to overwrite everything to a usable default state.
|
||||
Sets DEFAULT_IO_CONFIG into each channel, sets the trigger source to DEFAULT_TRIGGER_SOURCE.
|
||||
"""
|
||||
self.burst_disable() # it is possible to miss setting settings if burst is enabled
|
||||
for channel, config in DEFAULT_IO_CONFIG.items():
|
||||
self.set_io_values(channel, **config)
|
||||
self.set_trigger(DEFAULT_TRIGGER_SOURCE)
|
||||
self.set_references_for_channels(DEFAULT_REFERENCES)
|
||||
|
||||
def on_stage(self) -> DeviceStatus | StatusBase | None:
|
||||
"""
|
||||
Stage logic for the DDG1 device, being th main trigger delay generator for CSAXS.
|
||||
For standard scans, it will be triggered by a soft trigger from BEC.
|
||||
It also has a hardware trigger feeded into the EXT/EN for fly-scanning, i.e. Galil stages.
|
||||
|
||||
This DDG is always not in burst mode.
|
||||
"""
|
||||
exp_time = self.scan_info.msg.scan_parameters["exp_time"]
|
||||
frames_per_trigger = self.scan_info.msg.scan_parameters["frames_per_trigger"]
|
||||
# a = t0
|
||||
# a has reference to t0, b has reference to a
|
||||
burst_pulse_width = exp_time - DEFAULT_READOUT_TIMES["ab"]
|
||||
self.set_delay_pairs(channel="ab", delay=0, width=burst_pulse_width)
|
||||
self.burst_enable(count=frames_per_trigger, delay=0, period=exp_time)
|
||||
|
||||
def on_pre_scan(self):
|
||||
"""
|
||||
The delay generator occasionally needs a bit extra time to process all
|
||||
commands from stage. Therefore, we introduce here a short sleep
|
||||
"""
|
||||
# Delay Generator occasionaly needs a bit extra time to process all commands, sleep 50ms
|
||||
time.sleep(0.05)
|
||||
|
||||
def on_trigger(self) -> DeviceStatus | StatusBase | None:
|
||||
"""
|
||||
DDG2 will not receive a trigger from BEC, but will be triggered by the DDG1 through the EXT/EN channel.
|
||||
"""
|
||||
|
||||
def wait_for_status(
|
||||
self, status: StatusBase, bit_event: STATUSBITS, timeout: float = 2
|
||||
) -> None:
|
||||
"""Wait for a event status bit to be set.
|
||||
|
||||
Args:
|
||||
status (StatusBase): The status object to update.
|
||||
bit_event (STATUSBITS): The event status bit to wait for.
|
||||
timeout (float): Maximum time to wait for the event status bit to be set.
|
||||
"""
|
||||
current_time = time.time()
|
||||
while not status.done:
|
||||
self.state.proc_status.put(1, use_complete=True)
|
||||
event_status = self.state.event_status.get()
|
||||
if (STATUSBITS(event_status) & bit_event) == bit_event:
|
||||
status.set_finished()
|
||||
if time.time() - current_time > timeout:
|
||||
status.set_exception(TimeoutError(f"Timeout waiting for status {status}"))
|
||||
break
|
||||
time.sleep(0.1)
|
||||
time.sleep(0.05) # Give time for the IOC to be ready again
|
||||
return status
|
||||
|
||||
def on_stop(self) -> None:
|
||||
"""Stop the delay generator by setting the burst mode to 0"""
|
||||
self.stop_ddg()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
ddg = DDG2(name="ddg2", prefix="X12SA-CPCL-DDG2:")
|
||||
ddg.wait_for_connection(all_signals=True, timeout=30)
|
||||
ddg.summary()
|
||||
@@ -0,0 +1,717 @@
|
||||
"""
|
||||
Delay generator implementation for CSAXS.
|
||||
|
||||
Detailed information can be found in the manual:
|
||||
https://www.thinksrs.com/downloads/pdfs/manuals/DG645m.pdf
|
||||
"""
|
||||
|
||||
import enum
|
||||
from typing import Literal, TypedDict
|
||||
import time
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import Device, EpicsSignal, EpicsSignalRO, Kind, Signal
|
||||
from ophyd_devices import StatusBase, SubscriptionStatus
|
||||
from typeguard import typechecked
|
||||
|
||||
from csaxs_bec.devices.epics.delay_generator_csaxs.error_registry import ERROR_CODES
|
||||
|
||||
logger = bec_logger.logger
|
||||
DelayChannelNames = Literal["ab", "cd", "ef", "gh"]
|
||||
AllChannelNames = Literal["t0", "ab", "cd", "ef", "gh"]
|
||||
LiteralChannels = Literal["A", "B", "C", "D", "E", "F", "G", "H"]
|
||||
|
||||
|
||||
class CHANNELREFERENCE(enum.Enum):
|
||||
T0 = 0
|
||||
A = 1
|
||||
B = 2
|
||||
C = 3
|
||||
D = 4
|
||||
E = 5
|
||||
F = 6
|
||||
G = 7
|
||||
H = 8
|
||||
|
||||
|
||||
class BURSTCONFIG(enum.Enum):
|
||||
"""Enum option for burst_config signal of the delay generator.
|
||||
|
||||
ALL_CYCLES: T0 triggere for all cycles.
|
||||
FIRST_CYCLE: T0 only triggered for the first cycle.
|
||||
"""
|
||||
|
||||
ALL_CYCLES = 0
|
||||
FIRST_CYCLE = 1
|
||||
|
||||
|
||||
class TRIGGERSOURCE(enum.Enum):
|
||||
"""Enum options for the trigger_source signal of the delay generator."""
|
||||
|
||||
INTERNAL = 0
|
||||
EXT_RISING_EDGE = 1
|
||||
EXT_FALLING_EDGE = 2
|
||||
SS_EXT_RISING_EDGE = 3
|
||||
SS_EXT_FALLING_EDGE = 4
|
||||
SINGLE_SHOT = 5
|
||||
LINE = 6
|
||||
|
||||
|
||||
class TRIGGERINHIBIT(enum.Enum):
|
||||
"""Enum options for the trigger_inhibit signal of the delay generator."""
|
||||
|
||||
OFF = 0
|
||||
TRIGGERS = 1
|
||||
AB = 2
|
||||
AB_CD = 3
|
||||
AB_CD_EF = 4
|
||||
AB_CD_EF_GH = 5
|
||||
|
||||
|
||||
class OUTPUTPOLARITY(enum.Enum):
|
||||
"""Enum options for the polarity signal of the static pair."""
|
||||
|
||||
NEGATIVE = 0
|
||||
POSITIVE = 1
|
||||
|
||||
|
||||
class STATUSBITS(enum.IntFlag):
|
||||
"""Bit flags for the status signal of the delay generator."""
|
||||
|
||||
TRIG = 1 << 0 # Got a trigger.
|
||||
RATE = 1 << 1 # Got a trigger while a delay or burst was in progress.
|
||||
END_OF_DELAY = 1 << 2 # A delay cycle has completed.
|
||||
END_OF_BURST = 1 << 3 # A burst cycle has completed.
|
||||
INHIBIT = 1 << 4 # A trigger or output delay cycle was inhibited.
|
||||
ABORT_DELAY = 1 << 5 # A delay cycle was aborted early.
|
||||
PLL_UNLOCK = 1 << 6 # The 100 MHz PLL came unlocked.
|
||||
RB_UNLOCK = 1 << 7 # The installed Rb oscillator is unlocked.
|
||||
|
||||
def describe(self) -> dict:
|
||||
"""Return a description of the status bits."""
|
||||
descriptions = {
|
||||
STATUSBITS.TRIG: "Got a trigger.",
|
||||
STATUSBITS.RATE: "Got a trigger while a delay or burst was in progress.",
|
||||
STATUSBITS.END_OF_DELAY: "A delay cycle has completed.",
|
||||
STATUSBITS.END_OF_BURST: "A burst cycle has completed.",
|
||||
STATUSBITS.INHIBIT: "A trigger or output delay cycle was inhibited.",
|
||||
STATUSBITS.ABORT_DELAY: "A delay cycle was aborted early.",
|
||||
STATUSBITS.PLL_UNLOCK: "The 100 MHz PLL came unlocked.",
|
||||
STATUSBITS.RB_UNLOCK: "The installed Rb oscillator is unlocked.",
|
||||
}
|
||||
return {flag.name: descriptions[flag] for flag in STATUSBITS if flag in self}
|
||||
|
||||
|
||||
class StatusBitsCompareStatus(SubscriptionStatus):
|
||||
"""Compare status for STATUSBITS comparison."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
signal: EpicsSignalRO,
|
||||
value: STATUSBITS,
|
||||
raise_states: list[STATUSBITS] | None = None,
|
||||
*args,
|
||||
event_type=None,
|
||||
timeout: float | None = None,
|
||||
add_delay:float|None = None,
|
||||
settle_time: float = 0,
|
||||
run: bool = True,
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize the compare status with a signal."""
|
||||
self._signal = signal
|
||||
self._value = value
|
||||
self._add_delay = add_delay or 0
|
||||
self._raise_states = raise_states or []
|
||||
super().__init__(
|
||||
device=signal,
|
||||
callback=self._compare_callback,
|
||||
timeout=timeout,
|
||||
settle_time=settle_time,
|
||||
event_type=event_type,
|
||||
run=run,
|
||||
)
|
||||
|
||||
def _compare_callback(self, value, **kwargs) -> bool:
|
||||
"""Callback for subscription status"""
|
||||
obj = kwargs.get("obj", None)
|
||||
if obj is None:
|
||||
name = 'no object received'
|
||||
else:
|
||||
name=obj.name
|
||||
if any((STATUSBITS(value) & state) == state for state in self._raise_states):
|
||||
self.set_exception(
|
||||
ValueError(
|
||||
f"Status bits {STATUSBITS(value).describe()} raised an exception: {self._raise_states}"
|
||||
)
|
||||
)
|
||||
return False
|
||||
if self._add_delay !=0:
|
||||
time.sleep(self._add_delay)
|
||||
|
||||
return (STATUSBITS(value) & self._value) == self._value
|
||||
|
||||
|
||||
class ChannelConfig(TypedDict):
|
||||
amplitude: float | None
|
||||
offset: float | None
|
||||
polarity: OUTPUTPOLARITY | Literal[0, 1] | None
|
||||
mode: Literal["ttl", "nim"] | None
|
||||
|
||||
|
||||
class StaticPair(Device):
|
||||
"""
|
||||
Class to represent a static pair (T0, aswell as all AB, CB, EF, GH channels).
|
||||
It allows setting the logic levels, but the timing is fixed.
|
||||
The signal is high after receiving the trigger until the end of the holdoff period.
|
||||
"""
|
||||
|
||||
ttl_mode = Cpt(
|
||||
EpicsSignal,
|
||||
"OutputModeTtlSS.PROC",
|
||||
kind=Kind.omitted,
|
||||
auto_monitor=True,
|
||||
doc="Set the output mode to TTL",
|
||||
)
|
||||
nim_mode = Cpt(
|
||||
EpicsSignal,
|
||||
"OutputModeNimSS.PROC",
|
||||
kind=Kind.omitted,
|
||||
auto_monitor=True,
|
||||
doc="Set the output mode to NIM",
|
||||
)
|
||||
polarity = Cpt(
|
||||
EpicsSignal,
|
||||
"OutputPolarityBI",
|
||||
write_pv="OutputPolarityBO",
|
||||
name="polarity",
|
||||
kind=Kind.omitted,
|
||||
auto_monitor=True,
|
||||
doc="Control the polarity of the output signal. POS 1 or NEG 0",
|
||||
)
|
||||
|
||||
amplitude = Cpt(
|
||||
EpicsSignal,
|
||||
"OutputAmpAI",
|
||||
write_pv="OutputAmpAO",
|
||||
name="amplitude",
|
||||
kind=Kind.omitted,
|
||||
auto_monitor=True,
|
||||
doc="Amplitude of the output signal in volts.",
|
||||
)
|
||||
|
||||
offset = Cpt(
|
||||
EpicsSignal,
|
||||
"OutputOffsetAI",
|
||||
write_pv="OutputOffsetAO",
|
||||
name="offset",
|
||||
kind=Kind.omitted,
|
||||
auto_monitor=True,
|
||||
doc="Offset of the output signal in volts.",
|
||||
)
|
||||
|
||||
|
||||
class Channel(Device):
|
||||
"""
|
||||
Represents a single channel A, B, C, ... of the delay generator.
|
||||
"""
|
||||
|
||||
setpoint = Cpt(
|
||||
EpicsSignal,
|
||||
write_pv="DelayAO",
|
||||
read_pv="DelayAI",
|
||||
put_complete=True,
|
||||
auto_monitor=True,
|
||||
kind=Kind.omitted,
|
||||
doc="Setpoint value for the delay of the channel",
|
||||
)
|
||||
reference = Cpt(
|
||||
EpicsSignal,
|
||||
"ReferenceMO",
|
||||
put_complete=True,
|
||||
kind=Kind.omitted,
|
||||
auto_monitor=True,
|
||||
doc="Reference channel T0,A,B,.. for the delay of the setpoint",
|
||||
)
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
"""
|
||||
Initialize the channel with a setpoint and reference signal.
|
||||
"""
|
||||
# The read PV in EpicsSignal does not receive the prefix.. so we need to add it manually.
|
||||
self.__class__.__dict__["setpoint"].kwargs["read_pv"] = args[0] + "DelayAI"
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
|
||||
class WidthSignal(Signal):
|
||||
"""A signal that represents the width of a channel."""
|
||||
|
||||
def get(self, **kwargs) -> float:
|
||||
"""
|
||||
Get the width of the channel.
|
||||
|
||||
Returns:
|
||||
float: The width of the channel in seconds.
|
||||
"""
|
||||
parent: _DelayPairBase = self._parent # type: ignore
|
||||
return parent.ch2.setpoint.get() - parent.ch1.setpoint.get() # type: ignore
|
||||
|
||||
def check_value(self, value: float) -> float:
|
||||
"""Check if the value is larger equal to 0"""
|
||||
if value >= 0:
|
||||
return value
|
||||
else:
|
||||
raise ValueError(f"Width must be larger ot equal 0, got {value} seconds.")
|
||||
|
||||
def put(self, value: float, **kwargs):
|
||||
"""
|
||||
Set the width of the channel.
|
||||
|
||||
Args:
|
||||
value (float): The width to set in seconds.
|
||||
"""
|
||||
self.check_value(value)
|
||||
parent: _DelayPairBase = self._parent # type: ignore
|
||||
ch1_setpoint: float = parent.ch1.setpoint.get() # type: ignore
|
||||
parent.ch2.setpoint.put(ch1_setpoint + value, **kwargs)
|
||||
|
||||
def set(self, value: float, **kwargs):
|
||||
"""
|
||||
Set the width of the channel.
|
||||
|
||||
Args:
|
||||
value (float): The width to set in seconds.
|
||||
"""
|
||||
status = StatusBase()
|
||||
self.put(value, **kwargs)
|
||||
status.set_finished()
|
||||
return status
|
||||
|
||||
|
||||
class DelaySignal(Signal):
|
||||
"""A signal that represents the delay of a channel."""
|
||||
|
||||
def get(self, **kwargs):
|
||||
"""
|
||||
Get the delay of the channel.
|
||||
|
||||
Returns:
|
||||
float: The delay of the channel in seconds.
|
||||
"""
|
||||
parent: _DelayPairBase = self._parent # type: ignore
|
||||
return parent.ch1.setpoint.get()
|
||||
|
||||
def put(self, value: float, **kwargs):
|
||||
"""
|
||||
Set the delay of the channel.
|
||||
|
||||
Args:
|
||||
value (float): The delay to set in seconds.
|
||||
"""
|
||||
parent: _DelayPairBase = self._parent # type: ignore
|
||||
parent.ch1.setpoint.put(value, **kwargs)
|
||||
parent.ch2.setpoint.put(value + parent.width.get(), **kwargs)
|
||||
|
||||
def set(self, value: float, **kwargs):
|
||||
"""
|
||||
Set the width of the channel.
|
||||
|
||||
Args:
|
||||
value (float): The width to set in seconds.
|
||||
"""
|
||||
status = StatusBase()
|
||||
self.put(value, **kwargs)
|
||||
status.set_finished()
|
||||
return status
|
||||
|
||||
|
||||
class _DelayPairBase(Device):
|
||||
"""Base class for delay pairs. Children have to implement ch1,ch2 for
|
||||
the respective delay channels. The class attributes have to be called
|
||||
ch1, ch2 for width and delay signals to work."""
|
||||
|
||||
ch1: Cpt[Channel]
|
||||
ch2: Cpt[Channel]
|
||||
io: Cpt[StaticPair]
|
||||
width = Cpt(
|
||||
WidthSignal, name="width", kind=Kind.config, doc="Width of TTL pulse for delay pair"
|
||||
)
|
||||
delay = Cpt(
|
||||
DelaySignal, name="delay", kind=Kind.config, doc="Delay of TTL pulse for delay pair"
|
||||
)
|
||||
|
||||
|
||||
class DelayPairAB(_DelayPairBase):
|
||||
|
||||
ch1 = Cpt(Channel, "A", name="A", kind=Kind.omitted, doc="Channel A")
|
||||
ch2 = Cpt(Channel, "B", name="B", kind=Kind.omitted, doc="Channel B")
|
||||
io = Cpt(StaticPair, "AB", name="io", kind=Kind.omitted, doc="IO for delay pair AB")
|
||||
|
||||
|
||||
class DelayPairCD(_DelayPairBase):
|
||||
|
||||
ch1 = Cpt(Channel, "C", name="C", kind=Kind.omitted, doc="Channel C")
|
||||
ch2 = Cpt(Channel, "D", name="D", kind=Kind.omitted, doc="Channel D")
|
||||
io = Cpt(StaticPair, "CD", name="io", kind=Kind.omitted, doc="IO for delay pair CD")
|
||||
|
||||
|
||||
class DelayPairEF(_DelayPairBase):
|
||||
|
||||
ch1 = Cpt(Channel, "E", name="E", kind=Kind.omitted, doc="Channel E")
|
||||
ch2 = Cpt(Channel, "F", name="F", kind=Kind.omitted, doc="Channel F")
|
||||
io = Cpt(StaticPair, "EF", name="io", kind=Kind.omitted, doc="IO for delay pair EF")
|
||||
|
||||
|
||||
class DelayPairGH(_DelayPairBase):
|
||||
|
||||
ch1 = Cpt(Channel, "G", name="G", kind=Kind.omitted, doc="Channel G")
|
||||
ch2 = Cpt(Channel, "H", name="H", kind=Kind.omitted, doc="Channel H")
|
||||
io = Cpt(StaticPair, "GH", name="io", kind=Kind.omitted, doc="IO for delay pair GH")
|
||||
|
||||
|
||||
class DelayGeneratorEventStatus(Device):
|
||||
"""Subdevice to represent the event state of the delay generator."""
|
||||
|
||||
event_status = Cpt(
|
||||
EpicsSignalRO,
|
||||
"EventStatusLI",
|
||||
name="event_status",
|
||||
kind=Kind.omitted,
|
||||
auto_monitor=True,
|
||||
doc="Event status register for the delay generator",
|
||||
)
|
||||
proc_status = Cpt(
|
||||
EpicsSignal,
|
||||
"EventStatusLI.PROC",
|
||||
name="proc_status",
|
||||
kind=Kind.omitted,
|
||||
doc="Poll and flush the latest event status register entry from the HW to the event_status signal",
|
||||
)
|
||||
|
||||
|
||||
class DelayGeneratorCSAXS(Device):
|
||||
"""
|
||||
Delay Generator Stanford Research DG645. This implements an interface for the DG645 delay generator.
|
||||
|
||||
Detailed information can be found in the manual:
|
||||
https://www.thinksrs.com/downloads/pdfs/manuals/DG645m.pdf
|
||||
|
||||
The DG645 has 8 channels, each with a delay and pulse width. The channels are implemented as DelayPair objects (AB etc.).
|
||||
|
||||
Each pair has a TTL pulse width, delay and a reference signal to which they are being triggered.
|
||||
In addition, the io layer allows setting amplitude, offset and polarity for each pair.
|
||||
"""
|
||||
|
||||
_pv_timeout: float = 1.5 # Default timeout for PV operations in seconds
|
||||
|
||||
# Front Panel
|
||||
t0 = Cpt(StaticPair, "T0", name="t0", doc="T0 static pair")
|
||||
ab = Cpt(DelayPairAB, "", name="ab", doc="Delay pair AB")
|
||||
cd = Cpt(DelayPairCD, "", name="cd", doc="Delay pair CD")
|
||||
ef = Cpt(DelayPairEF, "", name="ef", doc="Delay pair EF")
|
||||
gh = Cpt(DelayPairGH, "", name="gh", doc="Delay pair GH")
|
||||
state = Cpt(DelayGeneratorEventStatus, "", name="state", doc="Subdevice for event status")
|
||||
status_msg = Cpt(
|
||||
EpicsSignalRO,
|
||||
"StatusSI",
|
||||
name="status_msg",
|
||||
kind=Kind.omitted,
|
||||
auto_monitor=True,
|
||||
doc="Status message from the delay generator",
|
||||
)
|
||||
status_msg_clear = Cpt(
|
||||
EpicsSignal,
|
||||
"StatusClearBO",
|
||||
name="status_msg_clear",
|
||||
kind=Kind.omitted,
|
||||
doc="Clear the status message",
|
||||
)
|
||||
|
||||
trigger_holdoff = Cpt(
|
||||
EpicsSignal,
|
||||
"TriggerHoldoffAI",
|
||||
write_pv="TriggerHoldoffAO",
|
||||
name="trigger_holdoff",
|
||||
kind=Kind.config,
|
||||
)
|
||||
trigger_inhibit = Cpt(
|
||||
EpicsSignal,
|
||||
"TriggerInhibitMI",
|
||||
write_pv="TriggerInhibitMO",
|
||||
name="trigger_inhibit",
|
||||
kind=Kind.omitted,
|
||||
)
|
||||
trigger_source = Cpt(
|
||||
EpicsSignal,
|
||||
"TriggerSourceMI",
|
||||
write_pv="TriggerSourceMO",
|
||||
name="trigger_source",
|
||||
kind=Kind.omitted,
|
||||
doc="Trigger Source for the DDG, options in TRIGGERSOURCE",
|
||||
)
|
||||
trigger_level = Cpt(
|
||||
EpicsSignal,
|
||||
"TriggerLevelAI",
|
||||
write_pv="TriggerLevelAO",
|
||||
name="trigger_level",
|
||||
kind=Kind.omitted,
|
||||
)
|
||||
trigger_rate = Cpt(
|
||||
EpicsSignal,
|
||||
"TriggerRateAI",
|
||||
write_pv="TriggerRateAO",
|
||||
name="trigger_rate",
|
||||
kind=Kind.omitted,
|
||||
)
|
||||
trigger_shot = Cpt(
|
||||
EpicsSignal,
|
||||
"TriggerDelayBO",
|
||||
name="trigger_shot",
|
||||
kind=Kind.omitted,
|
||||
doc="Software trigger, needs to be in correct mode to work",
|
||||
)
|
||||
burst_mode = Cpt(
|
||||
EpicsSignal,
|
||||
"BurstModeBI",
|
||||
write_pv="BurstModeBO",
|
||||
name="burst_mode",
|
||||
kind=Kind.omitted,
|
||||
auto_monitor=True,
|
||||
doc="Enable or disable burst mode. 1 = enabled, 0 = disabled.",
|
||||
)
|
||||
burst_config = Cpt(
|
||||
EpicsSignal,
|
||||
"BurstConfigBI",
|
||||
write_pv="BurstConfigBO",
|
||||
name="burst_config",
|
||||
kind=Kind.omitted,
|
||||
doc="Configuration of T0 during burst. Can be ALL_CYCLES (0) or FIRST_CYCLE (1) .",
|
||||
)
|
||||
burst_count = Cpt(
|
||||
EpicsSignal,
|
||||
"BurstCountLI",
|
||||
write_pv="BurstCountLO",
|
||||
name="burst_count",
|
||||
kind=Kind.omitted,
|
||||
doc="Number of bursts to trigger in burst mode. Must be >0.",
|
||||
)
|
||||
burst_delay = Cpt(
|
||||
EpicsSignal,
|
||||
"BurstDelayAI",
|
||||
write_pv="BurstDelayAO",
|
||||
name="burst_delay",
|
||||
kind=Kind.omitted,
|
||||
doc="Delay before bursts start in seconds. Must be >=0.",
|
||||
)
|
||||
burst_period = Cpt(
|
||||
EpicsSignal,
|
||||
"BurstPeriodAI",
|
||||
write_pv="BurstPeriodAO",
|
||||
name="burst_period",
|
||||
kind=Kind.omitted,
|
||||
doc="Period of the bursts in seconds. Must be >0.",
|
||||
)
|
||||
|
||||
def proc_event_status(self) -> None:
|
||||
"""The reading must be manually triggered to update the event status."""
|
||||
self.state.proc_status.put(1)
|
||||
|
||||
def wait_for_event_status(
|
||||
self, value: STATUSBITS, timeout: float | None = None
|
||||
) -> StatusBitsCompareStatus:
|
||||
"""
|
||||
Wait for a specific event status.
|
||||
|
||||
Args:
|
||||
value (STATUSBITS): The status bits to wait for.
|
||||
timeout (float): The maximum time to wait in seconds.
|
||||
"""
|
||||
return StatusBitsCompareStatus(
|
||||
signal=self.state.event_status, value=value, timeout=timeout, run=True
|
||||
)
|
||||
|
||||
def set_trigger(self, source: TRIGGERSOURCE | int) -> None:
|
||||
"""
|
||||
Set the trigger source.
|
||||
|
||||
Args:
|
||||
source (TriggerSource | int): The trigger source
|
||||
INTERNAL = 0
|
||||
EXT_RISING_EDGE = 1
|
||||
EXT_FALLING_EDGE = 2
|
||||
SS_EXT_RISING_EDGE = 3
|
||||
SS_EXT_FALLING_EDGE = 4
|
||||
SINGLE_SHOT = 5
|
||||
LINE = 6
|
||||
"""
|
||||
if isinstance(source, TRIGGERSOURCE):
|
||||
self.trigger_source.set(source.value).wait(self._pv_timeout)
|
||||
else:
|
||||
self.trigger_source.set(int(source)).wait(self._pv_timeout)
|
||||
|
||||
@typechecked
|
||||
def burst_enable(
|
||||
self,
|
||||
count: int,
|
||||
delay: float,
|
||||
period: float,
|
||||
config: Literal["all", "first"] | BURSTCONFIG = "first",
|
||||
) -> None:
|
||||
"""Enable burst mode with valid parameters.
|
||||
|
||||
Args:
|
||||
count (int): Number of bursts >0
|
||||
delay (float): Delay before bursts start in seconds >=0
|
||||
period (float): Period of the bursts in seconds >0
|
||||
config (str): Configuration of T0 duiring burst.
|
||||
In addition, to simplify triggering of other instruments synchronously with the burst,
|
||||
the T0 output may be configured to fire on the first delay cycle of the burst,
|
||||
rather than for all delay cycles as is normally the case. BURSTCONFIG
|
||||
"""
|
||||
|
||||
# Check inputs first
|
||||
if count <= 0:
|
||||
raise ValueError(f"Count must be >0, provided: {count}")
|
||||
if delay < 0:
|
||||
raise ValueError(f"Delay must be >=0, provided: {delay}")
|
||||
if period <= 0:
|
||||
raise ValueError(f"Period must be >0, provided: {period}")
|
||||
|
||||
self.burst_mode.set(1).wait(timeout=self._pv_timeout)
|
||||
self.burst_count.set(count).wait(timeout=self._pv_timeout)
|
||||
self.burst_delay.set(delay).wait(timeout=self._pv_timeout)
|
||||
self.burst_period.set(period).wait(timeout=self._pv_timeout)
|
||||
|
||||
if config == "all":
|
||||
self.burst_config.set(BURSTCONFIG.ALL_CYCLES.value).wait(timeout=self._pv_timeout)
|
||||
elif config == "first":
|
||||
self.burst_config.set(BURSTCONFIG.FIRST_CYCLE.value).wait(timeout=self._pv_timeout)
|
||||
|
||||
def burst_disable(self) -> None:
|
||||
"""Disable burst mode"""
|
||||
self.burst_mode.set(0).wait(timeout=self._pv_timeout)
|
||||
|
||||
@typechecked
|
||||
def set_io_values(
|
||||
self,
|
||||
channel: AllChannelNames | list[AllChannelNames],
|
||||
amplitude: float | None = None,
|
||||
offset: float | None = None,
|
||||
polarity: OUTPUTPOLARITY | Literal[0, 1] | None = None,
|
||||
mode: Literal["ttl", "nim"] | None = None,
|
||||
) -> None:
|
||||
"""Set the IO values for the static pair.
|
||||
|
||||
Args:
|
||||
channel (str | list[str]): Channel(s) to set the IO values for.
|
||||
Can be "t0", "ab", "cd", "ef", "gh" or a list of these.
|
||||
If a list is provided, the same values will be set for all channels.
|
||||
amplitude (float): Amplitude of the output signal in volts.
|
||||
offset (float): Offset of the output signal in volts.
|
||||
polarity (OUTPUTPOLARITY | int): Polarity of the output signal.
|
||||
ttl_mode (bool): If True, set the output to TTL mode.
|
||||
nim_mode (bool): If True, set the output to NIM mode.
|
||||
If both ttl_mode and nim_mode are set to True,
|
||||
a ValueError is raised.
|
||||
"""
|
||||
if isinstance(channel, str):
|
||||
channel = [channel]
|
||||
for ch in channel:
|
||||
if ch == "t0":
|
||||
io_channel = self.t0
|
||||
else:
|
||||
io_channel = getattr(getattr(self, ch), "io")
|
||||
if amplitude is not None:
|
||||
io_channel.amplitude.set(amplitude).wait(timeout=self._pv_timeout)
|
||||
if offset is not None:
|
||||
io_channel.offset.set(offset).wait(timeout=self._pv_timeout)
|
||||
if polarity is not None:
|
||||
if isinstance(polarity, OUTPUTPOLARITY):
|
||||
io_channel.polarity.set(polarity.value).wait(timeout=self._pv_timeout)
|
||||
else:
|
||||
io_channel.polarity.set(int(polarity)).wait(timeout=self._pv_timeout)
|
||||
if mode == "ttl":
|
||||
io_channel.ttl_mode.set(1).wait(timeout=self._pv_timeout)
|
||||
if mode == "nim":
|
||||
io_channel.nim_mode.set(1).wait(timeout=self._pv_timeout)
|
||||
|
||||
def set_delay_pairs(
|
||||
self,
|
||||
channel: DelayChannelNames | list[DelayChannelNames],
|
||||
delay: float | list[float] | None = None,
|
||||
width: float | list[float] | None = None,
|
||||
) -> None:
|
||||
"""Set the delay and width for a specific channel pair.
|
||||
|
||||
Args:
|
||||
channel (str): Channel pair to set the delay and width for.
|
||||
Can be "ab", "cd", "ef", "gh".
|
||||
delay (float): Delay in seconds to set for the channel pair.
|
||||
width (float): Width in seconds to set for the channel pair.
|
||||
"""
|
||||
if isinstance(channel, str):
|
||||
channel = [channel]
|
||||
if isinstance(delay, (float, int)):
|
||||
delay = [float(delay)] * len(channel)
|
||||
if isinstance(width, (float, int)):
|
||||
width = [float(width)] * len(channel)
|
||||
if delay is not None:
|
||||
if len(delay) != len(channel):
|
||||
raise ValueError(
|
||||
f"Length of delay {len(delay)} must match length of channel {len(channel)}."
|
||||
)
|
||||
for ii, ch in enumerate(channel):
|
||||
delay_channel = getattr(self, ch)
|
||||
delay_channel.delay.put(delay[ii])
|
||||
if width is not None:
|
||||
if len(width) != len(channel):
|
||||
raise ValueError(
|
||||
f"Length of width {len(width)} must match length of channel {len(channel)}."
|
||||
)
|
||||
for ii, ch in enumerate(channel):
|
||||
delay_channel = getattr(self, ch)
|
||||
delay_channel.width.put(width[ii])
|
||||
|
||||
def _get_literal_channel(self, channel: LiteralChannels) -> Channel:
|
||||
return {
|
||||
"A": self.ab.ch1,
|
||||
"B": self.ab.ch2,
|
||||
"C": self.cd.ch1,
|
||||
"D": self.cd.ch2,
|
||||
"E": self.ef.ch1,
|
||||
"F": self.ef.ch2,
|
||||
"G": self.gh.ch1,
|
||||
"H": self.gh.ch2,
|
||||
}[channel]
|
||||
|
||||
def set_channel_reference(self, channel: LiteralChannels, reference_channel: CHANNELREFERENCE):
|
||||
self._get_literal_channel(channel).reference.put(reference_channel.value)
|
||||
|
||||
def set_references_for_channels(
|
||||
self, channels_and_refs: list[tuple[LiteralChannels, CHANNELREFERENCE]]
|
||||
):
|
||||
for ch, ref in channels_and_refs:
|
||||
self.set_channel_reference(ch, ref)
|
||||
|
||||
def stop_ddg(self) -> None:
|
||||
"""Stop the delay generator by setting the burst mode to 0"""
|
||||
self.burst_mode.put(0)
|
||||
|
||||
def reset_error(self) -> None:
|
||||
"""Reset the error status message of the delay generator."""
|
||||
self.status_msg_clear.put(1)
|
||||
|
||||
def get_error_msg(self) -> str:
|
||||
"""Get the error message from the delay generator."""
|
||||
msg = self.status_msg.get()
|
||||
if msg in ERROR_CODES:
|
||||
return ERROR_CODES[msg]
|
||||
else:
|
||||
return f"Unknown error code: {msg}"
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
ddg = DelayGeneratorCSAXS(name="ddg", prefix="X12SA-CPCL-DDG1:")
|
||||
ddg.wait_for_connection(all_signals=True, timeout=30)
|
||||
ddg.summary()
|
||||
@@ -0,0 +1,73 @@
|
||||
ERROR_CODES: dict[str, str] = {
|
||||
"STATUS OK": "No more errors left in the queue.", # renamed apparently from the IOC for 'No Error' to 'STATUS OK'
|
||||
"Illegal Value": "A parameter was out of range.",
|
||||
"Illegal Mode": "The action is illegal in the current mode.",
|
||||
"Illegal Delay": "The requested delay is out of range.",
|
||||
"Illegal Link": "The requested delay linkage is illegal.",
|
||||
"Recall Failed": "Recall of instrument settings failed; settings were invalid.",
|
||||
"Not Allowed": "Action not allowed: instrument is locked by another interface.",
|
||||
"Failed Self Test": "The DG645 self test failed.",
|
||||
"Failed Auto Calibration": "The DG645 auto calibration failed.",
|
||||
"Lost Data": "Output buffer overflow or data lost due to communication error.",
|
||||
"No Listener": "No GPIB listeners; pending output discarded.",
|
||||
"Failed ROM Check": "ROM checksum failed; firmware likely corrupted.",
|
||||
"Failed Offset T0 Test": "Self test of offset functionality for T0 failed.",
|
||||
"Failed Offset AB Test": "Self test of offset functionality for AB failed.",
|
||||
"Failed Offset CD Test": "Self test of offset functionality for CD failed.",
|
||||
"Failed Offset EF Test": "Self test of offset functionality for EF failed.",
|
||||
"Failed Offset GH Test": "Self test of offset functionality for GH failed.",
|
||||
"Failed Amplitude T0 Test": "Self test of amplitude functionality for T0 failed.",
|
||||
"Failed Amplitude AB Test": "Self test of amplitude functionality for AB failed.",
|
||||
"Failed Amplitude CD Test": "Self test of amplitude functionality for CD failed.",
|
||||
"Failed Amplitude EF Test": "Self test of amplitude functionality for EF failed.",
|
||||
"Failed Amplitude GH Test": "Self test of amplitude functionality for GH failed.",
|
||||
"Failed FPGA Communications Test": "Self test of FPGA communications failed.",
|
||||
"Failed GPIB Communications Test": "Self test of GPIB communications failed.",
|
||||
"Failed DDS Communications Test": "Self test of DDS communications failed.",
|
||||
"Failed Serial EEPROM Communications Test": "Self test of serial EEPROM failed.",
|
||||
"Failed Temperature Sensor Communications Test": "Temp sensor communication failed.",
|
||||
"Failed PLL Communications Test": "PLL communication self test failed.",
|
||||
"Failed DAC 0 Communications Test": "Self test of DAC 0 failed.",
|
||||
"Failed DAC 1 Communications Test": "Self test of DAC 1 failed.",
|
||||
"Failed DAC 2 Communications Test": "Self test of DAC 2 failed.",
|
||||
"Failed Sample and Hold Operations Test": "Sample and hold self test failed.",
|
||||
"Failed Vjitter Operations Test": "Vjitter operation self test failed.",
|
||||
"Failed Channel T0 Analog Delay Test": "Analog delay test for T0 failed.",
|
||||
"Failed Channel T1 Analog Delay Test": "Analog delay test for T1 failed.",
|
||||
"Failed Channel A Analog Delay Test": "Analog delay test for A failed.",
|
||||
"Failed Channel B Analog Delay Test": "Analog delay test for B failed.",
|
||||
"Failed Channel C Analog Delay Test": "Analog delay test for C failed.",
|
||||
"Failed Channel D Analog Delay Test": "Analog delay test for D failed.",
|
||||
"Failed Channel E Analog Delay Test": "Analog delay test for E failed.",
|
||||
"Failed Channel F Analog Delay Test": "Analog delay test for F failed.",
|
||||
"Failed Channel G Analog Delay Test": "Analog delay test for G failed.",
|
||||
"Failed Channel H Analog Delay Test": "Analog delay test for H failed.",
|
||||
"Failed Sample and Hold Calibration": "Auto calibration of sample and hold failed.",
|
||||
"Failed T0 Calibration": "Auto calibration of channel T0 failed.",
|
||||
"Failed T1 Calibration": "Auto calibration of channel T1 failed.",
|
||||
"Failed A Calibration": "Auto calibration of channel A failed.",
|
||||
"Failed B Calibration": "Auto calibration of channel B failed.",
|
||||
"Failed C Calibration": "Auto calibration of channel C failed.",
|
||||
"Failed D Calibration": "Auto calibration of channel D failed.",
|
||||
"Failed E Calibration": "Auto calibration of channel E failed.",
|
||||
"Failed F Calibration": "Auto calibration of channel F failed.",
|
||||
"Failed G Calibration": "Auto calibration of channel G failed.",
|
||||
"Failed H Calibration": "Auto calibration of channel H failed.",
|
||||
"Failed Vjitter Calibration": "Auto calibration of Vjitter failed.",
|
||||
"Illegal Command": "The command syntax used was illegal.",
|
||||
"Undefined Command": "The specified command does not exist.",
|
||||
"Illegal Query": "The specified command does not permit queries.",
|
||||
"Illegal Set": "The specified command can only be queried.",
|
||||
"Null Parameter": "The parser detected an empty parameter.",
|
||||
"Extra Parameters": "Too many parameters were provided.",
|
||||
"Missing Parameters": "Some required parameters are missing.",
|
||||
"Parameter Overflow": "Buffer overflow while parsing parameters.",
|
||||
"Invalid Floating Point Number": "Expected a float but couldn't parse it.",
|
||||
"Invalid Integer": "Expected an integer but couldn't parse it.",
|
||||
"Integer Overflow": "Parsed integer is too large.",
|
||||
"Invalid Hexadecimal": "Failed to parse expected hexadecimal input.",
|
||||
"Syntax Error": "The parser detected a syntax error.",
|
||||
"Communication Error": "Framing or parity error detected.",
|
||||
"Over run": "Input buffer overflowed.",
|
||||
"Too Many Errors": "Error buffer is full; some errors dropped.",
|
||||
}
|
||||
Binary file not shown.
@@ -1,276 +1,289 @@
|
||||
# pylint: skip-file
|
||||
import threading
|
||||
from typing import Generator
|
||||
from unittest import mock
|
||||
|
||||
import numpy as np
|
||||
import ophyd
|
||||
import pytest
|
||||
from ophyd_devices.devices.delay_generator_645 import TriggerSource
|
||||
from ophyd_devices.tests.utils import MockPV, patch_dual_pvs
|
||||
|
||||
from csaxs_bec.devices.epics.delay_generator_csaxs import DDGSetup
|
||||
from csaxs_bec.devices.epics.delay_generator_csaxs import DDG1, DDG2
|
||||
from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import (
|
||||
BURSTCONFIG,
|
||||
CHANNELREFERENCE,
|
||||
STATUSBITS,
|
||||
TRIGGERSOURCE,
|
||||
DelayGeneratorCSAXS,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def mock_DDGSetup():
|
||||
mock_ddg = mock.MagicMock()
|
||||
yield DDGSetup(parent=mock_ddg)
|
||||
def mock_ddg1() -> Generator[DDG1, DDG1, DDG1]:
|
||||
"""Fixture to mock the DDG1 device."""
|
||||
name = "ddg1"
|
||||
prefix = "test_ddg1:"
|
||||
with mock.patch.object(ophyd, "cl") as mock_cl:
|
||||
mock_cl.get_pv = MockPV
|
||||
mock_cl.thread_class = threading.Thread
|
||||
dev = DDG1(name=name, prefix=prefix)
|
||||
patch_dual_pvs(dev)
|
||||
yield dev
|
||||
|
||||
|
||||
# Fixture for scaninfo
|
||||
@pytest.fixture(
|
||||
params=[
|
||||
{
|
||||
"scan_id": "1234",
|
||||
"scan_type": "step",
|
||||
"num_points": 500,
|
||||
"frames_per_trigger": 1,
|
||||
"exp_time": 0.1,
|
||||
"readout_time": 0.1,
|
||||
},
|
||||
{
|
||||
"scan_id": "1234",
|
||||
"scan_type": "step",
|
||||
"num_points": 500,
|
||||
"frames_per_trigger": 5,
|
||||
"exp_time": 0.01,
|
||||
"readout_time": 0,
|
||||
},
|
||||
{
|
||||
"scan_id": "1234",
|
||||
"scan_type": "fly",
|
||||
"num_points": 500,
|
||||
"frames_per_trigger": 1,
|
||||
"exp_time": 1,
|
||||
"readout_time": 0.2,
|
||||
},
|
||||
{
|
||||
"scan_id": "1234",
|
||||
"scan_type": "fly",
|
||||
"num_points": 500,
|
||||
"frames_per_trigger": 5,
|
||||
"exp_time": 0.1,
|
||||
"readout_time": 0.4,
|
||||
},
|
||||
]
|
||||
)
|
||||
def scaninfo(request):
|
||||
return request.param
|
||||
@pytest.fixture(scope="function")
|
||||
def mock_ddg2() -> Generator[DDG2, DDG2, DDG2]:
|
||||
"""Fixture to mock the DDG1 device."""
|
||||
name = "ddg2"
|
||||
prefix = "test_ddg2:"
|
||||
with mock.patch.object(ophyd, "cl") as mock_cl:
|
||||
mock_cl.get_pv = MockPV
|
||||
mock_cl.thread_class = threading.Thread
|
||||
dev = DDG2(name=name, prefix=prefix)
|
||||
patch_dual_pvs(dev)
|
||||
yield dev
|
||||
|
||||
|
||||
# Fixture for DDG config default values
|
||||
@pytest.fixture(
|
||||
params=[
|
||||
{
|
||||
"delay_burst": 0.0,
|
||||
"delta_width": 0.0,
|
||||
"additional_triggers": 0,
|
||||
"polarity": [0, 0, 0, 0, 0],
|
||||
"amplitude": 0.0,
|
||||
"offset": 0.0,
|
||||
"thres_trig_level": 0.0,
|
||||
},
|
||||
{
|
||||
"delay_burst": 0.1,
|
||||
"delta_width": 0.1,
|
||||
"additional_triggers": 1,
|
||||
"polarity": [0, 0, 1, 0, 0],
|
||||
"amplitude": 5,
|
||||
"offset": 0.0,
|
||||
"thres_trig_level": 2.5,
|
||||
},
|
||||
]
|
||||
)
|
||||
def ddg_config_defaults(request):
|
||||
return request.param
|
||||
@pytest.fixture(scope="function")
|
||||
def mock_ddg() -> Generator[DelayGeneratorCSAXS, DelayGeneratorCSAXS, DelayGeneratorCSAXS]:
|
||||
"""Fixture to mock the camera device."""
|
||||
name = "ddg"
|
||||
prefix = "test:"
|
||||
with mock.patch.object(ophyd, "cl") as mock_cl:
|
||||
mock_cl.get_pv = MockPV
|
||||
mock_cl.thread_class = threading.Thread
|
||||
dev = DelayGeneratorCSAXS(name=name, prefix=prefix)
|
||||
patch_dual_pvs(dev)
|
||||
yield dev
|
||||
|
||||
|
||||
# Fixture for DDG config scan values
|
||||
@pytest.fixture(
|
||||
params=[
|
||||
{
|
||||
"fixed_ttl_width": [0, 0, 0, 0, 0],
|
||||
"trigger_width": None,
|
||||
"set_high_on_exposure": False,
|
||||
"set_high_on_stage": False,
|
||||
"set_trigger_source": "SINGLE_SHOT",
|
||||
"premove_trigger": False,
|
||||
},
|
||||
{
|
||||
"fixed_ttl_width": [0, 0, 0, 0, 0],
|
||||
"trigger_width": 0.1,
|
||||
"set_high_on_exposure": True,
|
||||
"set_high_on_stage": False,
|
||||
"set_trigger_source": "SINGLE_SHOT",
|
||||
"premove_trigger": True,
|
||||
},
|
||||
{
|
||||
"fixed_ttl_width": [0, 0, 0, 0, 0],
|
||||
"trigger_width": 0.1,
|
||||
"set_high_on_exposure": False,
|
||||
"set_high_on_stage": False,
|
||||
"set_trigger_source": "EXT_RISING_EDGE",
|
||||
"premove_trigger": False,
|
||||
},
|
||||
]
|
||||
)
|
||||
def ddg_config_scan(request):
|
||||
return request.param
|
||||
def test_ddg_init(mock_ddg):
|
||||
"""Test the proc event status method."""
|
||||
assert mock_ddg.name == "ddg"
|
||||
assert mock_ddg.prefix == "test:"
|
||||
|
||||
|
||||
# Fixture for delay pairs
|
||||
@pytest.fixture(
|
||||
params=[
|
||||
{"all_channels": ["channelAB", "channelCD"], "all_delay_pairs": ["AB", "CD"]},
|
||||
{"all_channels": [], "all_delay_pairs": []},
|
||||
{"all_channels": ["channelT0", "channelAB", "channelCD"], "all_delay_pairs": ["AB", "CD"]},
|
||||
]
|
||||
)
|
||||
def channel_pairs(request):
|
||||
return request.param
|
||||
def test_ddg_proc_event_status(mock_ddg):
|
||||
"""Test the proc event status method."""
|
||||
mock_ddg.state.proc_status.put(0)
|
||||
mock_ddg.proc_event_status()
|
||||
assert mock_ddg.state.proc_status.get() == 1
|
||||
|
||||
|
||||
def test_on_pre_scan(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan):
|
||||
"""Test the check_scan_id method."""
|
||||
# Set first attributes of parent class
|
||||
for k, v in scaninfo.items():
|
||||
setattr(mock_DDGSetup.parent.scaninfo, k, v)
|
||||
for k, v in ddg_config_defaults.items():
|
||||
getattr(mock_DDGSetup.parent, k).get.return_value = v
|
||||
for k, v in ddg_config_scan.items():
|
||||
getattr(mock_DDGSetup.parent, k).get.return_value = v
|
||||
# Call the function you want to test
|
||||
mock_DDGSetup.on_pre_scan()
|
||||
if ddg_config_scan["premove_trigger"]:
|
||||
mock_DDGSetup.parent.trigger_shot.put.assert_called_once_with(1)
|
||||
def test_ddg_set_trigger(mock_ddg):
|
||||
"""Test setting the trigger."""
|
||||
for trigger in TRIGGERSOURCE:
|
||||
mock_ddg.set_trigger(trigger)
|
||||
assert mock_ddg.trigger_source.get() == trigger.value
|
||||
|
||||
|
||||
# TODO put back once the logic is implemented
|
||||
# @pytest.mark.parametrize("source", ["SINGLE_SHOT", "EXT_RISING_EDGE"])
|
||||
# def test_on_trigger(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan, source):
|
||||
# """Test the on_trigger method."""
|
||||
# # Set first attributes of parent class
|
||||
# for k, v in scaninfo.items():
|
||||
# setattr(mock_DDGSetup.parent.scaninfo, k, v)
|
||||
# for k, v in ddg_config_defaults.items():
|
||||
# getattr(mock_DDGSetup.parent, k).get.return_value = v
|
||||
# for k, v in ddg_config_scan.items():
|
||||
# getattr(mock_DDGSetup.parent, k).get.return_value = v
|
||||
# # Call the function you want to test
|
||||
# mock_DDGSetup.parent.source.name = "source"
|
||||
# mock_DDGSetup.parent.source.read.return_value = {
|
||||
# mock_DDGSetup.parent.source.name: {"value": getattr(TriggerSource, source)}
|
||||
# }
|
||||
# mock_DDGSetup.on_trigger()
|
||||
# if source == "SINGLE_SHOT":
|
||||
# mock_DDGSetup.parent.trigger_shot.put.assert_called_once_with(1)
|
||||
def test_ddg_burst_enable(mock_ddg):
|
||||
"""Test enabling burst mode."""
|
||||
mock_ddg.burst_enable(count=100, delay=0.1, period=0.02, config=BURSTCONFIG.ALL_CYCLES)
|
||||
mock_ddg.burst_mode.get() == 1
|
||||
assert mock_ddg.burst_count.get() == 100
|
||||
assert mock_ddg.burst_delay.get() == 0.1
|
||||
assert mock_ddg.burst_period.get() == 0.02
|
||||
assert mock_ddg.burst_config.get() == BURSTCONFIG.ALL_CYCLES.value
|
||||
assert mock_ddg.burst_mode.get() == 1
|
||||
# Count is 0
|
||||
with pytest.raises(ValueError):
|
||||
mock_ddg.burst_enable(count=0, delay=0.1, period=0.02, config=BURSTCONFIG.ALL_CYCLES)
|
||||
# delay is negative
|
||||
with pytest.raises(ValueError):
|
||||
mock_ddg.burst_enable(count=100, delay=-0.1, period=0.02, config=BURSTCONFIG.ALL_CYCLES)
|
||||
# period is zero
|
||||
with pytest.raises(ValueError):
|
||||
mock_ddg.burst_enable(count=100, delay=0.1, period=0, config=BURSTCONFIG.ALL_CYCLES)
|
||||
|
||||
# Works with default config
|
||||
mock_ddg.burst_enable(count=100, delay=0.1, period=0.02)
|
||||
mock_ddg.burst_mode.get() == BURSTCONFIG.FIRST_CYCLE.value
|
||||
|
||||
|
||||
def test_on_wait_for_connection(
|
||||
mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan, channel_pairs
|
||||
):
|
||||
"""Test the initialize_default_parameter method."""
|
||||
# Set first attributes of parent class
|
||||
for k, v in scaninfo.items():
|
||||
setattr(mock_DDGSetup.parent.scaninfo, k, v)
|
||||
for k, v in ddg_config_defaults.items():
|
||||
getattr(mock_DDGSetup.parent, k).get.return_value = v
|
||||
for k, v in ddg_config_scan.items():
|
||||
getattr(mock_DDGSetup.parent, k).get.return_value = v
|
||||
# Call the function you want to test
|
||||
mock_DDGSetup.parent.all_channels = channel_pairs["all_channels"]
|
||||
mock_DDGSetup.parent.all_delay_pairs = channel_pairs["all_delay_pairs"]
|
||||
calls = []
|
||||
calls.extend(
|
||||
[
|
||||
mock.call("polarity", ddg_config_defaults["polarity"][ii], [channel])
|
||||
for ii, channel in enumerate(channel_pairs["all_channels"])
|
||||
]
|
||||
def test_ddg_wait_for_event_status(mock_ddg):
|
||||
"""Test setting wait for event status."""
|
||||
mock_ddg: DelayGeneratorCSAXS
|
||||
mock_ddg.state.event_status._read_pv.mock_data = 0
|
||||
status = mock_ddg.wait_for_event_status(value=STATUSBITS.END_OF_BURST) # 8
|
||||
assert status.done is False
|
||||
mock_ddg.state.event_status._read_pv.mock_data = 1
|
||||
assert status.done is False
|
||||
mock_ddg.state.event_status._read_pv.mock_data = 4
|
||||
assert status.done is False
|
||||
# TODO enable once callback for MockPV is implemented
|
||||
# mock_ddg.state.event_status._read_pv.mock_data = 13 # 8 + 4 + 1
|
||||
# status.wait(timeout=1) # Wait for the status to be done
|
||||
# assert status.done is True
|
||||
|
||||
|
||||
def test_ddg_set_io_values(mock_ddg):
|
||||
"""Test setting IO values."""
|
||||
mock_ddg.set_io_values(channel="ab", amplitude=3, offset=2, polarity=1, mode="ttl")
|
||||
assert mock_ddg.ab.io.amplitude.get() == 3
|
||||
assert mock_ddg.ab.io.offset.get() == 2
|
||||
assert mock_ddg.ab.io.polarity.get() == 1
|
||||
assert mock_ddg.ab.io.ttl_mode.get() == 1
|
||||
# List of channels
|
||||
channels = ["ab", "cd", "t0"]
|
||||
mock_ddg.set_io_values(channel=channels, amplitude=3, offset=2, polarity=1, mode="nim")
|
||||
for channel in channels:
|
||||
if channel == "t0":
|
||||
attr = getattr(mock_ddg, channel)
|
||||
else:
|
||||
attr = getattr(mock_ddg, channel).io
|
||||
assert attr.amplitude.get() == 3
|
||||
assert attr.offset.get() == 2
|
||||
assert attr.polarity.get() == 1
|
||||
assert attr.nim_mode.get() == 1
|
||||
|
||||
|
||||
def test_ddg_set_delay_pairs(mock_ddg):
|
||||
"""Test setting delay pairs."""
|
||||
mock_ddg.set_delay_pairs(channel="ab", delay=0.1, width=0.2)
|
||||
assert np.isclose(mock_ddg.ab.delay.get(), 0.1)
|
||||
assert np.isclose(mock_ddg.ab.width.get(), 0.2)
|
||||
assert np.isclose(mock_ddg.ab.ch1.setpoint.get(), 0.1)
|
||||
assert np.isclose(mock_ddg.ab.ch2.setpoint.get(), 0.3)
|
||||
# List of channels
|
||||
channels = ["ab", "cd", "ef", "gh"]
|
||||
delays = [0.1, 0.2, 0.4, 0.5]
|
||||
mock_ddg.set_delay_pairs(channel=channels, delay=delays, width=0.2)
|
||||
for delay, channel in zip(delays, channels):
|
||||
assert np.isclose(getattr(mock_ddg, channel).delay.get(), delay)
|
||||
assert np.isclose(getattr(mock_ddg, channel).width.get(), 0.2)
|
||||
assert np.isclose(getattr(mock_ddg, channel).ch1.setpoint.get(), delay)
|
||||
assert np.isclose(getattr(mock_ddg, channel).ch2.setpoint.get(), delay + 0.2)
|
||||
|
||||
|
||||
def test_ddg1_on_connected(mock_ddg1):
|
||||
"""Test the on_connected method of DDG1."""
|
||||
mock_ddg1.on_connected()
|
||||
# IO defaults
|
||||
assert mock_ddg1.burst_mode.get() == 0
|
||||
assert mock_ddg1.ab.io.amplitude.get() == 5.0
|
||||
assert mock_ddg1.cd.io.offset.get() == 0.0
|
||||
assert mock_ddg1.ef.io.polarity.get() == 1
|
||||
assert mock_ddg1.gh.io.ttl_mode.get() == 1
|
||||
|
||||
# reference defaults
|
||||
assert mock_ddg1.ab.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value
|
||||
assert mock_ddg1.ab.ch2.reference.get() == 1 # CHANNELREFERENCE.A.value
|
||||
assert mock_ddg1.cd.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value
|
||||
assert mock_ddg1.cd.ch2.reference.get() == 3 # CHANNELREFERENCE.C.value
|
||||
assert mock_ddg1.ef.ch1.reference.get() == 4 # CHANNELREFERENCE.D.value
|
||||
assert mock_ddg1.ef.ch2.reference.get() == 5 # CHANNELREFERENCE.E.value
|
||||
assert mock_ddg1.gh.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value
|
||||
assert mock_ddg1.gh.ch2.reference.get() == 7 # CHANNELREFERENCE.G.value
|
||||
|
||||
# Default trigger source
|
||||
assert mock_ddg1.trigger_source.get() == 5 # TRIGGERSOURCE.SINGLE_SHOT.value
|
||||
|
||||
|
||||
def test_ddg1_stage(mock_ddg1):
|
||||
"""Test the on_stage method of DDG1."""
|
||||
exp_time = 0.1
|
||||
frames_per_trigger = 10
|
||||
|
||||
mock_ddg1.burst_mode.put(1)
|
||||
mock_ddg1.scan_info.msg.scan_parameters["exp_time"] = exp_time
|
||||
mock_ddg1.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger
|
||||
|
||||
mock_ddg1.stage()
|
||||
|
||||
assert np.isclose(mock_ddg1.burst_mode.get(), 0) # Burst mode is disabled
|
||||
# Trigger DDG2 through EXT/EN
|
||||
|
||||
assert np.isclose(mock_ddg1.ab.delay.get(), 2e-3)
|
||||
assert np.isclose(mock_ddg1.ab.width.get(), 1e-6)
|
||||
# Shutter channel cd
|
||||
assert np.isclose(mock_ddg1.cd.delay.get(), 0)
|
||||
assert np.isclose(mock_ddg1.cd.width.get(), 2e-3 + exp_time * frames_per_trigger + 1e-3)
|
||||
# MCS channel ef or gate
|
||||
assert np.isclose(mock_ddg1.ef.delay.get(), 0)
|
||||
assert np.isclose(mock_ddg1.ef.width.get(), 1e-6)
|
||||
|
||||
assert mock_ddg1.staged == ophyd.Staged.yes
|
||||
|
||||
|
||||
def test_ddg1_trigger(mock_ddg1):
|
||||
"""Test the on_trigger method of DDG1."""
|
||||
mock_ddg1.state.event_status._read_pv.mock_data = (
|
||||
5 # STATUSBITS.END_OF_DELAY.value + STATUSBITS.TRIG.value
|
||||
)
|
||||
calls.extend([mock.call("amplitude", ddg_config_defaults["amplitude"])])
|
||||
calls.extend([mock.call("offset", ddg_config_defaults["offset"])])
|
||||
calls.extend(
|
||||
[
|
||||
mock.call(
|
||||
"reference", 0, [f"channel{pair}.ch1" for pair in channel_pairs["all_delay_pairs"]]
|
||||
)
|
||||
]
|
||||
)
|
||||
calls.extend(
|
||||
[
|
||||
mock.call(
|
||||
"reference", 0, [f"channel{pair}.ch2" for pair in channel_pairs["all_delay_pairs"]]
|
||||
)
|
||||
]
|
||||
)
|
||||
mock_DDGSetup.on_wait_for_connection()
|
||||
mock_DDGSetup.parent.set_channels.assert_has_calls(calls)
|
||||
status = mock_ddg1.trigger()
|
||||
assert status.done is True
|
||||
assert status.success is True
|
||||
assert mock_ddg1.trigger_shot.get() == 1
|
||||
|
||||
|
||||
# TODO put back once the logic is implemented
|
||||
# def test_on_stage(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan, channel_pairs):
|
||||
# """Test the prepare_ddg method."""
|
||||
# # Set first attributes of parent class
|
||||
# for k, v in scaninfo.items():
|
||||
# setattr(mock_DDGSetup.parent.scaninfo, k, v)
|
||||
# for k, v in ddg_config_defaults.items():
|
||||
# getattr(mock_DDGSetup.parent, k).get.return_value = v
|
||||
# for k, v in ddg_config_scan.items():
|
||||
# getattr(mock_DDGSetup.parent, k).get.return_value = v
|
||||
# # Call the function you want to test
|
||||
# mock_DDGSetup.parent.all_channels = channel_pairs["all_channels"]
|
||||
# mock_DDGSetup.parent.all_delay_pairs = channel_pairs["all_delay_pairs"]
|
||||
def test_ddg1_stop(mock_ddg1):
|
||||
"""Test the on_stop method of DDG1."""
|
||||
mock_ddg1.burst_mode.put(1) # Enable burst mode
|
||||
mock_ddg1.stop()
|
||||
assert mock_ddg1.burst_mode.get() == 0 # Burst mode is disabled
|
||||
|
||||
# mock_DDGSetup.prepare_ddg()
|
||||
# mock_DDGSetup.parent.set_trigger.assert_called_once_with(
|
||||
# getattr(TriggerSource, ddg_config_scan["set_trigger_source"])
|
||||
# )
|
||||
# if scaninfo["scan_type"] == "step":
|
||||
# if ddg_config_scan["set_high_on_exposure"]:
|
||||
# num_burst_cycle = 1 + ddg_config_defaults["additional_triggers"]
|
||||
# exp_time = ddg_config_defaults["delta_width"] + scaninfo["frames_per_trigger"] * (
|
||||
# scaninfo["exp_time"] + scaninfo["readout_time"]
|
||||
# )
|
||||
# total_exposure = exp_time
|
||||
# delay_burst = ddg_config_defaults["delay_burst"]
|
||||
# else:
|
||||
# exp_time = ddg_config_defaults["delta_width"] + scaninfo["exp_time"]
|
||||
# total_exposure = exp_time + scaninfo["readout_time"]
|
||||
# delay_burst = ddg_config_defaults["delay_burst"]
|
||||
# num_burst_cycle = (
|
||||
# scaninfo["frames_per_trigger"] + ddg_config_defaults["additional_triggers"]
|
||||
# )
|
||||
# elif scaninfo["scan_type"] == "fly":
|
||||
# if ddg_config_scan["set_high_on_exposure"]:
|
||||
# num_burst_cycle = 1 + ddg_config_defaults["additional_triggers"]
|
||||
# exp_time = (
|
||||
# ddg_config_defaults["delta_width"]
|
||||
# + scaninfo["num_points"] * scaninfo["exp_time"]
|
||||
# + (scaninfo["num_points"] - 1) * scaninfo["readout_time"]
|
||||
# )
|
||||
# total_exposure = exp_time
|
||||
# delay_burst = ddg_config_defaults["delay_burst"]
|
||||
# else:
|
||||
# exp_time = ddg_config_defaults["delta_width"] + scaninfo["exp_time"]
|
||||
# total_exposure = exp_time + scaninfo["readout_time"]
|
||||
# delay_burst = ddg_config_defaults["delay_burst"]
|
||||
# num_burst_cycle = scaninfo["num_points"] + ddg_config_defaults["additional_triggers"]
|
||||
|
||||
# # mock_DDGSetup.parent.burst_enable.assert_called_once_with(
|
||||
# # mock.call(num_burst_cycle, delay_burst, total_exposure, config="first")
|
||||
# # )
|
||||
# mock_DDGSetup.parent.burst_enable.assert_called_once_with(
|
||||
# num_burst_cycle, delay_burst, total_exposure, config="first"
|
||||
# )
|
||||
# if not ddg_config_scan["trigger_width"]:
|
||||
# call = mock.call("width", exp_time)
|
||||
# assert call in mock_DDGSetup.parent.set_channels.mock_calls
|
||||
# else:
|
||||
# call = mock.call("width", ddg_config_scan["trigger_width"])
|
||||
# assert call in mock_DDGSetup.parent.set_channels.mock_calls
|
||||
# if ddg_config_scan["set_high_on_exposure"]:
|
||||
# calls = [
|
||||
# mock.call("width", value, channels=[channel])
|
||||
# for value, channel in zip(
|
||||
# ddg_config_scan["fixed_ttl_width"], channel_pairs["all_channels"]
|
||||
# )
|
||||
# if value != 0
|
||||
# ]
|
||||
# if calls:
|
||||
# assert all(calls in mock_DDGSetup.parent.set_channels.mock_calls)
|
||||
def test_ddg2_on_connected(mock_ddg2):
|
||||
"""Test on connected method of DDG2."""
|
||||
mock_ddg2.on_connected()
|
||||
# IO defaults
|
||||
assert mock_ddg2.burst_mode.get() == 0
|
||||
assert mock_ddg2.ab.io.amplitude.get() == 5.0
|
||||
assert mock_ddg2.cd.io.offset.get() == 0.0
|
||||
assert mock_ddg2.ef.io.polarity.get() == 1
|
||||
assert mock_ddg2.gh.io.ttl_mode.get() == 1
|
||||
|
||||
# reference defaults
|
||||
assert mock_ddg2.ab.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value
|
||||
assert mock_ddg2.ab.ch2.reference.get() == 1 # CHANNELREFERENCE.A.value
|
||||
assert mock_ddg2.cd.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value
|
||||
assert mock_ddg2.cd.ch2.reference.get() == 3 # CHANNELREFERENCE.C.value
|
||||
assert mock_ddg2.ef.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value
|
||||
assert mock_ddg2.ef.ch2.reference.get() == 5 # CHANNELREFERENCE.E.value
|
||||
assert mock_ddg2.gh.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value
|
||||
assert mock_ddg2.gh.ch2.reference.get() == 7 # CHANNELREFERENCE.G.value
|
||||
|
||||
# Default trigger source
|
||||
assert mock_ddg2.trigger_source.get() == 1 # TRIGGERSOURCE.EXT_RISING_EDGE.value
|
||||
|
||||
|
||||
def test_ddg2_stage(mock_ddg2):
|
||||
"""Test the on_stage method of DDG2."""
|
||||
exp_time = 0.1
|
||||
frames_per_trigger = 10
|
||||
mock_ddg2.on_connected()
|
||||
|
||||
mock_ddg2.burst_mode.put(0)
|
||||
mock_ddg2.scan_info.msg.scan_parameters["exp_time"] = exp_time
|
||||
mock_ddg2.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger
|
||||
|
||||
mock_ddg2.stage()
|
||||
|
||||
assert np.isclose(mock_ddg2.burst_mode.get(), 1) # Burst mode is enabled
|
||||
assert np.isclose(mock_ddg2.ab.delay.get(), 0)
|
||||
assert np.isclose(mock_ddg2.ab.width.get(), exp_time - 2e-4) # DEFAULT_READOUT_TIMES["ab"])
|
||||
assert mock_ddg2.burst_count.get() == frames_per_trigger
|
||||
assert np.isclose(mock_ddg2.burst_delay.get(), 0)
|
||||
assert np.isclose(mock_ddg2.burst_period.get(), exp_time)
|
||||
|
||||
assert mock_ddg2.trigger_source.get() == TRIGGERSOURCE.EXT_RISING_EDGE.value
|
||||
|
||||
assert mock_ddg2.staged == ophyd.Staged.yes
|
||||
|
||||
|
||||
def test_ddg2_trigger(mock_ddg2):
|
||||
"""Test the on_trigger method of DDG2."""
|
||||
mock_ddg2.trigger_shot.put(0)
|
||||
status = mock_ddg2.trigger()
|
||||
assert mock_ddg2.trigger_shot.get() == 0 # Should not trigger DDG2 via soft trigger
|
||||
status.wait()
|
||||
assert status.done is True
|
||||
assert status.success is True
|
||||
|
||||
|
||||
def test_ddg2_stop(mock_ddg2):
|
||||
"""Test the on_stop method of DDG2."""
|
||||
mock_ddg2.burst_mode.put(1) # Enable burst mode
|
||||
mock_ddg2.stop()
|
||||
assert mock_ddg2.burst_mode.get() == 0 # Burst mode is disabled
|
||||
|
||||
Reference in New Issue
Block a user