refactor: initial commit for updates on ddg from csaxs

This commit is contained in:
2024-12-05 09:26:31 +01:00
parent ccaf996512
commit e19ce733a2
2 changed files with 137 additions and 152 deletions
@@ -1,6 +1,5 @@
from bec_lib import bec_logger
from ophyd import Component, DeviceStatus, Kind
from ophyd_devices.devices.delay_generator_645 import DelayGenerator, TriggerSource
from ophyd_devices.interfaces.base_classes.psi_device_base import CustomPrepare, PSIDeviceBase
from ophyd_devices.sim.sim_signals import SetableSignal
@@ -18,7 +17,7 @@ class DDGSetup(CustomPrepare):
Custom Prepare class with hooks for beamline specific logic for the DG645 at CSAXS
"""
def on_wait_for_connection(self) ->None:
def on_wait_for_connection(self) -> None:
"""Init default parameter after the all signals are connected"""
for ii, channel in enumerate(self.parent.all_channels):
self.parent.set_channels("polarity", self.parent.polarity.get()[ii], [channel])
@@ -37,69 +36,68 @@ class DDGSetup(CustomPrepare):
self.parent.level.put(self.parent.thres_trig_level.get())
def on_stage(self) -> None:
# self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get()))
# scantype "jjf_test"
"Hook execute before the scan starts"
if self.parent.scaninfo.scan_type == "step":
exp_time = self.parent.scaninfo.exp_time
delay = 0
self.parent.burst_disable()
self.parent.set_trigger(TriggerSource.SINGLE_SHOT)
self.parent.set_channels(signal='width', value=exp_time)
self.parent.set_channels(signal='delay', value=delay)
self.parent.set_channels(signal="width", value=exp_time)
self.parent.set_channels(signal="delay", value=delay)
return
scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
if scan_name == "jjf_test":
# exp_time = self.parent.scaninfo.exp_time
# readout = self.parent.scaninfo.readout_time
# num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"]
# total_exposure = exp_time+readout
exp_time = 480e-6#self.parent.scaninfo.exp_time
readout = 20e-6#self.parent.scaninfo.readout_time
total_exposure = exp_time+readout
if scan_name == "jjf_test":
# TODO implement the logic for JJF triggering
exp_time = 480e-6 # self.parent.scaninfo.exp_time
readout = 20e-6 # self.parent.scaninfo.readout_time
total_exposure = exp_time + readout
num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"]
num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time/total_exposure)
delay = 0
num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time / total_exposure)
delay = 0
delay_burst = self.parent.delay_burst.get()
self.parent.set_trigger(trigger_source=TriggerSource.SINGLE_SHOT)
self.parent.set_channels(signal='width', value=exp_time)
self.parent.set_channels(signal='delay', value=delay)
self.parent.burst_enable(count=num_burst_cycle, delay=delay_burst, period=total_exposure, config="first")
logger.info(f"{self.parent.name}: On stage with n_burst: {num_burst_cycle} and total_exp {total_exposure}")
def on_trigger(self) -> None:
self.parent.set_trigger(trigger_source=TriggerSource.SINGLE_SHOT)
self.parent.set_channels(signal="width", value=exp_time)
self.parent.set_channels(signal="delay", value=delay)
self.parent.burst_enable(
count=num_burst_cycle, delay=delay_burst, period=total_exposure, config="first"
)
logger.info(
f"{self.parent.name}: On stage with n_burst: {num_burst_cycle} and total_exp {total_exposure}"
)
def on_trigger(self) -> DeviceStatus:
"""Method to be executed upon trigger"""
if self.parent.scaninfo.scan_type == "step":
self.parent.trigger_shot.put(1)
return
scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
if scan_name == "jjf_test":
exp_time = 480e-6#self.parent.scaninfo.exp_time
readout = 20e-6#self.parent.scaninfo.readout_time
total_exposure = exp_time+readout
exp_time = 480e-6 # self.parent.scaninfo.exp_time
readout = 20e-6 # self.parent.scaninfo.readout_time
total_exposure = exp_time + readout
num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"]
num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time/total_exposure)
cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["cycles"]
#time.sleep(num_burst_cycle*total_exposure)
def check_ddg()->int:
self.parent.trigger_burst_readout.put(1)
return self.parent.burst_cycle_finished.get()
status = self.wait_with_status(signal_conditions=[(check_ddg, 1)],
timeout=num_burst_cycle*total_exposure+1,
check_stopped=True,
exception_on_timeout=DelayGeneratorcSAXSError(f"{self.parent.name} run into timeout in complete call.")
)
num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time / total_exposure)
# Start trigger cycle
self.parent.trigger_burst_readout.put(1)
# Create status object that will wait for the end of the burst cycle
status = self.wait_with_status(
signal_conditions=[(self.parent.burst_cycle_finished, 1)],
timeout=num_burst_cycle * total_exposure + 1, # add 1s to be sure
check_stopped=True,
exception_on_timeout=DelayGeneratorcSAXSError(
f"{self.parent.name} run into timeout in complete call."
),
)
logger.info(f"Return status {self.parent.name}")
return status
def on_complete(self) -> DeviceStatus:
pass
def finished(self) -> None:
"""Method checks if DDG finished acquisition"""
def on_pre_scan(self) -> None:
"""
Method called by pre_scan hook in parent class.
@@ -110,7 +108,6 @@ class DDGSetup(CustomPrepare):
self.parent.trigger_shot.put(1)
class DelayGeneratorcSAXS(PSIDeviceBase, DelayGenerator):
"""
DG645 delay generator at cSAXS (multiple can be in use depending on the setup)
@@ -141,8 +138,8 @@ class DelayGeneratorcSAXS(PSIDeviceBase, DelayGenerator):
custom_prepare_cls = DDGSetup
# Custom signals passed on during the init procedure via BEC
#TODO review whether those should remain here like that
# TODO review whether those should remain here like that
delay_burst = Component(
bec_utils.ConfigSignal, name="delay_burst", kind="config", config_storage_name="ddg_config"
)
@@ -220,10 +217,10 @@ class DelayGeneratorcSAXS(PSIDeviceBase, DelayGenerator):
def __init__(
self,
name:str,
prefix:str="",
kind:Kind=None,
ddg_config:dict=None,
name: str,
prefix: str = "",
kind: Kind = None,
ddg_config: dict = None,
parent=None,
device_manager=None,
**kwargs,
+91 -103
View File
@@ -2,7 +2,7 @@
from unittest import mock
import pytest
from ophyd_devices.interfaces.base_classes.psi_delay_generator_base import TriggerSource
from ophyd_devices.devices.delay_generator_645 import TriggerSource
from csaxs_bec.devices.epics.delay_generator_csaxs import DDGSetup
@@ -126,20 +126,6 @@ def channel_pairs(request):
return request.param
def test_check_scan_id(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan):
"""Test the check_scan_id method."""
# Set first attributes of parent class
for k, v in scaninfo.items():
setattr(mock_DDGSetup.parent.scaninfo, k, v)
for k, v in ddg_config_defaults.items():
getattr(mock_DDGSetup.parent, k).get.return_value = v
for k, v in ddg_config_scan.items():
getattr(mock_DDGSetup.parent, k).get.return_value = v
# Call the function you want to test
mock_DDGSetup.check_scan_id()
mock_DDGSetup.parent.scaninfo.load_scan_metadata.assert_called_once()
def test_on_pre_scan(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan):
"""Test the check_scan_id method."""
# Set first attributes of parent class
@@ -155,27 +141,28 @@ def test_on_pre_scan(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_sc
mock_DDGSetup.parent.trigger_shot.put.assert_called_once_with(1)
@pytest.mark.parametrize("source", ["SINGLE_SHOT", "EXT_RISING_EDGE"])
def test_on_trigger(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan, source):
"""Test the on_trigger method."""
# Set first attributes of parent class
for k, v in scaninfo.items():
setattr(mock_DDGSetup.parent.scaninfo, k, v)
for k, v in ddg_config_defaults.items():
getattr(mock_DDGSetup.parent, k).get.return_value = v
for k, v in ddg_config_scan.items():
getattr(mock_DDGSetup.parent, k).get.return_value = v
# Call the function you want to test
mock_DDGSetup.parent.source.name = "source"
mock_DDGSetup.parent.source.read.return_value = {
mock_DDGSetup.parent.source.name: {"value": getattr(TriggerSource, source)}
}
mock_DDGSetup.on_trigger()
if source == "SINGLE_SHOT":
mock_DDGSetup.parent.trigger_shot.put.assert_called_once_with(1)
# TODO put back once the logic is implemented
# @pytest.mark.parametrize("source", ["SINGLE_SHOT", "EXT_RISING_EDGE"])
# def test_on_trigger(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan, source):
# """Test the on_trigger method."""
# # Set first attributes of parent class
# for k, v in scaninfo.items():
# setattr(mock_DDGSetup.parent.scaninfo, k, v)
# for k, v in ddg_config_defaults.items():
# getattr(mock_DDGSetup.parent, k).get.return_value = v
# for k, v in ddg_config_scan.items():
# getattr(mock_DDGSetup.parent, k).get.return_value = v
# # Call the function you want to test
# mock_DDGSetup.parent.source.name = "source"
# mock_DDGSetup.parent.source.read.return_value = {
# mock_DDGSetup.parent.source.name: {"value": getattr(TriggerSource, source)}
# }
# mock_DDGSetup.on_trigger()
# if source == "SINGLE_SHOT":
# mock_DDGSetup.parent.trigger_shot.put.assert_called_once_with(1)
def test_initialize_default_parameter(
def test_on_wait_for_connection(
mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan, channel_pairs
):
"""Test the initialize_default_parameter method."""
@@ -212,77 +199,78 @@ def test_initialize_default_parameter(
)
]
)
mock_DDGSetup.initialize_default_parameter()
mock_DDGSetup.on_wait_for_connection()
mock_DDGSetup.parent.set_channels.assert_has_calls(calls)
def test_prepare_ddg(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan, channel_pairs):
"""Test the prepare_ddg method."""
# Set first attributes of parent class
for k, v in scaninfo.items():
setattr(mock_DDGSetup.parent.scaninfo, k, v)
for k, v in ddg_config_defaults.items():
getattr(mock_DDGSetup.parent, k).get.return_value = v
for k, v in ddg_config_scan.items():
getattr(mock_DDGSetup.parent, k).get.return_value = v
# Call the function you want to test
mock_DDGSetup.parent.all_channels = channel_pairs["all_channels"]
mock_DDGSetup.parent.all_delay_pairs = channel_pairs["all_delay_pairs"]
# TODO put back once the logic is implemented
# def test_on_stage(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan, channel_pairs):
# """Test the prepare_ddg method."""
# # Set first attributes of parent class
# for k, v in scaninfo.items():
# setattr(mock_DDGSetup.parent.scaninfo, k, v)
# for k, v in ddg_config_defaults.items():
# getattr(mock_DDGSetup.parent, k).get.return_value = v
# for k, v in ddg_config_scan.items():
# getattr(mock_DDGSetup.parent, k).get.return_value = v
# # Call the function you want to test
# mock_DDGSetup.parent.all_channels = channel_pairs["all_channels"]
# mock_DDGSetup.parent.all_delay_pairs = channel_pairs["all_delay_pairs"]
mock_DDGSetup.prepare_ddg()
mock_DDGSetup.parent.set_trigger.assert_called_once_with(
getattr(TriggerSource, ddg_config_scan["set_trigger_source"])
)
if scaninfo["scan_type"] == "step":
if ddg_config_scan["set_high_on_exposure"]:
num_burst_cycle = 1 + ddg_config_defaults["additional_triggers"]
exp_time = ddg_config_defaults["delta_width"] + scaninfo["frames_per_trigger"] * (
scaninfo["exp_time"] + scaninfo["readout_time"]
)
total_exposure = exp_time
delay_burst = ddg_config_defaults["delay_burst"]
else:
exp_time = ddg_config_defaults["delta_width"] + scaninfo["exp_time"]
total_exposure = exp_time + scaninfo["readout_time"]
delay_burst = ddg_config_defaults["delay_burst"]
num_burst_cycle = (
scaninfo["frames_per_trigger"] + ddg_config_defaults["additional_triggers"]
)
elif scaninfo["scan_type"] == "fly":
if ddg_config_scan["set_high_on_exposure"]:
num_burst_cycle = 1 + ddg_config_defaults["additional_triggers"]
exp_time = (
ddg_config_defaults["delta_width"]
+ scaninfo["num_points"] * scaninfo["exp_time"]
+ (scaninfo["num_points"] - 1) * scaninfo["readout_time"]
)
total_exposure = exp_time
delay_burst = ddg_config_defaults["delay_burst"]
else:
exp_time = ddg_config_defaults["delta_width"] + scaninfo["exp_time"]
total_exposure = exp_time + scaninfo["readout_time"]
delay_burst = ddg_config_defaults["delay_burst"]
num_burst_cycle = scaninfo["num_points"] + ddg_config_defaults["additional_triggers"]
# mock_DDGSetup.prepare_ddg()
# mock_DDGSetup.parent.set_trigger.assert_called_once_with(
# getattr(TriggerSource, ddg_config_scan["set_trigger_source"])
# )
# if scaninfo["scan_type"] == "step":
# if ddg_config_scan["set_high_on_exposure"]:
# num_burst_cycle = 1 + ddg_config_defaults["additional_triggers"]
# exp_time = ddg_config_defaults["delta_width"] + scaninfo["frames_per_trigger"] * (
# scaninfo["exp_time"] + scaninfo["readout_time"]
# )
# total_exposure = exp_time
# delay_burst = ddg_config_defaults["delay_burst"]
# else:
# exp_time = ddg_config_defaults["delta_width"] + scaninfo["exp_time"]
# total_exposure = exp_time + scaninfo["readout_time"]
# delay_burst = ddg_config_defaults["delay_burst"]
# num_burst_cycle = (
# scaninfo["frames_per_trigger"] + ddg_config_defaults["additional_triggers"]
# )
# elif scaninfo["scan_type"] == "fly":
# if ddg_config_scan["set_high_on_exposure"]:
# num_burst_cycle = 1 + ddg_config_defaults["additional_triggers"]
# exp_time = (
# ddg_config_defaults["delta_width"]
# + scaninfo["num_points"] * scaninfo["exp_time"]
# + (scaninfo["num_points"] - 1) * scaninfo["readout_time"]
# )
# total_exposure = exp_time
# delay_burst = ddg_config_defaults["delay_burst"]
# else:
# exp_time = ddg_config_defaults["delta_width"] + scaninfo["exp_time"]
# total_exposure = exp_time + scaninfo["readout_time"]
# delay_burst = ddg_config_defaults["delay_burst"]
# num_burst_cycle = scaninfo["num_points"] + ddg_config_defaults["additional_triggers"]
# mock_DDGSetup.parent.burst_enable.assert_called_once_with(
# mock.call(num_burst_cycle, delay_burst, total_exposure, config="first")
# )
mock_DDGSetup.parent.burst_enable.assert_called_once_with(
num_burst_cycle, delay_burst, total_exposure, config="first"
)
if not ddg_config_scan["trigger_width"]:
call = mock.call("width", exp_time)
assert call in mock_DDGSetup.parent.set_channels.mock_calls
else:
call = mock.call("width", ddg_config_scan["trigger_width"])
assert call in mock_DDGSetup.parent.set_channels.mock_calls
if ddg_config_scan["set_high_on_exposure"]:
calls = [
mock.call("width", value, channels=[channel])
for value, channel in zip(
ddg_config_scan["fixed_ttl_width"], channel_pairs["all_channels"]
)
if value != 0
]
if calls:
assert all(calls in mock_DDGSetup.parent.set_channels.mock_calls)
# # mock_DDGSetup.parent.burst_enable.assert_called_once_with(
# # mock.call(num_burst_cycle, delay_burst, total_exposure, config="first")
# # )
# mock_DDGSetup.parent.burst_enable.assert_called_once_with(
# num_burst_cycle, delay_burst, total_exposure, config="first"
# )
# if not ddg_config_scan["trigger_width"]:
# call = mock.call("width", exp_time)
# assert call in mock_DDGSetup.parent.set_channels.mock_calls
# else:
# call = mock.call("width", ddg_config_scan["trigger_width"])
# assert call in mock_DDGSetup.parent.set_channels.mock_calls
# if ddg_config_scan["set_high_on_exposure"]:
# calls = [
# mock.call("width", value, channels=[channel])
# for value, channel in zip(
# ddg_config_scan["fixed_ttl_width"], channel_pairs["all_channels"]
# )
# if value != 0
# ]
# if calls:
# assert all(calls in mock_DDGSetup.parent.set_channels.mock_calls)