From e19ce733a27afd29d8014c3ebfb7d41d9a3449a7 Mon Sep 17 00:00:00 2001 From: appel_c Date: Thu, 5 Dec 2024 09:26:31 +0100 Subject: [PATCH] refactor: initial commit for updates on ddg from csaxs --- .../devices/epics/delay_generator_csaxs.py | 95 +++++---- .../test_delay_generator_csaxs.py | 194 ++++++++---------- 2 files changed, 137 insertions(+), 152 deletions(-) diff --git a/csaxs_bec/devices/epics/delay_generator_csaxs.py b/csaxs_bec/devices/epics/delay_generator_csaxs.py index 6662e6c..65a890b 100644 --- a/csaxs_bec/devices/epics/delay_generator_csaxs.py +++ b/csaxs_bec/devices/epics/delay_generator_csaxs.py @@ -1,6 +1,5 @@ from bec_lib import bec_logger from ophyd import Component, DeviceStatus, Kind - from ophyd_devices.devices.delay_generator_645 import DelayGenerator, TriggerSource from ophyd_devices.interfaces.base_classes.psi_device_base import CustomPrepare, PSIDeviceBase from ophyd_devices.sim.sim_signals import SetableSignal @@ -18,7 +17,7 @@ class DDGSetup(CustomPrepare): Custom Prepare class with hooks for beamline specific logic for the DG645 at CSAXS """ - def on_wait_for_connection(self) ->None: + def on_wait_for_connection(self) -> None: """Init default parameter after the all signals are connected""" for ii, channel in enumerate(self.parent.all_channels): self.parent.set_channels("polarity", self.parent.polarity.get()[ii], [channel]) @@ -37,69 +36,68 @@ class DDGSetup(CustomPrepare): self.parent.level.put(self.parent.thres_trig_level.get()) def on_stage(self) -> None: - # self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get())) - # scantype "jjf_test" + "Hook execute before the scan starts" if self.parent.scaninfo.scan_type == "step": exp_time = self.parent.scaninfo.exp_time delay = 0 self.parent.burst_disable() self.parent.set_trigger(TriggerSource.SINGLE_SHOT) - self.parent.set_channels(signal='width', value=exp_time) - self.parent.set_channels(signal='delay', value=delay) + self.parent.set_channels(signal="width", value=exp_time) + self.parent.set_channels(signal="delay", value=delay) return scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "") - if scan_name == "jjf_test": - # exp_time = self.parent.scaninfo.exp_time - # readout = self.parent.scaninfo.readout_time - # num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"] - # total_exposure = exp_time+readout - exp_time = 480e-6#self.parent.scaninfo.exp_time - readout = 20e-6#self.parent.scaninfo.readout_time - total_exposure = exp_time+readout + if scan_name == "jjf_test": + # TODO implement the logic for JJF triggering + exp_time = 480e-6 # self.parent.scaninfo.exp_time + readout = 20e-6 # self.parent.scaninfo.readout_time + total_exposure = exp_time + readout num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"] - num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time/total_exposure) - delay = 0 + num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time / total_exposure) + delay = 0 delay_burst = self.parent.delay_burst.get() - - self.parent.set_trigger(trigger_source=TriggerSource.SINGLE_SHOT) - - self.parent.set_channels(signal='width', value=exp_time) - self.parent.set_channels(signal='delay', value=delay) - self.parent.burst_enable(count=num_burst_cycle, delay=delay_burst, period=total_exposure, config="first") - logger.info(f"{self.parent.name}: On stage with n_burst: {num_burst_cycle} and total_exp {total_exposure}") - def on_trigger(self) -> None: + self.parent.set_trigger(trigger_source=TriggerSource.SINGLE_SHOT) + + self.parent.set_channels(signal="width", value=exp_time) + self.parent.set_channels(signal="delay", value=delay) + self.parent.burst_enable( + count=num_burst_cycle, delay=delay_burst, period=total_exposure, config="first" + ) + logger.info( + f"{self.parent.name}: On stage with n_burst: {num_burst_cycle} and total_exp {total_exposure}" + ) + + def on_trigger(self) -> DeviceStatus: """Method to be executed upon trigger""" if self.parent.scaninfo.scan_type == "step": self.parent.trigger_shot.put(1) return scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "") if scan_name == "jjf_test": - exp_time = 480e-6#self.parent.scaninfo.exp_time - readout = 20e-6#self.parent.scaninfo.readout_time - total_exposure = exp_time+readout + exp_time = 480e-6 # self.parent.scaninfo.exp_time + readout = 20e-6 # self.parent.scaninfo.readout_time + total_exposure = exp_time + readout num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"] - num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time/total_exposure) - cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["cycles"] - #time.sleep(num_burst_cycle*total_exposure) - def check_ddg()->int: - self.parent.trigger_burst_readout.put(1) - return self.parent.burst_cycle_finished.get() - status = self.wait_with_status(signal_conditions=[(check_ddg, 1)], - timeout=num_burst_cycle*total_exposure+1, - check_stopped=True, - exception_on_timeout=DelayGeneratorcSAXSError(f"{self.parent.name} run into timeout in complete call.") - ) + num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time / total_exposure) + + # Start trigger cycle + self.parent.trigger_burst_readout.put(1) + + # Create status object that will wait for the end of the burst cycle + status = self.wait_with_status( + signal_conditions=[(self.parent.burst_cycle_finished, 1)], + timeout=num_burst_cycle * total_exposure + 1, # add 1s to be sure + check_stopped=True, + exception_on_timeout=DelayGeneratorcSAXSError( + f"{self.parent.name} run into timeout in complete call." + ), + ) logger.info(f"Return status {self.parent.name}") return status - def on_complete(self) -> DeviceStatus: pass - def finished(self) -> None: - """Method checks if DDG finished acquisition""" - def on_pre_scan(self) -> None: """ Method called by pre_scan hook in parent class. @@ -110,7 +108,6 @@ class DDGSetup(CustomPrepare): self.parent.trigger_shot.put(1) - class DelayGeneratorcSAXS(PSIDeviceBase, DelayGenerator): """ DG645 delay generator at cSAXS (multiple can be in use depending on the setup) @@ -141,8 +138,8 @@ class DelayGeneratorcSAXS(PSIDeviceBase, DelayGenerator): custom_prepare_cls = DDGSetup # Custom signals passed on during the init procedure via BEC - #TODO review whether those should remain here like that - + # TODO review whether those should remain here like that + delay_burst = Component( bec_utils.ConfigSignal, name="delay_burst", kind="config", config_storage_name="ddg_config" ) @@ -220,10 +217,10 @@ class DelayGeneratorcSAXS(PSIDeviceBase, DelayGenerator): def __init__( self, - name:str, - prefix:str="", - kind:Kind=None, - ddg_config:dict=None, + name: str, + prefix: str = "", + kind: Kind = None, + ddg_config: dict = None, parent=None, device_manager=None, **kwargs, diff --git a/tests/tests_devices/test_delay_generator_csaxs.py b/tests/tests_devices/test_delay_generator_csaxs.py index e12efa5..10005f1 100644 --- a/tests/tests_devices/test_delay_generator_csaxs.py +++ b/tests/tests_devices/test_delay_generator_csaxs.py @@ -2,7 +2,7 @@ from unittest import mock import pytest -from ophyd_devices.interfaces.base_classes.psi_delay_generator_base import TriggerSource +from ophyd_devices.devices.delay_generator_645 import TriggerSource from csaxs_bec.devices.epics.delay_generator_csaxs import DDGSetup @@ -126,20 +126,6 @@ def channel_pairs(request): return request.param -def test_check_scan_id(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan): - """Test the check_scan_id method.""" - # Set first attributes of parent class - for k, v in scaninfo.items(): - setattr(mock_DDGSetup.parent.scaninfo, k, v) - for k, v in ddg_config_defaults.items(): - getattr(mock_DDGSetup.parent, k).get.return_value = v - for k, v in ddg_config_scan.items(): - getattr(mock_DDGSetup.parent, k).get.return_value = v - # Call the function you want to test - mock_DDGSetup.check_scan_id() - mock_DDGSetup.parent.scaninfo.load_scan_metadata.assert_called_once() - - def test_on_pre_scan(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan): """Test the check_scan_id method.""" # Set first attributes of parent class @@ -155,27 +141,28 @@ def test_on_pre_scan(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_sc mock_DDGSetup.parent.trigger_shot.put.assert_called_once_with(1) -@pytest.mark.parametrize("source", ["SINGLE_SHOT", "EXT_RISING_EDGE"]) -def test_on_trigger(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan, source): - """Test the on_trigger method.""" - # Set first attributes of parent class - for k, v in scaninfo.items(): - setattr(mock_DDGSetup.parent.scaninfo, k, v) - for k, v in ddg_config_defaults.items(): - getattr(mock_DDGSetup.parent, k).get.return_value = v - for k, v in ddg_config_scan.items(): - getattr(mock_DDGSetup.parent, k).get.return_value = v - # Call the function you want to test - mock_DDGSetup.parent.source.name = "source" - mock_DDGSetup.parent.source.read.return_value = { - mock_DDGSetup.parent.source.name: {"value": getattr(TriggerSource, source)} - } - mock_DDGSetup.on_trigger() - if source == "SINGLE_SHOT": - mock_DDGSetup.parent.trigger_shot.put.assert_called_once_with(1) +# TODO put back once the logic is implemented +# @pytest.mark.parametrize("source", ["SINGLE_SHOT", "EXT_RISING_EDGE"]) +# def test_on_trigger(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan, source): +# """Test the on_trigger method.""" +# # Set first attributes of parent class +# for k, v in scaninfo.items(): +# setattr(mock_DDGSetup.parent.scaninfo, k, v) +# for k, v in ddg_config_defaults.items(): +# getattr(mock_DDGSetup.parent, k).get.return_value = v +# for k, v in ddg_config_scan.items(): +# getattr(mock_DDGSetup.parent, k).get.return_value = v +# # Call the function you want to test +# mock_DDGSetup.parent.source.name = "source" +# mock_DDGSetup.parent.source.read.return_value = { +# mock_DDGSetup.parent.source.name: {"value": getattr(TriggerSource, source)} +# } +# mock_DDGSetup.on_trigger() +# if source == "SINGLE_SHOT": +# mock_DDGSetup.parent.trigger_shot.put.assert_called_once_with(1) -def test_initialize_default_parameter( +def test_on_wait_for_connection( mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan, channel_pairs ): """Test the initialize_default_parameter method.""" @@ -212,77 +199,78 @@ def test_initialize_default_parameter( ) ] ) - mock_DDGSetup.initialize_default_parameter() + mock_DDGSetup.on_wait_for_connection() mock_DDGSetup.parent.set_channels.assert_has_calls(calls) -def test_prepare_ddg(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan, channel_pairs): - """Test the prepare_ddg method.""" - # Set first attributes of parent class - for k, v in scaninfo.items(): - setattr(mock_DDGSetup.parent.scaninfo, k, v) - for k, v in ddg_config_defaults.items(): - getattr(mock_DDGSetup.parent, k).get.return_value = v - for k, v in ddg_config_scan.items(): - getattr(mock_DDGSetup.parent, k).get.return_value = v - # Call the function you want to test - mock_DDGSetup.parent.all_channels = channel_pairs["all_channels"] - mock_DDGSetup.parent.all_delay_pairs = channel_pairs["all_delay_pairs"] +# TODO put back once the logic is implemented +# def test_on_stage(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan, channel_pairs): +# """Test the prepare_ddg method.""" +# # Set first attributes of parent class +# for k, v in scaninfo.items(): +# setattr(mock_DDGSetup.parent.scaninfo, k, v) +# for k, v in ddg_config_defaults.items(): +# getattr(mock_DDGSetup.parent, k).get.return_value = v +# for k, v in ddg_config_scan.items(): +# getattr(mock_DDGSetup.parent, k).get.return_value = v +# # Call the function you want to test +# mock_DDGSetup.parent.all_channels = channel_pairs["all_channels"] +# mock_DDGSetup.parent.all_delay_pairs = channel_pairs["all_delay_pairs"] - mock_DDGSetup.prepare_ddg() - mock_DDGSetup.parent.set_trigger.assert_called_once_with( - getattr(TriggerSource, ddg_config_scan["set_trigger_source"]) - ) - if scaninfo["scan_type"] == "step": - if ddg_config_scan["set_high_on_exposure"]: - num_burst_cycle = 1 + ddg_config_defaults["additional_triggers"] - exp_time = ddg_config_defaults["delta_width"] + scaninfo["frames_per_trigger"] * ( - scaninfo["exp_time"] + scaninfo["readout_time"] - ) - total_exposure = exp_time - delay_burst = ddg_config_defaults["delay_burst"] - else: - exp_time = ddg_config_defaults["delta_width"] + scaninfo["exp_time"] - total_exposure = exp_time + scaninfo["readout_time"] - delay_burst = ddg_config_defaults["delay_burst"] - num_burst_cycle = ( - scaninfo["frames_per_trigger"] + ddg_config_defaults["additional_triggers"] - ) - elif scaninfo["scan_type"] == "fly": - if ddg_config_scan["set_high_on_exposure"]: - num_burst_cycle = 1 + ddg_config_defaults["additional_triggers"] - exp_time = ( - ddg_config_defaults["delta_width"] - + scaninfo["num_points"] * scaninfo["exp_time"] - + (scaninfo["num_points"] - 1) * scaninfo["readout_time"] - ) - total_exposure = exp_time - delay_burst = ddg_config_defaults["delay_burst"] - else: - exp_time = ddg_config_defaults["delta_width"] + scaninfo["exp_time"] - total_exposure = exp_time + scaninfo["readout_time"] - delay_burst = ddg_config_defaults["delay_burst"] - num_burst_cycle = scaninfo["num_points"] + ddg_config_defaults["additional_triggers"] +# mock_DDGSetup.prepare_ddg() +# mock_DDGSetup.parent.set_trigger.assert_called_once_with( +# getattr(TriggerSource, ddg_config_scan["set_trigger_source"]) +# ) +# if scaninfo["scan_type"] == "step": +# if ddg_config_scan["set_high_on_exposure"]: +# num_burst_cycle = 1 + ddg_config_defaults["additional_triggers"] +# exp_time = ddg_config_defaults["delta_width"] + scaninfo["frames_per_trigger"] * ( +# scaninfo["exp_time"] + scaninfo["readout_time"] +# ) +# total_exposure = exp_time +# delay_burst = ddg_config_defaults["delay_burst"] +# else: +# exp_time = ddg_config_defaults["delta_width"] + scaninfo["exp_time"] +# total_exposure = exp_time + scaninfo["readout_time"] +# delay_burst = ddg_config_defaults["delay_burst"] +# num_burst_cycle = ( +# scaninfo["frames_per_trigger"] + ddg_config_defaults["additional_triggers"] +# ) +# elif scaninfo["scan_type"] == "fly": +# if ddg_config_scan["set_high_on_exposure"]: +# num_burst_cycle = 1 + ddg_config_defaults["additional_triggers"] +# exp_time = ( +# ddg_config_defaults["delta_width"] +# + scaninfo["num_points"] * scaninfo["exp_time"] +# + (scaninfo["num_points"] - 1) * scaninfo["readout_time"] +# ) +# total_exposure = exp_time +# delay_burst = ddg_config_defaults["delay_burst"] +# else: +# exp_time = ddg_config_defaults["delta_width"] + scaninfo["exp_time"] +# total_exposure = exp_time + scaninfo["readout_time"] +# delay_burst = ddg_config_defaults["delay_burst"] +# num_burst_cycle = scaninfo["num_points"] + ddg_config_defaults["additional_triggers"] - # mock_DDGSetup.parent.burst_enable.assert_called_once_with( - # mock.call(num_burst_cycle, delay_burst, total_exposure, config="first") - # ) - mock_DDGSetup.parent.burst_enable.assert_called_once_with( - num_burst_cycle, delay_burst, total_exposure, config="first" - ) - if not ddg_config_scan["trigger_width"]: - call = mock.call("width", exp_time) - assert call in mock_DDGSetup.parent.set_channels.mock_calls - else: - call = mock.call("width", ddg_config_scan["trigger_width"]) - assert call in mock_DDGSetup.parent.set_channels.mock_calls - if ddg_config_scan["set_high_on_exposure"]: - calls = [ - mock.call("width", value, channels=[channel]) - for value, channel in zip( - ddg_config_scan["fixed_ttl_width"], channel_pairs["all_channels"] - ) - if value != 0 - ] - if calls: - assert all(calls in mock_DDGSetup.parent.set_channels.mock_calls) +# # mock_DDGSetup.parent.burst_enable.assert_called_once_with( +# # mock.call(num_burst_cycle, delay_burst, total_exposure, config="first") +# # ) +# mock_DDGSetup.parent.burst_enable.assert_called_once_with( +# num_burst_cycle, delay_burst, total_exposure, config="first" +# ) +# if not ddg_config_scan["trigger_width"]: +# call = mock.call("width", exp_time) +# assert call in mock_DDGSetup.parent.set_channels.mock_calls +# else: +# call = mock.call("width", ddg_config_scan["trigger_width"]) +# assert call in mock_DDGSetup.parent.set_channels.mock_calls +# if ddg_config_scan["set_high_on_exposure"]: +# calls = [ +# mock.call("width", value, channels=[channel]) +# for value, channel in zip( +# ddg_config_scan["fixed_ttl_width"], channel_pairs["all_channels"] +# ) +# if value != 0 +# ] +# if calls: +# assert all(calls in mock_DDGSetup.parent.set_channels.mock_calls)