feat: first draft for XRD scan

This commit is contained in:
gac-x01da (Resp. Clark Adam Hugh)
2024-07-24 16:14:30 +02:00
committed by appel_c
parent bc2b884413
commit 7d3bd609d1
3 changed files with 180 additions and 43 deletions

View File

@@ -269,6 +269,14 @@ class Mo1Bragg(Device, PositionerBase):
self.scaninfo = None
self.scan_time = None
self.scan_duration = None
self.xrd_enable_low = None
self.xrd_enable_high = None
self.num_trigger_low = None
self.num_trigger_high = None
self.exp_time_low = None
self.exp_time_high = None
self.cycle_low = None
self.cycle_high = None
self.start = None
self.end = None
@@ -296,7 +304,7 @@ class Mo1Bragg(Device, PositionerBase):
Args:
value (int) : current progress value
"""
max_value = 100
max_value=100
logger.info(f"Progress at {value}")
self._run_subs(
sub_type=self.SUB_PROGRESS,
@@ -508,11 +516,36 @@ class Mo1Bragg(Device, PositionerBase):
self.scan_settings.s_scan_angle_hi.put(high)
self.scan_settings.s_scan_scantime.put(scan_time)
def set_xrd_settings(self, enable: bool) -> None:
"""Set XRD settings for the upcoming scan."""
self.scan_settings.xrd_enable_hi_enum.put(int(enable))
self.scan_settings.xrd_enable_lo_enum.put(int(enable))
# TODO add here the remaining settings that are possible for XRD
def set_xrd_settings(self,
enable_low: bool = False,
enable_high: bool = False,
num_trigger_low:int=0,
num_trigger_high:int=0,
exp_time_low:int=0,
exp_time_high:int=0,
cycle_low:int=0,
cycle_high:int=0,
) -> None:
"""Set XRD settings for the upcoming scan.
Args:
enable_low (bool): Enable XRD for low energy/angle
enable_high (bool): Enable XRD for high energy/angle
num_trigger_low (int): Number of triggers for low energy/angle
num_trigger_high (int): Number of triggers for high energy/angle
exp_time_low (int): Exposure time for low energy/angle
exp_time_high (int): Exposure time for high energy/angle
cycle_low (int): Cycle for low energy/angle
cycle_high (int): Cycle for high energy/angle
"""
self.scan_settings.xrd_enable_hi_enum.put(int(enable_high))
self.scan_settings.xrd_enable_lo_enum.put(int(enable_low))
self.scan_settings.xrd_n_trigger_hi.put(num_trigger_high)
self.scan_settings.xrd_n_trigger_lo.put(num_trigger_low)
self.scan_settings.xrd_time_hi.put(exp_time_high)
self.scan_settings.xrd_time_lo.put(exp_time_low)
self.scan_settings.xrd_every_n_hi.put(cycle_high)
self.scan_settings.xrd_every_n_lo.put(cycle_low)
def set_scan_control_settings(self, mode: ScanControlMode, scan_duration: float) -> None:
"""Set the scan control settings for the upcoming scan.
@@ -525,13 +558,11 @@ class Mo1Bragg(Device, PositionerBase):
self.scan_control.scan_mode_enum.put(val)
self.scan_control.scan_duration.put(scan_duration)
def setup_simple_xas_scan(
def setup_xas_scan(
self,
low: float,
high: float,
scan_time: float,
mode: Literal["simple", "advanced"],
scan_duration: float = 0,
**kwargs,
):
"""Setup a simple XAS scan for the Bragg positioner.
@@ -544,9 +575,18 @@ class Mo1Bragg(Device, PositionerBase):
scan_duration (float): Duration of the scan, if <0.1s the scan will be started in infinite motion mode
"""
# TODO check if maybe we want an extra argument for infinite or finite motion
self.set_xas_settings(low=low, high=high, scan_time=scan_time)
self.set_xrd_settings(False)
self.set_scan_control_settings(mode=mode, scan_duration=scan_duration)
self.set_xas_settings(low=low, high=high, scan_time=self.scan_time)
self.set_xrd_settings(
enable_low=self.xrd_enable_low,
enable_high=self.xrd_enable_high,
num_trigger_low=self.num_trigger_low,
num_trigger_high=self.num_trigger_high,
exp_time_low=self.exp_time_low,
exp_time_high=self.exp_time_high,
cycle_low=self.cycle_low,
cycle_high=self.cycle_high,
)
self.set_scan_control_settings(mode=mode, scan_duration=self.scan_duration)
self.scan_control.scan_load.put(1)
def kickoff(self):
@@ -587,13 +627,42 @@ class Mo1Bragg(Device, PositionerBase):
def _get_scaninfo_parameters(self):
"""Get the scaninfo parameters for the scan."""
self.start, self.end = self.scaninfo.scan_msg.info["positions"]
# scan_time and scan_duration have to be extracted from scan_msg string for the moment, will be changed in future
scan_time_pattern = r"scan_time': (\d+\.?\d*)"
scan_dur_pattern = r"scan_duration': (\d+\.?\d*)"
pattern_match = re.search(scan_time_pattern, self.scaninfo.scan_msg.info["scan_msgs"][0])
self.scan_time = float(pattern_match.group(1))
pattern_match = re.search(scan_dur_pattern, self.scaninfo.scan_msg.info["scan_msgs"][0])
self.scan_duration = float(pattern_match.group(1))
logger.info(self.scaninfo.scan_msg.info["scan_msgs"][0])
pattern = re.compile(r"'scan_time':\s*([\d\.]+)")
pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0])
self.scan_time = float(pattern_match.group(1)) if pattern_match else 0.0
pattern = re.compile(r"'scan_duration':\s*([\d\.]+)")
pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0])
self.scan_duration = float(pattern_match.group(1)) if pattern_match else 0.0
pattern = re.compile(r"'exp_time_low':\s*([\d\.]+)")
pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0])
self.exp_time_low = float(pattern_match.group(1)) if pattern_match else 0.0
pattern = re.compile(r"'exp_time_high':\s*([\d\.]+)")
pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0])
self.exp_time_high = float(pattern_match.group(1)) if pattern_match else 0.0
pattern = re.compile(r"'xrd_enable_low':\s*(True|False)")
pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0])
self.xrd_enable_low = pattern_match.group(1) =='True' if pattern_match else False
pattern = re.compile(r"'xrd_enable_high':\s*(True|False)")
pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0])
self.xrd_enable_high = pattern_match.group(1) =='True' if pattern_match else False
pattern = re.compile(r"'num_trigger_low':\s*(\d+)")
pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0])
self.num_trigger_low = int(pattern_match.group(1)) if pattern_match else 0
pattern = re.compile(r"'num_trigger_high':\s*(\d+)")
pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0])
self.num_trigger_high = int(pattern_match.group(1)) if pattern_match else 0
pattern = re.compile(r"'cycle_low':\s*(\d+)")
pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0])
self.cycle_low = int(pattern_match.group(1)) if pattern_match else 0
pattern = re.compile(r"'cycle_high':\s*(\d+)")
pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0])
self.cycle_high = int(pattern_match.group(1)) if pattern_match else 0
def on_stage(self) -> None:
"""Actions to be executed when the device is staged."""
@@ -613,14 +682,12 @@ class Mo1Bragg(Device, PositionerBase):
)
# Add here the logic to setup different type of scans
scan_name = self.scaninfo.scan_msg.content["info"].get("scan_name", "")
if scan_name == "energy_oscillation_scan":
if scan_name == "xas_simple_scan" or scan_name == "xas_simple_scan_with_xrd":
self._get_scaninfo_parameters()
self.setup_simple_xas_scan(
self.setup_xas_scan(
low=self.start,
high=self.end,
scan_time=self.scan_time,
mode=ScanControlMode.SIMPLE,
scan_duration=self.scan_duration,
mode=ScanControlMode.SIMPLE
)
else:
raise Mo1BraggError(f"Scan mode {scan_name} not implemented for device {self.name}")
@@ -672,6 +739,14 @@ class Mo1Bragg(Device, PositionerBase):
"""Actions to be executed when the device is unstaged."""
# Reset scan parameter validation
self.scan_control.scan_val_reset.put(1)
if not self.wait_for_signals(
signal_conditions=[(self.scan_control.scan_msg.get, ScanControlLoadMessage.PENDING)],
timeout=self.timeout_for_pvwait,
check_stopped=True,
):
raise TimeoutError(
f"Timeout after {self.timeout_for_pvwait} while waiting for scan status, current state: {self.scan_control.scan_status.get()}"
)
def check_scan_id(self) -> None:
"""Checks if scan_id has changed and set stopped flagged to True if it has."""

View File

@@ -1 +1 @@
from .mono_bragg_scans import MonoBraggOscillationScan
from .mono_bragg_scans import XASSimpleScan, XASSimpleScanWithXRD

View File

@@ -10,18 +10,18 @@ from bec_server.scan_server.scans import AsyncFlyScanBase
from debye_bec.devices.mo1_bragg import MoveType
# TODO How to report the scan progress?
class MonoBraggOscillationScan(AsyncFlyScanBase):
"""
This class is used to start an oscillating scan motion on the mono bragg motor.
"""
class XASSimpleScan(AsyncFlyScanBase):
scan_name = "energy_oscillation_scan"
scan_name = "xas_simple_scan"
scan_type = "fly"
scan_report_hint = "device_progress"
required_kwargs = []
use_scan_progress_report = False
pre_move = False
gui_config = {
"Movement Parameters": ["start", "stop"],
"Scan Parameters": ["scan_time", "scan_duration"],
}
def __init__(
self,
@@ -29,18 +29,27 @@ class MonoBraggOscillationScan(AsyncFlyScanBase):
stop: float,
scan_time: float,
scan_duration: float,
motor: DeviceBase = None,
motor: DeviceBase = "mo1_bragg",
**kwargs,
):
""" The xas_simple_scan is used to start a simple oscillating scan on the mono bragg motor.
Start and Stop define the energy range for the scan, scan_time is the time for one scan cycle and scan_duration
is the duration of the scan. If scan duration is set to 0, the scan will run infinitely.
Args:
start (float): Start energy for the scan.
stop (float): Stop energy for the scan.
scan_time (float): Time for one scan cycle.
scan_duration (float): Duration of the scan.
motor (DeviceBase, optional): Motor device to be used for the scan. Defaults to "mo1_bragg".
"""
super().__init__(**kwargs)
# TODO How to we deal with scans written for a specific device/motor?
self.motor = motor if motor is not None else self.device_manager.devices["mo1_bragg"].name
self.motor = motor
self.start = start
self.stop = stop
self.scan_time = scan_time
self.scan_duration = scan_duration
self.device_move_request_id = str(uuid.uuid4())
self.primary_readout_cycle = 1 # Sleep time in seconds between readout cycles
self.primary_readout_cycle = 1
def prepare_positions(self):
"""Prepare the positions for the scan.
@@ -92,10 +101,63 @@ class MonoBraggOscillationScan(AsyncFlyScanBase):
time.sleep(self.primary_readout_cycle)
self.point_id += 1
def finalize(self):
"""Set the number of points for the scan, base on the point_id which incrementally increases in scan_core."""
# Call complete on all devices except the motor/Mo1Bragg
yield from self.stubs.complete(
device=[dev for dev in self.device_manager.devices.keys() if dev != self.motor]
)
self.num_pos = self.point_id + 1
self.num_pos = self.point_id +1
class XASSimpleScanWithXRD(XASSimpleScan):
scan_name = "xas_simple_scan_with_xrd"
gui_config = {
"Movement Parameters": ["start", "stop"],
"Scan Parameters": ["scan_time", "scan_duration"],
"Low Energy Range": ["xrd_enable_low", "num_trigger_low", "exp_time_low", "cycle_low"],
"High Energy Range": ["xrd_enable_high", "num_trigger_high", "exp_time_high", "cycle_high"],
}
def __init__(
self,
start: float,
stop: float,
scan_time: float,
scan_duration: float,
xrd_enable_low : bool,
num_trigger_low :int,
exp_time_low : float,
cycle_low : int,
xrd_enable_high : bool,
num_trigger_high :int,
exp_time_high : float,
cycle_high : float,
motor: DeviceBase = "mo1_bragg",
**kwargs,
):
""" The xas_simple_scan_with_xrd is used to start a simple oscillating scan on the mono bragg motor with XRD triggering.
Start and Stop define the energy range for the scan, scan_time is the time for one scan cycle and scan_duration
is the duration of the scan. If scan duration is set to 0, the scan will run infinitely.
xrd_enable_low and xrd_enable_high define if XRD triggering is enabled for the low and high energy range.
num_trigger_low and num_trigger_high define the number of triggers for the low and high energy range.
exp_time_low and exp_time_high define the exposure time for the low and high energy range.
Args:
start (float): Start energy for the scan.
stop (float): Stop energy for the scan.
scan_time (float): Time for one scan cycle.
scan_duration (float): Duration of the scan.
xrd_enable_low (bool): Enable XRD triggering for the low energy range.
num_trigger_low (int): Number of triggers for the low energy range.
exp_time_low (float): Exposure time for the low energy range.
cycle_low (int): Cycle for the low energy range.
xrd_enable_high (bool): Enable XRD triggering for the high energy range.
num_trigger_high (int): Number of triggers for the high energy range.
exp_time_high (float): Exposure time for the high energy range.
cycle_high (int): Cycle for the high energy range.
motor (DeviceBase, optional): Motor device to be used for the scan. Defaults to "mo1_bragg".
"""
super().__init__(start=start, stop=stop, scan_time=scan_time, scan_duration=scan_duration, motor=motor, **kwargs)
self.xrd_enable_low = xrd_enable_low
self.num_trigger_low = num_trigger_low
self.exp_time_low = exp_time_low
self.cycle_low = cycle_low
self.xrd_enable_high = xrd_enable_high
self.num_trigger_high = num_trigger_high
self.exp_time_high = exp_time_high
self.cycle_high = cycle_high