From b441f5db6506bf343aaa0ca0c1eb102a6d1c7268 Mon Sep 17 00:00:00 2001 From: appel_c Date: Wed, 17 Jul 2024 22:04:16 +0200 Subject: [PATCH] fix: adapt scaninfo to pass on scanning parameters --- debye_bec/devices/mo1_bragg.py | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/debye_bec/devices/mo1_bragg.py b/debye_bec/devices/mo1_bragg.py index 552b19c..35744d6 100644 --- a/debye_bec/devices/mo1_bragg.py +++ b/debye_bec/devices/mo1_bragg.py @@ -4,6 +4,7 @@ import enum import threading import time import traceback +import re from typing import Literal from bec_lib.logger import bec_logger @@ -249,6 +250,10 @@ class Mo1Bragg(Device, PositionerBase): self.readback.name = self.name self.service_cfg = None self.scaninfo = None + self.scan_time = None + self.scan_duration = None + self.start = None + self.end = None if device_manager: self.device_manager = device_manager @@ -530,12 +535,25 @@ class Mo1Bragg(Device, PositionerBase): self.on_stage() return super().stage() + def _get_scaninfo_parameters(self): + """Get the scaninfo parameters for the scan.""" + self.start, self.end = self.scaninfo.scan_msg.info["positions"] + scan_time_pattern = r"scan_time': (\d+\.?\d*)" + scan_duration_pattern = r"scan_duration': (\d+\.?\d*)" + pattern_match = re.search(scan_time_pattern, self.scaninfo.scan_msg.info["scan_msgs"][0]) + self.scan_time = float(pattern_match.group(1)) + pattern_match = re.search( + scan_duration_pattern, self.scaninfo.scan_msg.info["scan_msgs"][0] + ) + self.scan_duration = float(pattern_match.group(1)) + def on_stage(self) -> None: """Actions to be executed when the device is staged.""" if not self.scaninfo.scan_type == "xas": return # Could become redudant if we use this in unstage self.scan_val_reset.put(1) + self._get_scaninfo_parameters() # Brief sleep to allow for PV to update (FYI, reliable set makes this redundant) time.sleep(0.2) if not self.wait_for_signals( @@ -549,15 +567,12 @@ class Mo1Bragg(Device, PositionerBase): f"Timeout while waiting for scan status to get ready. State: {self.scan_control.scan_status.get()}" ) # Add here the logic to setup different type of scans - low, high = self.scaninfo.positions - scan_time = self.scaninfo.scan_time - scan_duration = self.scaninfo.scan_duration self.setup_simple_xas_scan( - low=low, - high=high, - scan_time=scan_time, + low=self.start, + high=self.end, + scan_time=self.scan_time, mode=ScanControlMode.SIMPLE, - scan_duration=scan_duration, + scan_duration=self.scan_duration, ) def complete(self) -> None: