refactor: mono scans and device refactoring, added dataclass for scan args
This commit is contained in:
@@ -13,6 +13,7 @@ import re
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
from dataclasses import dataclass
|
||||
from typing import Literal
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
@@ -35,7 +36,7 @@ logger = bec_logger.logger
|
||||
|
||||
|
||||
class ScanControlScanStatus(int, enum.Enum):
|
||||
"""Enum class for the scan status of the Bragg positioner"""
|
||||
"""Enum class for the scan status of Bragg positioner"""
|
||||
|
||||
PARAMETER_WRONG = 0
|
||||
VALIDATION_PENDING = 1
|
||||
@@ -44,7 +45,7 @@ class ScanControlScanStatus(int, enum.Enum):
|
||||
|
||||
|
||||
class ScanControlLoadMessage(int, enum.Enum):
|
||||
"""Validation of loading the scan parameters"""
|
||||
"""Enum for validating messages for load message of Bragg positioner"""
|
||||
|
||||
PENDING = 0
|
||||
STARTED = 1
|
||||
@@ -102,7 +103,7 @@ class Mo1BraggStatus(Device):
|
||||
|
||||
|
||||
class Mo1BraggEncoder(Device):
|
||||
"""Mo1 Bragg PVs to communiucate with the encoder"""
|
||||
"""Mo1 Bragg PVs to communicate with the encoder"""
|
||||
|
||||
enc_reinit = Cpt(EpicsSignal, suffix="enc_reinit.PROC", kind="config")
|
||||
enc_reinit_done = Cpt(EpicsSignalRO, suffix="enc_reinit_done_RBV", kind="config")
|
||||
@@ -192,19 +193,28 @@ class Mo1BraggScanControl(Device):
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ScanParameter:
|
||||
"""Dataclass to store the scan parameters for the Mo1 Bragg positioner"""
|
||||
|
||||
scan_time: float = None
|
||||
scan_duration: float = None
|
||||
xrd_enable_low: bool = None
|
||||
xrd_enable_high: bool = None
|
||||
num_trigger_low: int = None
|
||||
num_trigger_high: int = None
|
||||
exp_time_low: float = None
|
||||
exp_time_high: float = None
|
||||
cycle_low: int = None
|
||||
cycle_high: int = None
|
||||
start: float = None
|
||||
end: float = None
|
||||
|
||||
|
||||
class Mo1Bragg(Device, PositionerBase):
|
||||
"""Class for the Mo1 Bragg positioner of the Debye beamline.
|
||||
The prefix to connect to the soft IOC is X01DA-OP-MO1:BRAGG: which is connected to
|
||||
the NI motor controller via web sockets.
|
||||
|
||||
Args:
|
||||
prefix (str): EPICS prefix to connect to the soft IOC
|
||||
name (str): Name of the device
|
||||
kind (Kind): Kind of the device
|
||||
device_manager (object): Device manager object to connect to the BEC
|
||||
parent (object): Parent object
|
||||
kwargs: Additional keyword arguments
|
||||
|
||||
"""
|
||||
|
||||
USER_ACCESS = ["set_xtal", "stop_scan"]
|
||||
@@ -234,9 +244,7 @@ class Mo1Bragg(Device, PositionerBase):
|
||||
velocity = Cpt(EpicsSignalWithRBV, suffix="move_velocity", kind="config", auto_monitor=True)
|
||||
|
||||
# Angle PVs
|
||||
# TODO consider making the angle a pseudo axis of the energy
|
||||
# Or we could even consider making a pseudo positioner for the angle since its less often used and would
|
||||
# be a more elegant solution
|
||||
# TODO makd angle motion a pseudo motor
|
||||
feedback_pos_angle = Cpt(
|
||||
EpicsSignalRO, suffix="feedback_pos_angle_RBV", kind="normal", auto_monitor=True
|
||||
)
|
||||
@@ -261,24 +269,24 @@ class Mo1Bragg(Device, PositionerBase):
|
||||
def __init__(
|
||||
self, prefix="", *, name: str, kind: Kind = None, device_manager=None, parent=None, **kwargs
|
||||
):
|
||||
"""Initialize the Mo1 Bragg positioner.
|
||||
|
||||
Args:
|
||||
prefix (str): EPICS prefix for the device
|
||||
name (str): Name of the device
|
||||
kind (Kind): Kind of the device
|
||||
device_manager (DeviceManager): Device manager instance
|
||||
parent (Device): Parent device
|
||||
kwargs: Additional keyword arguments
|
||||
"""
|
||||
super().__init__(prefix, name=name, kind=kind, parent=parent, **kwargs)
|
||||
self._stopped = False
|
||||
self.device_manager = device_manager
|
||||
self._move_thread = None
|
||||
self.service_cfg = None
|
||||
self.scaninfo = None
|
||||
self.scan_time = None
|
||||
self.scan_duration = None
|
||||
self.xrd_enable_low = None
|
||||
self.xrd_enable_high = None
|
||||
self.num_trigger_low = None
|
||||
self.num_trigger_high = None
|
||||
self.exp_time_low = None
|
||||
self.exp_time_high = None
|
||||
self.cycle_low = None
|
||||
self.cycle_high = None
|
||||
self.start = None
|
||||
self.end = None
|
||||
# Init scan parameters
|
||||
self.scan_parameter = ScanParameter()
|
||||
|
||||
self.timeout_for_pvwait = 2.5
|
||||
self.readback.name = self.name
|
||||
@@ -304,7 +312,7 @@ class Mo1Bragg(Device, PositionerBase):
|
||||
Args:
|
||||
value (int) : current progress value
|
||||
"""
|
||||
max_value=100
|
||||
max_value = 100
|
||||
logger.info(f"Progress at {value}")
|
||||
self._run_subs(
|
||||
sub_type=self.SUB_PROGRESS,
|
||||
@@ -318,6 +326,31 @@ class Mo1Bragg(Device, PositionerBase):
|
||||
self.scaninfo = bec_scaninfo_mixin.BecScaninfoMixin(self.device_manager)
|
||||
self.scaninfo.load_scan_metadata()
|
||||
|
||||
@property
|
||||
def stopped(self) -> bool:
|
||||
"""Return the stopped flag. If True, the motion is stopped."""
|
||||
return self._stopped
|
||||
|
||||
def stop(self, *, success=False) -> None:
|
||||
"""Stop any motion on the positioner
|
||||
|
||||
Args:
|
||||
success (bool) : Flag to indicate if the motion was successful
|
||||
"""
|
||||
# Stop any motion on the device
|
||||
self.move_stop.put(1)
|
||||
# Stop the move thread
|
||||
self._stopped = True
|
||||
if self._move_thread is not None:
|
||||
self._move_thread.join()
|
||||
self._move_thread = None
|
||||
super().stop(success=success)
|
||||
|
||||
def stop_scan(self) -> None:
|
||||
"""Stop the currently running scan gracefully, this finishes the running oscillation."""
|
||||
self.scan_control.scan_stop.put(1)
|
||||
|
||||
# -------------- Positioner specific methods -----------------#
|
||||
@property
|
||||
def limits(self) -> tuple:
|
||||
"""Return limits of the Bragg positioner"""
|
||||
@@ -349,11 +382,6 @@ class Mo1Bragg(Device, PositionerBase):
|
||||
move_cpt = self.readback if move_type == MoveType.ENERGY else self.feedback_pos_angle
|
||||
return move_cpt.get()
|
||||
|
||||
@property
|
||||
def stopped(self) -> bool:
|
||||
"""Return the stopped flag. If True, the motion is stopped."""
|
||||
return self._stopped
|
||||
|
||||
# pylint: disable=arguments-differ
|
||||
def check_value(self, value: float) -> None:
|
||||
"""Method to check if a value is within limits of the positioner. Called by PositionerBase.move()
|
||||
@@ -366,26 +394,6 @@ class Mo1Bragg(Device, PositionerBase):
|
||||
if low_limit < high_limit and not low_limit <= value <= high_limit:
|
||||
raise LimitError(f"position={value} not within limits {self.limits}")
|
||||
|
||||
def stop(self, *, success=False) -> None:
|
||||
"""Stop any motion on the positioner
|
||||
|
||||
Args:
|
||||
success (bool) : Flag to indicate if the motion was successful
|
||||
"""
|
||||
# Stop any motion on the device
|
||||
self.move_stop.put(1)
|
||||
# Stop the move thread
|
||||
self._stopped = True
|
||||
if self._move_thread is not None:
|
||||
self._move_thread.join()
|
||||
self._move_thread = None
|
||||
super().stop(success=success)
|
||||
|
||||
def stop_scan(self) -> None:
|
||||
"""Stop the currently running scan gracefully, this finishes the running oscillation."""
|
||||
self.scan_control.scan_stop.put(1)
|
||||
self._stopped = True
|
||||
|
||||
def _move_and_finish(
|
||||
self,
|
||||
target_pos: float,
|
||||
@@ -414,11 +422,8 @@ class Mo1Bragg(Device, PositionerBase):
|
||||
# Currently sleep is needed due to delay in updates on PVs, maybe time can be reduced
|
||||
time.sleep(0.5)
|
||||
while self.motor_is_moving.get() == 0:
|
||||
# TODO check if the _run_subs is needed since we have an auto_monitor on the readback PV
|
||||
# However, since the move_type can change, it might be necessary to have it here
|
||||
val = read_cpt.get()
|
||||
self._run_subs(sub_type=self.SUB_READBACK, value=val)
|
||||
logger.info(f"Current position of {self.name} is {val}")
|
||||
if self.stopped:
|
||||
success = False
|
||||
break
|
||||
@@ -433,7 +438,6 @@ class Mo1Bragg(Device, PositionerBase):
|
||||
if exception:
|
||||
status.set_exception(exc=exception)
|
||||
else:
|
||||
# The set_finished method does not allow passing success, so we use this one for the moment.
|
||||
# pylint: disable=protected-access
|
||||
status._finished(success=success)
|
||||
|
||||
@@ -463,6 +467,10 @@ class Mo1Bragg(Device, PositionerBase):
|
||||
self._move_thread.start()
|
||||
return status
|
||||
|
||||
# -------------- End of Positioner specific methods -----------------#
|
||||
|
||||
# -------------- MO1 Bragg specific methods -----------------#
|
||||
|
||||
def set_xtal(
|
||||
self,
|
||||
xtal_enum: Literal["111", "311"],
|
||||
@@ -516,18 +524,19 @@ class Mo1Bragg(Device, PositionerBase):
|
||||
self.scan_settings.s_scan_angle_hi.put(high)
|
||||
self.scan_settings.s_scan_scantime.put(scan_time)
|
||||
|
||||
def set_xrd_settings(self,
|
||||
enable_low: bool = False,
|
||||
enable_high: bool = False,
|
||||
num_trigger_low:int=0,
|
||||
num_trigger_high:int=0,
|
||||
exp_time_low:int=0,
|
||||
exp_time_high:int=0,
|
||||
cycle_low:int=0,
|
||||
cycle_high:int=0,
|
||||
) -> None:
|
||||
def set_xrd_settings(
|
||||
self,
|
||||
enable_low: bool,
|
||||
enable_high: bool,
|
||||
num_trigger_low: int,
|
||||
num_trigger_high: int,
|
||||
exp_time_low: int,
|
||||
exp_time_high: int,
|
||||
cycle_low: int,
|
||||
cycle_high: int,
|
||||
) -> None:
|
||||
"""Set XRD settings for the upcoming scan.
|
||||
|
||||
|
||||
Args:
|
||||
enable_low (bool): Enable XRD for low energy/angle
|
||||
enable_high (bool): Enable XRD for high energy/angle
|
||||
@@ -558,44 +567,18 @@ class Mo1Bragg(Device, PositionerBase):
|
||||
self.scan_control.scan_mode_enum.put(val)
|
||||
self.scan_control.scan_duration.put(scan_duration)
|
||||
|
||||
def setup_xas_scan(
|
||||
self,
|
||||
low: float,
|
||||
high: float,
|
||||
mode: Literal["simple", "advanced"],
|
||||
**kwargs,
|
||||
):
|
||||
"""Setup a simple XAS scan for the Bragg positioner.
|
||||
def _update_scan_parameter(self):
|
||||
"""Get the scaninfo parameters for the scan."""
|
||||
for key, value in self.scaninfo.scan_msg.content["info"]["args"].items():
|
||||
if hasattr(self.scan_parameter, key):
|
||||
setattr(self.scan_parameter, key, value)
|
||||
|
||||
Args:
|
||||
low (float): Low energy/angle value of the scan
|
||||
high (float): High energy/angle value of the scan
|
||||
scan_time (float): Time for a half oscillation
|
||||
mode (Literal["simple", "advanced"]): Pick a mode for the oscillation motion, either simple or advanced (needs to be configured)
|
||||
scan_duration (float): Duration of the scan, if <0.1s the scan will be started in infinite motion mode
|
||||
"""
|
||||
# TODO check if maybe we want an extra argument for infinite or finite motion
|
||||
self.set_xas_settings(low=low, high=high, scan_time=self.scan_time)
|
||||
self.set_xrd_settings(
|
||||
enable_low=self.xrd_enable_low,
|
||||
enable_high=self.xrd_enable_high,
|
||||
num_trigger_low=self.num_trigger_low,
|
||||
num_trigger_high=self.num_trigger_high,
|
||||
exp_time_low=self.exp_time_low,
|
||||
exp_time_high=self.exp_time_high,
|
||||
cycle_low=self.cycle_low,
|
||||
cycle_high=self.cycle_high,
|
||||
)
|
||||
self.set_scan_control_settings(mode=mode, scan_duration=self.scan_duration)
|
||||
self.scan_control.scan_load.put(1)
|
||||
# -------------- End MO1 Bragg specific methods -----------------#
|
||||
|
||||
# -------------- Flyer Interface methods -----------------#
|
||||
|
||||
def kickoff(self):
|
||||
"""Kickoff the device, called from BEC."""
|
||||
# TODO - tobe discussed and decided with Debye team
|
||||
# There are 2 possible ways to start the scan, either infinit or with a timer.
|
||||
# If the scan_duration is less than 0.1s, the scan will be started infinite
|
||||
# This logic should move in future to the controller, but for now it is implemented here
|
||||
# It is necessary since the current NIDAQ implementation relies on it
|
||||
scan_duration = self.scan_control.scan_duration.get()
|
||||
start_func = (
|
||||
self.scan_control.scan_start_infinite.put
|
||||
@@ -624,73 +607,77 @@ class Mo1Bragg(Device, PositionerBase):
|
||||
self.on_stage()
|
||||
return super().stage()
|
||||
|
||||
def _get_scaninfo_parameters(self):
|
||||
"""Get the scaninfo parameters for the scan."""
|
||||
self.start, self.end = self.scaninfo.scan_msg.info["positions"]
|
||||
logger.info(self.scaninfo.scan_msg.info["scan_msgs"][0])
|
||||
pattern = re.compile(r"'scan_time':\s*([\d\.]+)")
|
||||
pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0])
|
||||
self.scan_time = float(pattern_match.group(1)) if pattern_match else 0.0
|
||||
pattern = re.compile(r"'scan_duration':\s*([\d\.]+)")
|
||||
pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0])
|
||||
self.scan_duration = float(pattern_match.group(1)) if pattern_match else 0.0
|
||||
pattern = re.compile(r"'exp_time_low':\s*([\d\.]+)")
|
||||
pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0])
|
||||
self.exp_time_low = float(pattern_match.group(1)) if pattern_match else 0.0
|
||||
pattern = re.compile(r"'exp_time_high':\s*([\d\.]+)")
|
||||
pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0])
|
||||
self.exp_time_high = float(pattern_match.group(1)) if pattern_match else 0.0
|
||||
def _check_scan_msg(self, target_state: ScanControlLoadMessage) -> None:
|
||||
"""Check if the scan message is gettting available
|
||||
|
||||
pattern = re.compile(r"'xrd_enable_low':\s*(True|False)")
|
||||
pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0])
|
||||
self.xrd_enable_low = pattern_match.group(1) =='True' if pattern_match else False
|
||||
pattern = re.compile(r"'xrd_enable_high':\s*(True|False)")
|
||||
pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0])
|
||||
self.xrd_enable_high = pattern_match.group(1) =='True' if pattern_match else False
|
||||
Args:
|
||||
target_state (ScanControlLoadMessage): Target state to check for
|
||||
|
||||
pattern = re.compile(r"'num_trigger_low':\s*(\d+)")
|
||||
pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0])
|
||||
self.num_trigger_low = int(pattern_match.group(1)) if pattern_match else 0
|
||||
pattern = re.compile(r"'num_trigger_high':\s*(\d+)")
|
||||
pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0])
|
||||
self.num_trigger_high = int(pattern_match.group(1)) if pattern_match else 0
|
||||
pattern = re.compile(r"'cycle_low':\s*(\d+)")
|
||||
pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0])
|
||||
self.cycle_low = int(pattern_match.group(1)) if pattern_match else 0
|
||||
pattern = re.compile(r"'cycle_high':\s*(\d+)")
|
||||
pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0])
|
||||
self.cycle_high = int(pattern_match.group(1)) if pattern_match else 0
|
||||
|
||||
|
||||
|
||||
|
||||
def on_stage(self) -> None:
|
||||
"""Actions to be executed when the device is staged."""
|
||||
if not self.scaninfo.scan_type == "fly":
|
||||
return
|
||||
|
||||
if self.scan_control.scan_msg.get() != ScanControlLoadMessage.PENDING:
|
||||
Raises:
|
||||
TimeoutError: If the scan message is not available after the timeout
|
||||
"""
|
||||
if self.scan_control.scan_msg.get() != target_state:
|
||||
self.scan_control.scan_val_reset.put(1)
|
||||
|
||||
if not self.wait_for_signals(
|
||||
signal_conditions=[(self.scan_control.scan_msg.get, ScanControlLoadMessage.PENDING)],
|
||||
signal_conditions=[(self.scan_control.scan_msg.get, target_state)],
|
||||
timeout=self.timeout_for_pvwait,
|
||||
check_stopped=True,
|
||||
):
|
||||
raise TimeoutError(
|
||||
f"Timeout after {self.timeout_for_pvwait} while waiting for scan status, current state: {self.scan_control.scan_status.get()}"
|
||||
)
|
||||
# Add here the logic to setup different type of scans
|
||||
|
||||
def on_stage(self) -> None:
|
||||
"""Actions to be executed when the device is staged."""
|
||||
if not self.scaninfo.scan_type == "fly":
|
||||
return
|
||||
self._check_scan_msg(ScanControlLoadMessage.PENDING)
|
||||
|
||||
scan_name = self.scaninfo.scan_msg.content["info"].get("scan_name", "")
|
||||
if scan_name == "xas_simple_scan" or scan_name == "xas_simple_scan_with_xrd":
|
||||
self._get_scaninfo_parameters()
|
||||
self.setup_xas_scan(
|
||||
low=self.start,
|
||||
high=self.end,
|
||||
mode=ScanControlMode.SIMPLE
|
||||
self._update_scan_parameter()
|
||||
if scan_name == "xas_simple_scan":
|
||||
self.set_xas_settings(
|
||||
low=self.scan_parameter.start,
|
||||
high=self.scan_parameter.end,
|
||||
scan_time=self.scan_parameter.scan_time,
|
||||
)
|
||||
self.set_xrd_settings(
|
||||
enable_low=False,
|
||||
enable_high=False,
|
||||
num_trigger_low=0,
|
||||
num_trigger_high=0,
|
||||
exp_time_low=0,
|
||||
exp_time_high=0,
|
||||
cycle_low=0,
|
||||
cycle_high=0,
|
||||
)
|
||||
self.set_scan_control_settings(
|
||||
mode=ScanControlMode.SIMPLE, scan_duration=self.scan_parameter.scan_duration
|
||||
)
|
||||
elif scan_name == "xas_simple_scan_with_xrd":
|
||||
self.set_xas_settings(
|
||||
low=self.scan_parameter.start,
|
||||
high=self.scan_parameter.end,
|
||||
scan_time=self.scan_parameter.scan_time,
|
||||
)
|
||||
self.set_xrd_settings(
|
||||
enable_low=self.scan_parameter.xrd_enable_low,
|
||||
enable_high=self.scan_parameter.xrd_enable_high,
|
||||
num_trigger_low=self.scan_parameter.num_trigger_low,
|
||||
num_trigger_high=self.scan_parameter.num_trigger_high,
|
||||
exp_time_low=self.scan_parameter.exp_time_low,
|
||||
exp_time_high=self.scan_parameter.exp_time_high,
|
||||
cycle_low=self.scan_parameter.cycle_low,
|
||||
cycle_high=self.scan_parameter.cycle_high,
|
||||
)
|
||||
self.set_scan_control_settings(
|
||||
mode=ScanControlMode.SIMPLE, scan_duration=self.scan_parameter.scan_duration
|
||||
)
|
||||
else:
|
||||
raise Mo1BraggError(f"Scan mode {scan_name} not implemented for device {self.name}")
|
||||
# Load the scan parameters to the controller
|
||||
self.scan_control.scan_load.put(1)
|
||||
# Wait for params to be checked from controller
|
||||
if not self.wait_for_signals(
|
||||
signal_conditions=[(self.scan_control.scan_msg.get, ScanControlLoadMessage.SUCCESS)],
|
||||
@@ -731,22 +718,18 @@ class Mo1Bragg(Device, PositionerBase):
|
||||
"""
|
||||
self.check_scan_id()
|
||||
self.on_unstage()
|
||||
# TODO should this reset on unstage?
|
||||
self._stopped = False
|
||||
return super().unstage()
|
||||
|
||||
def on_unstage(self) -> None:
|
||||
"""Actions to be executed when the device is unstaged."""
|
||||
# Reset scan parameter validation
|
||||
self.scan_control.scan_val_reset.put(1)
|
||||
if not self.wait_for_signals(
|
||||
signal_conditions=[(self.scan_control.scan_msg.get, ScanControlLoadMessage.PENDING)],
|
||||
timeout=self.timeout_for_pvwait,
|
||||
check_stopped=True,
|
||||
):
|
||||
raise TimeoutError(
|
||||
f"Timeout after {self.timeout_for_pvwait} while waiting for scan status, current state: {self.scan_control.scan_status.get()}"
|
||||
)
|
||||
# Reset scan parameter validation if needed
|
||||
if self.scan_control.scan_msg.get() != ScanControlLoadMessage.PENDING:
|
||||
self.scan_control.scan_val_reset.put(1)
|
||||
|
||||
# -------------- End Flyer Interface methods -----------------#
|
||||
|
||||
# -------------- Utility methods -----------------#
|
||||
|
||||
def check_scan_id(self) -> None:
|
||||
"""Checks if scan_id has changed and set stopped flagged to True if it has."""
|
||||
|
||||
@@ -32,10 +32,10 @@ class XASSimpleScan(AsyncFlyScanBase):
|
||||
motor: DeviceBase = "mo1_bragg",
|
||||
**kwargs,
|
||||
):
|
||||
""" The xas_simple_scan is used to start a simple oscillating scan on the mono bragg motor.
|
||||
Start and Stop define the energy range for the scan, scan_time is the time for one scan cycle and scan_duration
|
||||
"""The xas_simple_scan is used to start a simple oscillating scan on the mono bragg motor.
|
||||
Start and Stop define the energy range for the scan, scan_time is the time for one scan cycle and scan_duration
|
||||
is the duration of the scan. If scan duration is set to 0, the scan will run infinitely.
|
||||
|
||||
|
||||
Args:
|
||||
start (float): Start energy for the scan.
|
||||
stop (float): Stop energy for the scan.
|
||||
@@ -49,7 +49,7 @@ class XASSimpleScan(AsyncFlyScanBase):
|
||||
self.stop = stop
|
||||
self.scan_time = scan_time
|
||||
self.scan_duration = scan_duration
|
||||
self.primary_readout_cycle = 1
|
||||
self.primary_readout_cycle = 1
|
||||
|
||||
def prepare_positions(self):
|
||||
"""Prepare the positions for the scan.
|
||||
@@ -61,9 +61,8 @@ class XASSimpleScan(AsyncFlyScanBase):
|
||||
yield None
|
||||
|
||||
def pre_scan(self):
|
||||
"""Pre Scan action. Happens just before the scan.
|
||||
|
||||
Ensure the motor movetype is set to energy, then check limits for start/end energy.
|
||||
"""Pre Scan action. Ensure the motor movetype is set to energy, then check limits for start/end energy.
|
||||
#TODO Remove once the motor movetype is removed and ANGLE motion is a pseudo motor.
|
||||
"""
|
||||
yield from self.stubs.send_rpc_and_wait(self.motor, "move_type.set", "energy")
|
||||
|
||||
@@ -79,8 +78,7 @@ class XASSimpleScan(AsyncFlyScanBase):
|
||||
|
||||
def scan_core(self):
|
||||
"""Run the scan core.
|
||||
|
||||
This is the core of the scan. It is a loop over all positions.
|
||||
Kickoff the oscillation on the Bragg motor and wait for the completion of the motion.
|
||||
"""
|
||||
# Start the oscillation on the Bragg motor.
|
||||
yield from self.stubs.kickoff(device=self.motor)
|
||||
@@ -101,7 +99,7 @@ class XASSimpleScan(AsyncFlyScanBase):
|
||||
time.sleep(self.primary_readout_cycle)
|
||||
self.point_id += 1
|
||||
|
||||
self.num_pos = self.point_id +1
|
||||
self.num_pos = self.point_id + 1
|
||||
|
||||
|
||||
class XASSimpleScanWithXRD(XASSimpleScan):
|
||||
@@ -119,40 +117,44 @@ class XASSimpleScanWithXRD(XASSimpleScan):
|
||||
stop: float,
|
||||
scan_time: float,
|
||||
scan_duration: float,
|
||||
xrd_enable_low : bool,
|
||||
num_trigger_low :int,
|
||||
exp_time_low : float,
|
||||
cycle_low : int,
|
||||
xrd_enable_high : bool,
|
||||
num_trigger_high :int,
|
||||
exp_time_high : float,
|
||||
cycle_high : float,
|
||||
xrd_enable_low: bool,
|
||||
num_trigger_low: int,
|
||||
exp_time_low: float,
|
||||
cycle_low: int,
|
||||
xrd_enable_high: bool,
|
||||
num_trigger_high: int,
|
||||
exp_time_high: float,
|
||||
cycle_high: float,
|
||||
motor: DeviceBase = "mo1_bragg",
|
||||
**kwargs,
|
||||
):
|
||||
""" The xas_simple_scan_with_xrd is used to start a simple oscillating scan on the mono bragg motor with XRD triggering.
|
||||
Start and Stop define the energy range for the scan, scan_time is the time for one scan cycle and scan_duration
|
||||
is the duration of the scan. If scan duration is set to 0, the scan will run infinitely.
|
||||
xrd_enable_low and xrd_enable_high define if XRD triggering is enabled for the low and high energy range.
|
||||
num_trigger_low and num_trigger_high define the number of triggers for the low and high energy range.
|
||||
exp_time_low and exp_time_high define the exposure time for the low and high energy range.
|
||||
"""The xas_simple_scan_with_xrd is an oscillation motion on the mono motor
|
||||
with XRD triggering at low and high energy ranges.
|
||||
If scan duration is set to 0, the scan will run infinitely.
|
||||
|
||||
Args:
|
||||
start (float): Start energy for the scan.
|
||||
stop (float): Stop energy for the scan.
|
||||
scan_time (float): Time for one scan cycle.
|
||||
scan_duration (float): Duration of the scan.
|
||||
scan_time (float): Time for one oscillation .
|
||||
scan_duration (float): Total duration of the scan.
|
||||
xrd_enable_low (bool): Enable XRD triggering for the low energy range.
|
||||
num_trigger_low (int): Number of triggers for the low energy range.
|
||||
exp_time_low (float): Exposure time for the low energy range.
|
||||
cycle_low (int): Cycle for the low energy range.
|
||||
cycle_low (int): Specify how often the triggers should be considered, every nth cycle for low
|
||||
xrd_enable_high (bool): Enable XRD triggering for the high energy range.
|
||||
num_trigger_high (int): Number of triggers for the high energy range.
|
||||
exp_time_high (float): Exposure time for the high energy range.
|
||||
cycle_high (int): Cycle for the high energy range.
|
||||
cycle_high (int): Specify how often the triggers should be considered, every nth cycle for high
|
||||
motor (DeviceBase, optional): Motor device to be used for the scan. Defaults to "mo1_bragg".
|
||||
"""
|
||||
super().__init__(start=start, stop=stop, scan_time=scan_time, scan_duration=scan_duration, motor=motor, **kwargs)
|
||||
super().__init__(
|
||||
start=start,
|
||||
stop=stop,
|
||||
scan_time=scan_time,
|
||||
scan_duration=scan_duration,
|
||||
motor=motor,
|
||||
**kwargs,
|
||||
)
|
||||
self.xrd_enable_low = xrd_enable_low
|
||||
self.num_trigger_low = num_trigger_low
|
||||
self.exp_time_low = exp_time_low
|
||||
|
||||
Reference in New Issue
Block a user