fix: refactoring at the beamline

This commit is contained in:
gac-x01da (Resp. Clark Adam Hugh)
2024-07-26 12:02:01 +02:00
committed by appel_c
parent 29076425c4
commit 59675e0038
3 changed files with 80 additions and 47 deletions

View File

@@ -9,7 +9,6 @@ put_complete=True is used to ensure that the action is executed completely. This
to allow for a more stable execution of the action."""
import enum
import re
import threading
import time
import traceback
@@ -208,7 +207,7 @@ class ScanParameter:
cycle_low: int = None
cycle_high: int = None
start: float = None
end: float = None
stop: float = None
class Mo1Bragg(Device, PositionerBase):
@@ -313,7 +312,7 @@ class Mo1Bragg(Device, PositionerBase):
value (int) : current progress value
"""
max_value = 100
logger.info(f"Progress at {value}")
# logger.info(f"Progress at {value}")
self._run_subs(
sub_type=self.SUB_PROGRESS,
value=value,
@@ -569,7 +568,7 @@ class Mo1Bragg(Device, PositionerBase):
def _update_scan_parameter(self):
"""Get the scaninfo parameters for the scan."""
for key, value in self.scaninfo.scan_msg.content["info"]["args"].items():
for key, value in self.scaninfo.scan_msg.content["info"]["kwargs"].items():
if hasattr(self.scan_parameter, key):
setattr(self.scan_parameter, key, value)
@@ -616,8 +615,14 @@ class Mo1Bragg(Device, PositionerBase):
Raises:
TimeoutError: If the scan message is not available after the timeout
"""
if self.scan_control.scan_msg.get() != target_state:
state = self.scan_control.scan_msg.get()
if state != target_state:
logger.warning(f"Resetting scan validation in stage for state: {ScanControlLoadMessage(state)}, " \
f"retry .get() on scan_control: {ScanControlLoadMessage(self.scan_control.scan_msg.get())} and sleeping 1s")
self.scan_control.scan_val_reset.put(1)
# Sleep to ensure the reset is done
time.sleep(1)
if not self.wait_for_signals(
signal_conditions=[(self.scan_control.scan_msg.get, target_state)],
@@ -625,7 +630,8 @@ class Mo1Bragg(Device, PositionerBase):
check_stopped=True,
):
raise TimeoutError(
f"Timeout after {self.timeout_for_pvwait} while waiting for scan status, current state: {self.scan_control.scan_status.get()}"
f"Timeout after {self.timeout_for_pvwait} while waiting for scan status," \
f" current state: {ScanControlScanStatus(self.scan_control.scan_msg.get())}"
)
def on_stage(self) -> None:
@@ -639,7 +645,7 @@ class Mo1Bragg(Device, PositionerBase):
if scan_name == "xas_simple_scan":
self.set_xas_settings(
low=self.scan_parameter.start,
high=self.scan_parameter.end,
high=self.scan_parameter.stop,
scan_time=self.scan_parameter.scan_time,
)
self.set_xrd_settings(
@@ -658,7 +664,7 @@ class Mo1Bragg(Device, PositionerBase):
elif scan_name == "xas_simple_scan_with_xrd":
self.set_xas_settings(
low=self.scan_parameter.start,
high=self.scan_parameter.end,
high=self.scan_parameter.stop,
scan_time=self.scan_parameter.scan_time,
)
self.set_xrd_settings(
@@ -685,7 +691,7 @@ class Mo1Bragg(Device, PositionerBase):
check_stopped=True,
):
raise TimeoutError(
f"Scan parameter validation run into timeout after {self.timeout_for_pvwait} with {self.scan_control.scan_status.get()}"
f"Scan parameter validation run into timeout after {self.timeout_for_pvwait} with {ScanControlLoadMessage(self.scan_control.scan_msg.get())}"
)
def complete(self) -> DeviceStatus:
@@ -723,9 +729,24 @@ class Mo1Bragg(Device, PositionerBase):
def on_unstage(self) -> None:
"""Actions to be executed when the device is unstaged."""
# Reset scan parameter validation if needed
if self.scan_control.scan_msg.get() != ScanControlLoadMessage.PENDING:
self.scan_control.scan_val_reset.put(1)
if self.wait_for_signals(
signal_conditions=[(self.scan_control.scan_msg.get, ScanControlLoadMessage.PENDING)],
timeout=self.timeout_for_pvwait,
check_stopped=True,
):
return
self.scan_control.scan_val_reset.put(1)
if not self.wait_for_signals(
signal_conditions=[(self.scan_control.scan_msg.get, ScanControlLoadMessage.PENDING)],
timeout=self.timeout_for_pvwait,
check_stopped=True,
):
raise TimeoutError(
f"Timeout after {self.timeout_for_pvwait} while waiting for scan validation"
)
# -------------- End Flyer Interface methods -----------------#

View File

@@ -1,14 +1,11 @@
""" This module contains the scan classes for the mono bragg motor of the Debye beamline."""
import time
import uuid
import numpy as np
from bec_lib.device import DeviceBase
from bec_server.scan_server.scans import AsyncFlyScanBase
from debye_bec.devices.mo1_bragg import MoveType
class XASSimpleScan(AsyncFlyScanBase):
@@ -42,6 +39,8 @@ class XASSimpleScan(AsyncFlyScanBase):
scan_time (float): Time for one scan cycle.
scan_duration (float): Duration of the scan.
motor (DeviceBase, optional): Motor device to be used for the scan. Defaults to "mo1_bragg".
Examples:
>>> scans.xas_simple_scan(start=8000, stop=9000, scan_time=1, scan_duration=10)
"""
super().__init__(**kwargs)
self.motor = motor
@@ -146,7 +145,10 @@ class XASSimpleScanWithXRD(XASSimpleScan):
exp_time_high (float): Exposure time for the high energy range.
cycle_high (int): Specify how often the triggers should be considered, every nth cycle for high
motor (DeviceBase, optional): Motor device to be used for the scan. Defaults to "mo1_bragg".
"""
Examples:
>>> scans.xas_simple_scan_with_xrd(start=8000, stop=9000, scan_time=1, scan_duration=10, xrd_enable_low=True, num_trigger_low=5, cycle_low=2, exp_time_low=100, xrd_enable_high=False, num_trigger_high=3, cycle_high=1, exp_time_high=1000)
"""
super().__init__(
start=start,
stop=stop,

View File

@@ -23,6 +23,14 @@ from debye_bec.devices.mo1_bragg import (
# TODO move this function to ophyd_devices, it is duplicated in csaxs_bec and needed for other pluging repositories
from debye_bec.devices.test_utils.utils import patch_dual_pvs
from bec_server.scan_server.tests.fixtures import scan_server_mock
from bec_server.scan_server.scan_worker import ScanWorker
@pytest.fixture(scope="function")
def scan_worker_mock(scan_server_mock):
scan_server_mock.device_manager.connector = mock.MagicMock()
scan_worker = ScanWorker(parent=scan_server_mock)
yield scan_worker
@pytest.fixture(scope="function")
@@ -177,9 +185,9 @@ def test_update_scan_parameters(mock_bragg):
scan_id="my_scan_id",
status="closed",
info={
"args": {
"kwargs": {
"start": 0,
"end": 5,
"stop": 5,
"scan_time": 1,
"scan_duration": 10,
"xrd_enable_low": True,
@@ -199,7 +207,7 @@ def test_update_scan_parameters(mock_bragg):
assert getattr(dev.scan_parameter, field.name) == None
dev._update_scan_parameter()
for field in fields(dev.scan_parameter):
assert getattr(dev.scan_parameter, field.name) == msg.content["info"]["args"].get(
assert getattr(dev.scan_parameter, field.name) == msg.content["info"]["kwargs"].get(
field.name, None
)
@@ -250,27 +258,29 @@ def test_complete(mock_bragg):
def test_unstage(mock_bragg):
mock_bragg.scan_control.scan_val_reset._read_pv.mock_data = 0
mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.SUCCESS
mock_bragg.unstage()
assert mock_bragg.scan_control.scan_val_reset.get() == 1
mock_bragg.timeout_for_pvwait = 0.5
mock_bragg.scan_control.scan_val_reset._read_pv.mock_data = 0
mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.PENDING
mock_bragg.unstage()
assert mock_bragg.scan_control.scan_val_reset.get() == 0
with mock.patch.object(mock_bragg.scan_control.scan_val_reset, "put") as mock_put:
mock_bragg.unstage()
assert mock_put.call_count == 0
mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.SUCCESS
with pytest.raises(TimeoutError):
mock_bragg.unstage()
assert mock_put.call_count == 1
def test_stage(mock_bragg):
def test_stage_(mock_bragg):
# Test unknown scan type first
mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.SUCCESS
msg = ScanStatusMessage(
scan_id="my_scan_id",
status="closed",
info={
"args": {
"kwargs": {
"start": 0,
"end": 5,
"stop": 5,
"scan_time": 1,
"scan_duration": 10,
"xrd_enable_low": True,
@@ -307,9 +317,9 @@ def test_stage(mock_bragg):
):
mock_bragg.stage()
assert mock_xas_settings.call_args == mock.call(
low=msg.content["info"]["args"]["start"],
high=msg.content["info"]["args"]["end"],
scan_time=msg.content["info"]["args"]["scan_time"],
low=msg.content["info"]["kwargs"]["start"],
high=msg.content["info"]["kwargs"]["stop"],
scan_time=msg.content["info"]["kwargs"]["scan_time"],
)
assert mock_xrd_settings.call_args == mock.call(
enable_low=False,
@@ -323,34 +333,34 @@ def test_stage(mock_bragg):
)
assert mock_set_scan_control_settings.call_args == mock.call(
mode=ScanControlMode.SIMPLE,
scan_duration=msg.content["info"]["args"]["scan_duration"],
scan_duration=msg.content["info"]["kwargs"]["scan_duration"],
)
mock_bragg.scaninfo.scan_msg.content["info"].update(
{"scan_name": "xas_simple_scan_with_xrd"}
)
# Unstage mock_bragg to reset _staged
mock_bragg.unstage()
time.sleep(0.1)
mock_bragg._staged= ophyd.Staged.no
time.sleep(1)
# Test simple XAS scan with XRD
mock_bragg.stage()
assert mock_xas_settings.call_args == mock.call(
low=msg.content["info"]["args"]["start"],
high=msg.content["info"]["args"]["end"],
scan_time=msg.content["info"]["args"]["scan_time"],
low=msg.content["info"]["kwargs"]["start"],
high=msg.content["info"]["kwargs"]["stop"],
scan_time=msg.content["info"]["kwargs"]["scan_time"],
)
assert mock_xrd_settings.call_args == mock.call(
enable_low=msg.content["info"]["args"]["xrd_enable_low"],
enable_high=msg.content["info"]["args"]["xrd_enable_high"],
num_trigger_low=msg.content["info"]["args"]["num_trigger_low"],
num_trigger_high=msg.content["info"]["args"]["num_trigger_high"],
exp_time_low=msg.content["info"]["args"]["exp_time_low"],
exp_time_high=msg.content["info"]["args"]["exp_time_high"],
cycle_low=msg.content["info"]["args"]["cycle_low"],
cycle_high=msg.content["info"]["args"]["cycle_high"],
enable_low=msg.content["info"]["kwargs"]["xrd_enable_low"],
enable_high=msg.content["info"]["kwargs"]["xrd_enable_high"],
num_trigger_low=msg.content["info"]["kwargs"]["num_trigger_low"],
num_trigger_high=msg.content["info"]["kwargs"]["num_trigger_high"],
exp_time_low=msg.content["info"]["kwargs"]["exp_time_low"],
exp_time_high=msg.content["info"]["kwargs"]["exp_time_high"],
cycle_low=msg.content["info"]["kwargs"]["cycle_low"],
cycle_high=msg.content["info"]["kwargs"]["cycle_high"],
)
assert mock_set_scan_control_settings.call_args == mock.call(
mode=ScanControlMode.SIMPLE,
scan_duration=msg.content["info"]["args"]["scan_duration"],
scan_duration=msg.content["info"]["kwargs"]["scan_duration"],
)
# Test redundant staging
with pytest.raises(ophyd.utils.errors.RedundantStaging):