diff --git a/debye_bec/devices/mo1_bragg/mo1_bragg.py b/debye_bec/devices/mo1_bragg/mo1_bragg.py index 69df2d2..30419b9 100644 --- a/debye_bec/devices/mo1_bragg/mo1_bragg.py +++ b/debye_bec/devices/mo1_bragg/mo1_bragg.py @@ -70,6 +70,13 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): super().__init__(name=name, scan_info=scan_info, prefix=prefix, **kwargs) self.scan_parameters: ScanServerScanInfo = None self.timeout_for_pvwait = 7.5 + self.valid_scan_names = [ + "xas_simple_scan", + "xas_simple_scan_with_xrd", + "xas_advanced_scan", + "xas_advanced_scan_with_xrd", + "nidaq_continuous_scan", + ] ######################################## # Beamline Specific Implementations # @@ -104,158 +111,172 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): status.wait(timeout=self.timeout_for_pvwait) scan_name = self.scan_parameters.scan_name - start, stop = self.scan_parameters.positions or (None, None) - scan_time = self.scan_parameters.additional_scan_parameters.get("scan_time", None) - scan_duration = self.scan_parameters.additional_scan_parameters.get("scan_duration", None) - if scan_name == "xas_simple_scan": - if any(param is None for param in [start, stop, scan_time, scan_duration]): - raise Mo1BraggError( - f"Missing scan parameters for xas_simple_scan. Required parameters: start, stop, scan_time, scan_duration in additional_scan_parameters dict {self.scan_parameters.additional_scan_parameters}" + if self._check_if_scan_name_is_valid(self.scan_parameters): + start, stop = self.scan_parameters.positions or (None, None) + scan_time = self.scan_parameters.additional_scan_parameters.get("scan_time", None) + scan_duration = self.scan_parameters.additional_scan_parameters.get( + "scan_duration", None + ) + if scan_name == "xas_simple_scan": + if any(param is None for param in [start, stop, scan_time, scan_duration]): + raise Mo1BraggError( + f"Missing scan parameters for xas_simple_scan. Required parameters: start, stop, scan_time, scan_duration in additional_scan_parameters dict {self.scan_parameters.additional_scan_parameters}" + ) + self.set_xas_settings(low=start, high=stop, scan_time=scan_time) + self.set_trig_settings( + enable_low=False, + enable_high=False, + break_time_low=0, + break_time_high=0, + cycle_low=0, + cycle_high=0, + exp_time=0, + n_of_trigger=0, ) - self.set_xas_settings(low=start, high=stop, scan_time=scan_time) - self.set_trig_settings( - enable_low=False, - enable_high=False, - break_time_low=0, - break_time_high=0, - cycle_low=0, - cycle_high=0, - exp_time=0, - n_of_trigger=0, - ) - self.set_scan_control_settings(mode=ScanControlMode.SIMPLE, scan_duration=scan_duration) - elif scan_name == "xas_simple_scan_with_xrd": - break_enable_low = self.scan_parameters.additional_scan_parameters.get( - "break_enable_low", None - ) - break_enable_high = self.scan_parameters.additional_scan_parameters.get( - "break_enable_high", None - ) - break_time_low = self.scan_parameters.additional_scan_parameters.get( - "break_time_low", None - ) - break_time_high = self.scan_parameters.additional_scan_parameters.get( - "break_time_high", None - ) - cycle_low = self.scan_parameters.additional_scan_parameters.get("cycle_low", None) - cycle_high = self.scan_parameters.additional_scan_parameters.get("cycle_high", None) - exp_time = self.scan_parameters.exp_time - n_of_trigger = self.scan_parameters.additional_scan_parameters.get("n_of_trigger", None) - if any( - param is None - for param in [ - start, - stop, - scan_time, - scan_duration, - break_enable_low, - break_enable_high, - break_time_low, - break_time_high, - cycle_low, - cycle_high, - exp_time, - n_of_trigger, - ] - ): - raise Mo1BraggError( - f"Missing scan parameters for xas_simple_scan_with_xrd. Required parameters: start, stop, scan_time, scan_duration, break_enable_low, break_enable_high, break_time_low, break_time_high, cycle_low, cycle_high, exp_time, n_of_trigger in additional_scan_parameters dict {self.scan_parameters.additional_scan_parameters}" + self.set_scan_control_settings( + mode=ScanControlMode.SIMPLE, scan_duration=scan_duration ) - self.set_xas_settings(low=start, high=stop, scan_time=scan_time) - self.set_trig_settings( - enable_low=break_enable_low, - enable_high=break_enable_high, - break_time_low=break_time_low, - break_time_high=break_time_high, - cycle_low=cycle_low, - cycle_high=cycle_high, - exp_time=exp_time, - n_of_trigger=n_of_trigger, - ) - self.set_scan_control_settings(mode=ScanControlMode.SIMPLE, scan_duration=scan_duration) - elif scan_name == "xas_advanced_scan": - p_kink = self.scan_parameters.additional_scan_parameters.get("p_kink", None) - e_kink = self.scan_parameters.additional_scan_parameters.get("e_kink", None) - if any( - param is None for param in [start, stop, scan_time, scan_duration, p_kink, e_kink] - ): - raise Mo1BraggError( - f"Missing scan parameters for xas_advanced_scan. Required parameters: start, stop, scan_time, scan_duration, p_kink, e_kink in additional_scan_parameters dict {self.scan_parameters.additional_scan_parameters}" + elif scan_name == "xas_simple_scan_with_xrd": + break_enable_low = self.scan_parameters.additional_scan_parameters.get( + "break_enable_low", None ) - self.set_advanced_xas_settings( - low=start, high=stop, scan_time=scan_time, p_kink=p_kink, e_kink=e_kink - ) - self.set_trig_settings( - enable_low=False, - enable_high=False, - break_time_low=0, - break_time_high=0, - cycle_low=0, - cycle_high=0, - exp_time=0, - n_of_trigger=0, - ) - self.set_scan_control_settings( - mode=ScanControlMode.ADVANCED, scan_duration=scan_duration - ) - elif scan_name == "xas_advanced_scan_with_xrd": - p_kink = self.scan_parameters.additional_scan_parameters.get("p_kink", None) - e_kink = self.scan_parameters.additional_scan_parameters.get("e_kink", None) - break_enable_low = self.scan_parameters.additional_scan_parameters.get( - "break_enable_low", None - ) - break_enable_high = self.scan_parameters.additional_scan_parameters.get( - "break_enable_high", None - ) - break_time_low = self.scan_parameters.additional_scan_parameters.get( - "break_time_low", None - ) - break_time_high = self.scan_parameters.additional_scan_parameters.get( - "break_time_high", None - ) - cycle_low = self.scan_parameters.additional_scan_parameters.get("cycle_low", None) - cycle_high = self.scan_parameters.additional_scan_parameters.get("cycle_high", None) - exp_time = self.scan_parameters.exp_time - n_of_trigger = self.scan_parameters.additional_scan_parameters.get("n_of_trigger", None) - if any( - param is None - for param in [ - start, - stop, - scan_time, - scan_duration, - p_kink, - e_kink, - break_enable_low, - break_enable_high, - break_time_low, - break_time_high, - cycle_low, - cycle_high, - exp_time, - n_of_trigger, - ] - ): - raise Mo1BraggError( - f"Missing scan parameters for xas_advanced_scan_with_xrd. Required parameters: start, stop, scan_time, scan_duration, p_kink, e_kink, break_enable_low, break_enable_high, break_time_low, break_time_high, cycle_low, cycle_high, exp_time, n_of_trigger in additional_scan_parameters dict {self.scan_parameters.additional_scan_parameters}" + break_enable_high = self.scan_parameters.additional_scan_parameters.get( + "break_enable_high", None ) + break_time_low = self.scan_parameters.additional_scan_parameters.get( + "break_time_low", None + ) + break_time_high = self.scan_parameters.additional_scan_parameters.get( + "break_time_high", None + ) + cycle_low = self.scan_parameters.additional_scan_parameters.get("cycle_low", None) + cycle_high = self.scan_parameters.additional_scan_parameters.get("cycle_high", None) + exp_time = self.scan_parameters.exp_time + n_of_trigger = self.scan_parameters.additional_scan_parameters.get( + "n_of_trigger", None + ) + if any( + param is None + for param in [ + start, + stop, + scan_time, + scan_duration, + break_enable_low, + break_enable_high, + break_time_low, + break_time_high, + cycle_low, + cycle_high, + exp_time, + n_of_trigger, + ] + ): + raise Mo1BraggError( + f"Missing scan parameters for xas_simple_scan_with_xrd. Required parameters: start, stop, scan_time, scan_duration, break_enable_low, break_enable_high, break_time_low, break_time_high, cycle_low, cycle_high, exp_time, n_of_trigger in additional_scan_parameters dict {self.scan_parameters.additional_scan_parameters}" + ) + self.set_xas_settings(low=start, high=stop, scan_time=scan_time) + self.set_trig_settings( + enable_low=break_enable_low, + enable_high=break_enable_high, + break_time_low=break_time_low, + break_time_high=break_time_high, + cycle_low=cycle_low, + cycle_high=cycle_high, + exp_time=exp_time, + n_of_trigger=n_of_trigger, + ) + self.set_scan_control_settings( + mode=ScanControlMode.SIMPLE, scan_duration=scan_duration + ) + elif scan_name == "xas_advanced_scan": + p_kink = self.scan_parameters.additional_scan_parameters.get("p_kink", None) + e_kink = self.scan_parameters.additional_scan_parameters.get("e_kink", None) + if any( + param is None + for param in [start, stop, scan_time, scan_duration, p_kink, e_kink] + ): + raise Mo1BraggError( + f"Missing scan parameters for xas_advanced_scan. Required parameters: start, stop, scan_time, scan_duration, p_kink, e_kink in additional_scan_parameters dict {self.scan_parameters.additional_scan_parameters}" + ) + self.set_advanced_xas_settings( + low=start, high=stop, scan_time=scan_time, p_kink=p_kink, e_kink=e_kink + ) + self.set_trig_settings( + enable_low=False, + enable_high=False, + break_time_low=0, + break_time_high=0, + cycle_low=0, + cycle_high=0, + exp_time=0, + n_of_trigger=0, + ) + self.set_scan_control_settings( + mode=ScanControlMode.ADVANCED, scan_duration=scan_duration + ) + elif scan_name == "xas_advanced_scan_with_xrd": + p_kink = self.scan_parameters.additional_scan_parameters.get("p_kink", None) + e_kink = self.scan_parameters.additional_scan_parameters.get("e_kink", None) + break_enable_low = self.scan_parameters.additional_scan_parameters.get( + "break_enable_low", None + ) + break_enable_high = self.scan_parameters.additional_scan_parameters.get( + "break_enable_high", None + ) + break_time_low = self.scan_parameters.additional_scan_parameters.get( + "break_time_low", None + ) + break_time_high = self.scan_parameters.additional_scan_parameters.get( + "break_time_high", None + ) + cycle_low = self.scan_parameters.additional_scan_parameters.get("cycle_low", None) + cycle_high = self.scan_parameters.additional_scan_parameters.get("cycle_high", None) + exp_time = self.scan_parameters.exp_time + n_of_trigger = self.scan_parameters.additional_scan_parameters.get( + "n_of_trigger", None + ) + if any( + param is None + for param in [ + start, + stop, + scan_time, + scan_duration, + p_kink, + e_kink, + break_enable_low, + break_enable_high, + break_time_low, + break_time_high, + cycle_low, + cycle_high, + exp_time, + n_of_trigger, + ] + ): + raise Mo1BraggError( + f"Missing scan parameters for xas_advanced_scan_with_xrd. Required parameters: start, stop, scan_time, scan_duration, p_kink, e_kink, break_enable_low, break_enable_high, break_time_low, break_time_high, cycle_low, cycle_high, exp_time, n_of_trigger in additional_scan_parameters dict {self.scan_parameters.additional_scan_parameters}" + ) - self.set_advanced_xas_settings( - low=start, high=stop, scan_time=scan_time, p_kink=p_kink, e_kink=e_kink - ) - self.set_trig_settings( - enable_low=break_enable_low, - enable_high=break_enable_high, - break_time_low=break_time_low, - break_time_high=break_time_high, - cycle_low=cycle_low, - cycle_high=cycle_high, - exp_time=exp_time, - n_of_trigger=n_of_trigger, - ) - self.set_scan_control_settings( - mode=ScanControlMode.ADVANCED, scan_duration=scan_duration - ) + self.set_advanced_xas_settings( + low=start, high=stop, scan_time=scan_time, p_kink=p_kink, e_kink=e_kink + ) + self.set_trig_settings( + enable_low=break_enable_low, + enable_high=break_enable_high, + break_time_low=break_time_low, + break_time_high=break_time_high, + cycle_low=cycle_low, + cycle_high=cycle_high, + exp_time=exp_time, + n_of_trigger=n_of_trigger, + ) + self.set_scan_control_settings( + mode=ScanControlMode.ADVANCED, scan_duration=scan_duration + ) + else: + return # Should never happen. else: return # Setting scan duration seems to lag behind slightly in the backend, include small sleep @@ -335,6 +356,13 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): self.stopped = True # Needs to be set to stop motion ######### Utility Methods ######### + + def _check_if_scan_name_is_valid(self, scan_parameters: ScanServerScanInfo) -> bool: + """Check if the scan is within the list of scans for which the backend is working""" + if scan_parameters.scan_name in self.valid_scan_names: + return True + return False + def _progress_update(self, value, **kwargs) -> None: """Callback method to update the scan progress, runs a callback to SUB_PROGRESS subscribers, i.e. BEC.