Compare commits
14 Commits
feat/file_
...
fix_mono_n
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2fd783ad54 | ||
|
|
34dbc1839d | ||
|
|
db6a9a502f | ||
|
|
e6586ceab2 | ||
| ed6d64c7f9 | |||
| 43e8aea6c8 | |||
| abf432f2a9 | |||
|
|
b3672cf5f5 | ||
|
|
fa434794c3 | ||
|
|
9be74da098 | ||
|
|
29913cea61 | ||
|
|
881bc9e7a3 | ||
| e941647750 | |||
| 827557b667 |
@@ -6,6 +6,8 @@ from typing import TYPE_CHECKING
|
||||
|
||||
from ophyd import ADBase
|
||||
from ophyd import ADComponent as ADCpt
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd_devices import PreviewSignal
|
||||
from ophyd_devices.devices.areadetector.cam import AravisDetectorCam
|
||||
from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35
|
||||
|
||||
@@ -25,14 +27,10 @@ class BaslerCamBase(ADBase):
|
||||
class BaslerCam(DebyeBaseCamera, BaslerCamBase):
|
||||
"""Basler camera class at Debye. IOC prefix: X01DA-ES-XRAYEYE:"""
|
||||
|
||||
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
|
||||
"""
|
||||
Initialize the Prosilica camera class.
|
||||
|
||||
Args:
|
||||
name (str): Name of the camera.
|
||||
prefix (str): IOC prefix.
|
||||
scan_info (ScanInfo): The scan info to use.
|
||||
"""
|
||||
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
|
||||
self._n_rot90 = -1 # Rotate the image by -90 degrees
|
||||
preview = Cpt(
|
||||
PreviewSignal,
|
||||
name="preview",
|
||||
ndim=2,
|
||||
num_rotation_90=3,
|
||||
doc="Preview signal for the camera.",
|
||||
)
|
||||
|
||||
@@ -7,7 +7,9 @@ from typing import TYPE_CHECKING
|
||||
|
||||
import numpy as np
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import DeviceStatus, StatusBase
|
||||
from ophyd_devices import PreviewSignal
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
from typeguard import typechecked
|
||||
|
||||
@@ -23,6 +25,13 @@ class DebyeBaseCamera(PSIDeviceBase):
|
||||
"""Base class for Debye cameras."""
|
||||
|
||||
USER_ACCESS = ["live_mode"]
|
||||
preview = Cpt(
|
||||
PreviewSignal,
|
||||
name="preview",
|
||||
ndim=2,
|
||||
num_rotation_90=-1,
|
||||
doc="Preview signal for the camera.",
|
||||
)
|
||||
|
||||
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
|
||||
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
|
||||
@@ -31,7 +40,6 @@ class DebyeBaseCamera(PSIDeviceBase):
|
||||
self._live_mode = False
|
||||
self._live_mode_event = None
|
||||
self._task_status = None
|
||||
self._n_rot90 = -1
|
||||
|
||||
@property
|
||||
def live_mode(self) -> bool:
|
||||
@@ -82,8 +90,8 @@ class DebyeBaseCamera(PSIDeviceBase):
|
||||
width = self.image1.array_size.width.get()
|
||||
height = self.image1.array_size.height.get()
|
||||
# Geometry correction for the image
|
||||
data = np.rot90(np.reshape(value, (height, width)), k=self._n_rot90, axes=(0, 1))
|
||||
self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, value=data)
|
||||
data = np.reshape(value, (height, width))
|
||||
self.preview.put(data)
|
||||
|
||||
########################################
|
||||
# Beamline Specific Implementations #
|
||||
|
||||
@@ -6,6 +6,8 @@ from typing import TYPE_CHECKING
|
||||
|
||||
from ophyd import ADBase
|
||||
from ophyd import ADComponent as ADCpt
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd_devices import PreviewSignal
|
||||
from ophyd_devices.devices.areadetector.cam import ProsilicaDetectorCam
|
||||
from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35
|
||||
|
||||
@@ -28,14 +30,10 @@ class ProsilicaCam(DebyeBaseCamera, ProsilicaCamBase):
|
||||
Prefixes are: X01DA-OP-GIGE02: and X01DA-OP-GIGE01:
|
||||
"""
|
||||
|
||||
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
|
||||
"""
|
||||
Initialize the Prosilica camera class.
|
||||
|
||||
Args:
|
||||
name (str): Name of the camera.
|
||||
prefix (str): IOC prefix.
|
||||
scan_info (ScanInfo): The scan info to use.
|
||||
"""
|
||||
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
|
||||
self._n_rot90 = -1 # Rotate the image by -90 degrees
|
||||
preview = Cpt(
|
||||
PreviewSignal,
|
||||
name="preview",
|
||||
ndim=2,
|
||||
num_rotation_90=3,
|
||||
doc="Preview signal for the camera.",
|
||||
)
|
||||
|
||||
@@ -15,7 +15,8 @@ from bec_lib.devicemanager import ScanInfo
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import DeviceStatus, Signal, StatusBase
|
||||
from ophyd.status import SubscriptionStatus
|
||||
from ophyd.status import SubscriptionStatus, WaitTimeoutError
|
||||
from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
from ophyd_devices.utils.errors import DeviceStopError
|
||||
from pydantic import BaseModel, Field
|
||||
@@ -81,6 +82,8 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
The prefix to connect to the soft IOC is X01DA-OP-MO1:BRAGG:
|
||||
"""
|
||||
|
||||
progress_signal = Cpt(ProgressSignal, name="progress_signal")
|
||||
|
||||
USER_ACCESS = ["set_advanced_xas_settings", "set_xtal"]
|
||||
|
||||
def __init__(self, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs): # type: ignore
|
||||
@@ -93,7 +96,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
"""
|
||||
super().__init__(name=name, scan_info=scan_info, prefix=prefix, **kwargs)
|
||||
self.scan_parameter = ScanParameter()
|
||||
self.timeout_for_pvwait = 2.5
|
||||
self.timeout_for_pvwait = 7.5
|
||||
|
||||
########################################
|
||||
# Beamline Specific Implementations #
|
||||
@@ -120,7 +123,11 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
|
||||
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
|
||||
"""
|
||||
self._check_scan_msg(ScanControlLoadMessage.PENDING)
|
||||
if self.scan_control.scan_msg.get() != ScanControlLoadMessage.PENDING:
|
||||
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
|
||||
self.cancel_on_stop(status)
|
||||
self.scan_control.scan_val_reset.put(1)
|
||||
status.wait(timeout=self.timeout_for_pvwait)
|
||||
|
||||
scan_name = self.scan_info.msg.scan_name
|
||||
self._update_scan_parameter()
|
||||
@@ -198,14 +205,16 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
)
|
||||
else:
|
||||
return
|
||||
# Setting scan duration seems to lag behind slightly in the backend, include small sleep
|
||||
logger.info(f"Sleeping for one second")
|
||||
time.sleep(1)
|
||||
logger.info(f"Device {self.name}, done sleeping")
|
||||
# Load the scan parameters to the controller
|
||||
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.SUCCESS)
|
||||
self.cancel_on_stop(status)
|
||||
self.scan_control.scan_load.put(1)
|
||||
# Wait for params to be checked from controller
|
||||
self.wait_for_signal(
|
||||
self.scan_control.scan_msg,
|
||||
ScanControlLoadMessage.SUCCESS,
|
||||
timeout=2 * self.timeout_for_pvwait,
|
||||
)
|
||||
# Wait for params to be checked from controller
|
||||
status.wait(self.timeout_for_pvwait)
|
||||
return None
|
||||
|
||||
def on_unstage(self) -> DeviceStatus | StatusBase | None:
|
||||
@@ -213,32 +222,28 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
if self.stopped is True:
|
||||
logger.warning(f"Resetting stopped in unstage for device {self.name}.")
|
||||
self._stopped = False
|
||||
current_state = self.scan_control.scan_msg.get()
|
||||
# Case 1, message is already ScanControlLoadMessage.PENDING
|
||||
if current_state == ScanControlLoadMessage.PENDING:
|
||||
return None
|
||||
# Case 2, probably called after scan, backend should resolve on its own. Timeout to wait
|
||||
if current_state in [ScanControlLoadMessage.STARTED, ScanControlLoadMessage.SUCCESS]:
|
||||
if self.scan_control.scan_msg.get() in [
|
||||
ScanControlLoadMessage.STARTED,
|
||||
ScanControlLoadMessage.SUCCESS,
|
||||
]:
|
||||
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
|
||||
self.cancel_on_stop(status)
|
||||
try:
|
||||
self.wait_for_signal(
|
||||
self.scan_control.scan_msg,
|
||||
ScanControlLoadMessage.PENDING,
|
||||
timeout=self.timeout_for_pvwait,
|
||||
)
|
||||
return
|
||||
except TimeoutError:
|
||||
status.wait(2)
|
||||
return None
|
||||
except WaitTimeoutError:
|
||||
logger.warning(
|
||||
f"Timeout in on_unstage of {self.name} after {self.timeout_for_pvwait}s, current scan_control_message : {self.scan_control.scan_msg.get()}"
|
||||
)
|
||||
|
||||
def callback(*, old_value, value, **kwargs):
|
||||
if value == ScanControlLoadMessage.PENDING:
|
||||
return True
|
||||
return False
|
||||
|
||||
status = SubscriptionStatus(self.scan_control.scan_msg, callback=callback)
|
||||
self.scan_control.scan_val_reset.put(1)
|
||||
status.wait(timeout=self.timeout_for_pvwait)
|
||||
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
|
||||
self.cancel_on_stop(status)
|
||||
self.scan_control.scan_val_reset.put(1)
|
||||
status.wait(timeout=self.timeout_for_pvwait)
|
||||
else:
|
||||
status = CompareStatus(self.scan_control.scan_msg, ScanControlLoadMessage.PENDING)
|
||||
self.cancel_on_stop(status)
|
||||
self.scan_control.scan_val_reset.put(1)
|
||||
status.wait(timeout=self.timeout_for_pvwait)
|
||||
return None
|
||||
|
||||
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
|
||||
@@ -249,20 +254,8 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
|
||||
def on_complete(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called to inquire if a device has completed a scans."""
|
||||
|
||||
def wait_for_complete():
|
||||
"""Wait for the scan to complete. No timeout is set."""
|
||||
start_time = time.time()
|
||||
while True:
|
||||
if self.stopped is True:
|
||||
raise DeviceStopError(
|
||||
f"Device {self.name} was stopped while waiting for scan to complete"
|
||||
)
|
||||
if self.scan_control.scan_done.get() == 1:
|
||||
return
|
||||
time.sleep(0.1)
|
||||
|
||||
status = self.task_handler.submit_task(wait_for_complete)
|
||||
status = CompareStatus(self.scan_control.scan_done, 1)
|
||||
self.cancel_on_stop(status)
|
||||
return status
|
||||
|
||||
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
|
||||
@@ -274,13 +267,13 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
if scan_duration < 0.1
|
||||
else self.scan_control.scan_start_timer.put
|
||||
)
|
||||
|
||||
def callback(*, old_value, value, **kwargs):
|
||||
if old_value == ScanControlScanStatus.READY and value == ScanControlScanStatus.RUNNING:
|
||||
return True
|
||||
return False
|
||||
|
||||
status = SubscriptionStatus(self.scan_control.scan_status, callback=callback)
|
||||
status = TransitionStatus(
|
||||
self.scan_control.scan_status,
|
||||
transitions=[ScanControlScanStatus.READY, ScanControlScanStatus.RUNNING],
|
||||
strict=True,
|
||||
raise_states=[ScanControlScanStatus.PARAMETER_WRONG],
|
||||
)
|
||||
self.cancel_on_stop(status)
|
||||
start_func(1)
|
||||
return status
|
||||
|
||||
@@ -289,9 +282,6 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
self.stopped = True # Needs to be set to stop motion
|
||||
|
||||
######### Utility Methods #########
|
||||
|
||||
# FIXME this should become the ProgressSignal
|
||||
# pylint: disable=unused-argument
|
||||
def _progress_update(self, value, **kwargs) -> None:
|
||||
"""Callback method to update the scan progress, runs a callback
|
||||
to SUB_PROGRESS subscribers, i.e. BEC.
|
||||
@@ -300,12 +290,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
value (int) : current progress value
|
||||
"""
|
||||
max_value = 100
|
||||
self._run_subs(
|
||||
sub_type=self.SUB_PROGRESS,
|
||||
value=value,
|
||||
max_value=max_value,
|
||||
done=bool(max_value == value),
|
||||
)
|
||||
self.progress_signal.put(value=value, max_value=max_value, done=bool(max_value == value))
|
||||
|
||||
def set_xas_settings(self, low: float, high: float, scan_time: float) -> None:
|
||||
"""Set XAS parameters for upcoming scan.
|
||||
@@ -315,25 +300,20 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
high (float): High energy/angle value of the scan
|
||||
scan_time (float): Time for a half oscillation
|
||||
"""
|
||||
self.scan_settings.s_scan_energy_lo.put(low)
|
||||
self.scan_settings.s_scan_energy_hi.put(high)
|
||||
self.scan_settings.s_scan_scantime.put(scan_time)
|
||||
|
||||
def wait_for_signal(self, signal: Cpt, value: Any, timeout: float | None = None) -> None:
|
||||
"""Wait for a signal to reach a certain value."""
|
||||
if timeout is None:
|
||||
timeout = self.timeout_for_pvwait
|
||||
start_time = time.time()
|
||||
while time.time() - start_time < timeout:
|
||||
if signal.get() == value:
|
||||
return None
|
||||
if self.stopped is True: # Should this check be optional or configurable?!
|
||||
raise DeviceStopError(f"Device {self.name} was stopped while waiting for signal")
|
||||
time.sleep(0.1)
|
||||
# If we end up here, the status did not resolve
|
||||
raise TimeoutError(
|
||||
f"Device {self.name} run into timeout after {timeout}s for signal {signal.name} with value {signal.get()}, expected {value}"
|
||||
)
|
||||
status_list = []
|
||||
|
||||
status_list.append(self.scan_settings.s_scan_energy_lo.set(low))
|
||||
self.cancel_on_stop(status_list[-1])
|
||||
|
||||
status_list.append(self.scan_settings.s_scan_energy_hi.set(high))
|
||||
self.cancel_on_stop(status_list[-1])
|
||||
|
||||
status_list.append(self.scan_settings.s_scan_scantime.set(scan_time))
|
||||
self.cancel_on_stop(status_list[-1])
|
||||
|
||||
for s in status_list:
|
||||
s.wait(timeout=self.timeout_for_pvwait)
|
||||
|
||||
@typechecked
|
||||
def convert_angle_energy(
|
||||
@@ -350,15 +330,19 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
"""
|
||||
self.calculator.calc_reset.put(0)
|
||||
self.calculator.calc_reset.put(1)
|
||||
self.wait_for_signal(self.calculator.calc_done, 0)
|
||||
status = CompareStatus(self.calculator.calc_done, 0)
|
||||
self.cancel_on_stop(status)
|
||||
status.wait(self.timeout_for_pvwait)
|
||||
|
||||
if mode == "AngleToEnergy":
|
||||
self.calculator.calc_angle.put(inp)
|
||||
elif mode == "EnergyToAngle":
|
||||
self.calculator.calc_energy.put(inp)
|
||||
|
||||
self.wait_for_signal(self.calculator.calc_done, 1)
|
||||
time.sleep(0.25) # Needed due to update frequency of softIOC
|
||||
status = CompareStatus(self.calculator.calc_done, 1)
|
||||
self.cancel_on_stop(status)
|
||||
status.wait(self.timeout_for_pvwait)
|
||||
time.sleep(0.15)
|
||||
if mode == "AngleToEnergy":
|
||||
return self.calculator.calc_energy.get()
|
||||
elif mode == "EnergyToAngle":
|
||||
@@ -380,6 +364,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
# Angle and Energy are inverse proportional!
|
||||
high_deg = self.convert_angle_energy(mode="EnergyToAngle", inp=low)
|
||||
low_deg = self.convert_angle_energy(mode="EnergyToAngle", inp=high)
|
||||
p_kink = 100 - p_kink
|
||||
|
||||
pos, vel, dt = compute_spline(
|
||||
low_deg=low_deg,
|
||||
@@ -389,9 +374,19 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
scan_time=scan_time,
|
||||
)
|
||||
|
||||
self.scan_settings.a_scan_pos.set(pos)
|
||||
self.scan_settings.a_scan_vel.set(vel)
|
||||
self.scan_settings.a_scan_time.set(dt)
|
||||
status_list = []
|
||||
|
||||
status_list.append(self.scan_settings.a_scan_pos.set(pos))
|
||||
self.cancel_on_stop(status_list[-1])
|
||||
|
||||
status_list.append(self.scan_settings.a_scan_vel.set(vel))
|
||||
self.cancel_on_stop(status_list[-1])
|
||||
|
||||
status_list.append(self.scan_settings.a_scan_time.set(dt))
|
||||
self.cancel_on_stop(status_list[-1])
|
||||
|
||||
for s in status_list:
|
||||
s.wait(timeout=self.timeout_for_pvwait)
|
||||
|
||||
def set_trig_settings(
|
||||
self,
|
||||
@@ -414,12 +409,30 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
cycle_low (int): Cycle for low energy/angle
|
||||
cycle_high (int): Cycle for high energy/angle
|
||||
"""
|
||||
self.scan_settings.trig_ena_hi_enum.put(int(enable_high))
|
||||
self.scan_settings.trig_ena_lo_enum.put(int(enable_low))
|
||||
self.scan_settings.trig_time_hi.put(exp_time_high)
|
||||
self.scan_settings.trig_time_lo.put(exp_time_low)
|
||||
self.scan_settings.trig_every_n_hi.put(cycle_high)
|
||||
self.scan_settings.trig_every_n_lo.put(cycle_low)
|
||||
|
||||
status_list = []
|
||||
|
||||
status_list.append(self.scan_settings.trig_ena_hi_enum.set(int(enable_high)))
|
||||
self.cancel_on_stop(status_list[-1])
|
||||
|
||||
status_list.append(self.scan_settings.trig_ena_lo_enum.set(int(enable_low)))
|
||||
self.cancel_on_stop(status_list[-1])
|
||||
|
||||
status_list.append(self.scan_settings.trig_time_hi.set(exp_time_high))
|
||||
self.cancel_on_stop(status_list[-1])
|
||||
|
||||
status_list.append(self.scan_settings.trig_time_lo.set(exp_time_low))
|
||||
self.cancel_on_stop(status_list[-1])
|
||||
|
||||
status_list.append(self.scan_settings.trig_every_n_hi.set(cycle_high))
|
||||
self.cancel_on_stop(status_list[-1])
|
||||
|
||||
status_list.append(self.scan_settings.trig_every_n_lo.set(cycle_low))
|
||||
self.cancel_on_stop(status_list[-1])
|
||||
|
||||
for s in status_list:
|
||||
s.wait(timeout=self.timeout_for_pvwait)
|
||||
|
||||
|
||||
def set_scan_control_settings(self, mode: ScanControlMode, scan_duration: float) -> None:
|
||||
"""Set the scan control settings for the upcoming scan.
|
||||
@@ -429,8 +442,18 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
scan_duration (float): Duration of the scan
|
||||
"""
|
||||
val = ScanControlMode(mode).value
|
||||
self.scan_control.scan_mode_enum.put(val)
|
||||
self.scan_control.scan_duration.put(scan_duration)
|
||||
|
||||
status_list = []
|
||||
|
||||
status_list.append(self.scan_control.scan_mode_enum.set(val))
|
||||
self.cancel_on_stop(status_list[-1])
|
||||
|
||||
status_list.append(self.scan_control.scan_duration.set(scan_duration))
|
||||
self.cancel_on_stop(status_list[-1])
|
||||
|
||||
for s in status_list:
|
||||
s.wait(timeout=self.timeout_for_pvwait)
|
||||
|
||||
|
||||
def _update_scan_parameter(self):
|
||||
"""Get the scan_info parameters for the scan."""
|
||||
@@ -440,39 +463,3 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
for key, value in self.scan_info.msg.request_inputs["kwargs"].items():
|
||||
if hasattr(self.scan_parameter, key):
|
||||
setattr(self.scan_parameter, key, value)
|
||||
|
||||
def _check_scan_msg(self, target_state: ScanControlLoadMessage) -> None:
|
||||
"""Check if the scan message is gettting available
|
||||
|
||||
Args:
|
||||
target_state (ScanControlLoadMessage): Target state to check for
|
||||
|
||||
Raises:
|
||||
TimeoutError: If the scan message is not available after the timeout
|
||||
"""
|
||||
try:
|
||||
self.wait_for_signal(self.scan_control.scan_msg, target_state, timeout=1)
|
||||
except TimeoutError as exc:
|
||||
logger.warning(
|
||||
f"Resetting scan validation in stage for state: {ScanControlLoadMessage(self.scan_control.scan_msg.get())}, "
|
||||
f"retry .get() on scan_control: {ScanControlLoadMessage(self.scan_control.scan_msg.get())} and sleeping 1s"
|
||||
)
|
||||
current_scan_msg = self.scan_control.scan_msg.get()
|
||||
|
||||
def callback(*, old_value, value, **kwargs):
|
||||
if old_value == current_scan_msg and value == target_state:
|
||||
return True
|
||||
return False
|
||||
|
||||
status = SubscriptionStatus(self.scan_control.scan_msg, callback=callback)
|
||||
self.scan_control.scan_val_reset.put(1)
|
||||
|
||||
status.wait(timeout=self.timeout_for_pvwait)
|
||||
|
||||
# try:
|
||||
# self.wait_for_signal(self.scan_control.scan_msg, target_state, timeout=4)
|
||||
# except TimeoutError as exc:
|
||||
# raise TimeoutError(
|
||||
# f"Timeout after {self.timeout_for_pvwait} while waiting for scan status,"
|
||||
# f" current state: {ScanControlScanStatus(self.scan_control.scan_msg.get())}"
|
||||
# ) from exc
|
||||
|
||||
@@ -82,10 +82,20 @@ class Mo1BraggCrystal(Device):
|
||||
d_spacing_si111 = Cpt(EpicsSignalWithRBV, suffix="d_spacing_si111", kind="config")
|
||||
d_spacing_si311 = Cpt(EpicsSignalWithRBV, suffix="d_spacing_si311", kind="config")
|
||||
set_offset = Cpt(EpicsSignal, suffix="set_offset", kind="config", put_complete=True)
|
||||
current_d_spacing = Cpt(
|
||||
EpicsSignalRO, suffix="current_d_spacing_RBV", kind="normal", auto_monitor=True
|
||||
)
|
||||
current_offset = Cpt(
|
||||
EpicsSignalRO, suffix="current_offset_RBV", kind="normal", auto_monitor=True
|
||||
)
|
||||
current_xtal = Cpt(
|
||||
EpicsSignalRO, suffix="current_xtal_ENUM_RBV", kind="normal", auto_monitor=True
|
||||
)
|
||||
|
||||
current_xtal_string = Cpt(
|
||||
EpicsSignalRO, suffix="current_xtal_ENUM_RBV", kind="normal", auto_monitor=True, string=True
|
||||
)
|
||||
|
||||
|
||||
class Mo1BraggScanSettings(Device):
|
||||
"""Mo1 Bragg PVs to set the scan setttings"""
|
||||
|
||||
@@ -7,11 +7,12 @@ from bec_lib.logger import bec_logger
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Kind, StatusBase
|
||||
from ophyd.status import SubscriptionStatus, WaitTimeoutError
|
||||
from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
from ophyd_devices.sim.sim_signals import SetableSignal
|
||||
|
||||
from debye_bec.devices.nidaq.nidaq_enums import (
|
||||
EncoderTypes,
|
||||
EncoderFactors,
|
||||
NIDAQCompression,
|
||||
NidaqState,
|
||||
ReadoutRange,
|
||||
@@ -309,6 +310,7 @@ class NidaqControl(Device):
|
||||
|
||||
enc = Cpt(SetableSignal, value=0, kind=Kind.normal)
|
||||
energy = Cpt(SetableSignal, value=0, kind=Kind.normal)
|
||||
rle = Cpt(SetableSignal, value=0, kind=Kind.normal)
|
||||
|
||||
### Control PVs ###
|
||||
|
||||
@@ -322,14 +324,14 @@ class NidaqControl(Device):
|
||||
sampling_rate = Cpt(EpicsSignal, suffix="NIDAQ-SamplingRateRequested", kind=Kind.config)
|
||||
scan_duration = Cpt(EpicsSignal, suffix="NIDAQ-SamplingDuration", kind=Kind.config)
|
||||
readout_range = Cpt(EpicsSignal, suffix="NIDAQ-ReadoutRange", kind=Kind.config)
|
||||
encoder_type = Cpt(EpicsSignal, suffix="NIDAQ-EncoderType", kind=Kind.config)
|
||||
encoder_factor = Cpt(EpicsSignal, suffix="NIDAQ-EncoderFactor", kind=Kind.config)
|
||||
stop_call = Cpt(EpicsSignal, suffix="NIDAQ-Stop", kind=Kind.config)
|
||||
power = Cpt(EpicsSignal, suffix="NIDAQ-Power", kind=Kind.config)
|
||||
heartbeat = Cpt(EpicsSignal, suffix="NIDAQ-Heartbeat", kind=Kind.config, auto_monitor=True)
|
||||
time_left = Cpt(EpicsSignalRO, suffix="NIDAQ-TimeLeft", kind=Kind.config, auto_monitor=True)
|
||||
|
||||
ai_chans = Cpt(EpicsSignal, suffix="NIDAQ-AIChans", kind=Kind.config)
|
||||
ci_chans = Cpt(EpicsSignal, suffix="NIDAQ-CIChans6614", kind=Kind.config)
|
||||
ci_chans = Cpt(EpicsSignal, suffix="NIDAQ-CIChans", kind=Kind.config)
|
||||
di_chans = Cpt(EpicsSignal, suffix="NIDAQ-DIChans", kind=Kind.config)
|
||||
|
||||
|
||||
@@ -342,6 +344,8 @@ class Nidaq(PSIDeviceBase, NidaqControl):
|
||||
scan_info (ScanInfo) : ScanInfo object passed by BEC's devicemanager.
|
||||
"""
|
||||
|
||||
progress_signal = Cpt(ProgressSignal, name="progress_signal")
|
||||
|
||||
USER_ACCESS = ["set_config"]
|
||||
|
||||
def __init__(self, prefix: str = "", *, name: str, scan_info: ScanInfo = None, **kwargs):
|
||||
@@ -459,12 +463,20 @@ class Nidaq(PSIDeviceBase, NidaqControl):
|
||||
elif readout_range == 10:
|
||||
self.readout_range.put(ReadoutRange.TEN_V)
|
||||
|
||||
if encoder_type in "X_1":
|
||||
self.encoder_type.put(EncoderTypes.X_1)
|
||||
elif encoder_type in "X_2":
|
||||
self.encoder_type.put(EncoderTypes.X_2)
|
||||
elif encoder_type in "X_4":
|
||||
self.encoder_type.put(EncoderTypes.X_4)
|
||||
if encoder_type in "1/16":
|
||||
self.encoder_factor.put(EncoderFactors.X1_16)
|
||||
elif encoder_type in "1/8":
|
||||
self.encoder_factor.put(EncoderFactors.X1_8)
|
||||
elif encoder_type in "1/4":
|
||||
self.encoder_factor.put(EncoderFactors.X1_4)
|
||||
elif encoder_type in "1/2":
|
||||
self.encoder_factor.put(EncoderFactors.X1_2)
|
||||
elif encoder_type in "1":
|
||||
self.encoder_factor.put(EncoderFactors.X1)
|
||||
elif encoder_type in "2":
|
||||
self.encoder_factor.put(EncoderFactors.X2)
|
||||
elif encoder_type in "4":
|
||||
self.encoder_factor.put(EncoderFactors.X4)
|
||||
|
||||
if enable_compression is True:
|
||||
self.enable_compression.put(NIDAQCompression.ON)
|
||||
@@ -488,26 +500,21 @@ class Nidaq(PSIDeviceBase, NidaqControl):
|
||||
Called after the device is connected and its signals are connected.
|
||||
Default values for signals should be set here.
|
||||
"""
|
||||
|
||||
def heartbeat_callback(*, old_value, value, **kwargs):
|
||||
return ((old_value) == 0 and (value == 1)) or ((old_value) == 1 and (value == 0))
|
||||
|
||||
status = SubscriptionStatus(self.heartbeat, callback=heartbeat_callback)
|
||||
status = TransitionStatus(self.heartbeat, transitions=[0, 1], strict=False)
|
||||
self.cancel_on_stop(status)
|
||||
try:
|
||||
status.wait(timeout=self.timeout_wait_for_signal) # Raises if timeout is reached
|
||||
except WaitTimeoutError:
|
||||
logger.warning(f"Device {self.name} was not alive, trying to put power on")
|
||||
status = TransitionStatus(self.heartbeat, transitions=[0, 1], strict=False)
|
||||
self.cancel_on_stop(status)
|
||||
self.power.put(1)
|
||||
|
||||
status.wait(timeout=self.timeout_wait_for_signal)
|
||||
|
||||
if not self.wait_for_condition(
|
||||
condition=lambda: self.state.get() == NidaqState.STANDBY,
|
||||
timeout=self.timeout_wait_for_signal,
|
||||
check_stopped=True,
|
||||
):
|
||||
raise NidaqError(
|
||||
f"Device {self.name} has not been reached in state STANDBY, current state {NidaqState(self.state.get())}"
|
||||
)
|
||||
status = CompareStatus(self.state, NidaqState.STANDBY)
|
||||
self.cancel_on_stop(status)
|
||||
status.wait(timeout=self.timeout_wait_for_signal)
|
||||
self.scan_duration.set(0).wait(timeout=self._timeout_wait_for_pv)
|
||||
self.time_left.subscribe(self._progress_update, run=False)
|
||||
|
||||
@@ -521,14 +528,12 @@ class Nidaq(PSIDeviceBase, NidaqControl):
|
||||
if not self._check_if_scan_name_is_valid():
|
||||
return None
|
||||
|
||||
if not self.wait_for_condition(
|
||||
condition=lambda: self.state.get() == NidaqState.STANDBY,
|
||||
timeout=self.timeout_wait_for_signal,
|
||||
check_stopped=True,
|
||||
):
|
||||
raise NidaqError(
|
||||
f"Device {self.name} has not been reached in state STANDBY, current state {NidaqState(self.state.get())}"
|
||||
)
|
||||
if self.state.get() != NidaqState.STANDBY:
|
||||
status = CompareStatus(self.state, NidaqState.STANDBY)
|
||||
self.cancel_on_stop(status)
|
||||
self.on_stop()
|
||||
status.wait(timeout=self.timeout_wait_for_signal)
|
||||
|
||||
# If scan is not part of the valid_scan_names,
|
||||
if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
|
||||
self.scan_type.set(ScanType.TRIGGERED).wait(timeout=self._timeout_wait_for_pv)
|
||||
@@ -543,40 +548,32 @@ class Nidaq(PSIDeviceBase, NidaqControl):
|
||||
timeout=self._timeout_wait_for_pv
|
||||
)
|
||||
|
||||
# Stage call to IOC
|
||||
status = CompareStatus(self.state, NidaqState.STAGE)
|
||||
self.cancel_on_stop(status)
|
||||
self.stage_call.set(1).wait(timeout=self._timeout_wait_for_pv)
|
||||
|
||||
if not self.wait_for_condition(
|
||||
condition=lambda: self.state.get() == NidaqState.STAGE,
|
||||
timeout=self.timeout_wait_for_signal,
|
||||
check_stopped=True,
|
||||
):
|
||||
raise NidaqError(
|
||||
f"Device {self.name} has not been reached in state STAGE, current state {NidaqState(self.state.get())}"
|
||||
)
|
||||
status.wait(timeout=self.timeout_wait_for_signal)
|
||||
if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
|
||||
status = self.on_kickoff()
|
||||
self.cancel_on_stop(status)
|
||||
status.wait(timeout=self._timeout_wait_for_pv)
|
||||
logger.info(f"Device {self.name} was staged: {NidaqState(self.state.get())}")
|
||||
|
||||
def on_kickoff(self) -> DeviceStatus | StatusBase:
|
||||
"""Kickoff the Nidaq"""
|
||||
status = self.kickoff_call.set(1)
|
||||
self.cancel_on_stop(status)
|
||||
return status
|
||||
|
||||
def on_unstage(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called while unstaging the device. Check that the Nidaq goes into Standby"""
|
||||
|
||||
def _get_state():
|
||||
return self.state.get() == NidaqState.STANDBY
|
||||
|
||||
# TODO We need to wait longer if rle is disabled
|
||||
if not self.wait_for_condition(
|
||||
condition=_get_state, timeout=self.timeout_wait_for_signal, check_stopped=False
|
||||
):
|
||||
raise NidaqError(
|
||||
f"Device {self.name} has not been reached in state STANDBY, current state {NidaqState(self.state.get())}"
|
||||
)
|
||||
self.enable_compression.set(1).wait(self._timeout_wait_for_pv)
|
||||
status = CompareStatus(self.state, NidaqState.STANDBY)
|
||||
self.cancel_on_stop(status)
|
||||
status.wait(timeout=self.timeout_wait_for_signal)
|
||||
status = self.enable_compression.set(1)
|
||||
self.cancel_on_stop(status)
|
||||
status.wait(self._timeout_wait_for_pv)
|
||||
logger.info(f"Device {self.name} was unstaged: {NidaqState(self.state.get())}")
|
||||
|
||||
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
|
||||
@@ -594,15 +591,9 @@ class Nidaq(PSIDeviceBase, NidaqControl):
|
||||
logger.info(f"Device {self.name} ready to be kicked off for nidaq_continuous_scan")
|
||||
return None
|
||||
|
||||
def _wait_for_state():
|
||||
return self.state.get() == NidaqState.KICKOFF
|
||||
|
||||
if not self.wait_for_condition(
|
||||
_wait_for_state, timeout=self.timeout_wait_for_signal, check_stopped=True
|
||||
):
|
||||
raise NidaqError(
|
||||
f"Device {self.name} failed to reach state KICKOFF during pre scan, current state {NidaqState(self.state.get())}"
|
||||
)
|
||||
status = CompareStatus(self.state, NidaqState.KICKOFF)
|
||||
self.cancel_on_stop(status)
|
||||
status.wait(timeout=self._timeout_wait_for_pv)
|
||||
logger.info(
|
||||
f"Device {self.name} ready to take data after pre_scan: {NidaqState(self.state.get())}"
|
||||
)
|
||||
@@ -620,21 +611,10 @@ class Nidaq(PSIDeviceBase, NidaqControl):
|
||||
if not self._check_if_scan_name_is_valid():
|
||||
return None
|
||||
|
||||
def _check_state(self) -> bool:
|
||||
while True:
|
||||
if self.stopped is True:
|
||||
raise NidaqError(f"Device {self.name} was stopped")
|
||||
if self.state.get() == NidaqState.STANDBY:
|
||||
return
|
||||
# if time.time() > timeout_time:
|
||||
# raise TimeoutError(f"Device {self.name} ran into timeout")
|
||||
time.sleep(0.1)
|
||||
|
||||
status = CompareStatus(self.state, NidaqState.STANDBY)
|
||||
self.cancel_on_stop(status)
|
||||
if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
|
||||
self.on_stop()
|
||||
status = self.task_handler.submit_task(task=_check_state, task_args=(self,))
|
||||
else:
|
||||
status = self.task_handler.submit_task(task=_check_state, task_args=(self,))
|
||||
return status
|
||||
|
||||
def _progress_update(self, value, **kwargs) -> None:
|
||||
@@ -649,12 +629,7 @@ class Nidaq(PSIDeviceBase, NidaqControl):
|
||||
return
|
||||
value = scan_duration - value
|
||||
max_value = scan_duration
|
||||
self._run_subs(
|
||||
sub_type=self.SUB_PROGRESS,
|
||||
value=value,
|
||||
max_value=max_value,
|
||||
done=bool(value == max_value),
|
||||
)
|
||||
self.progress_signal.put(value=value, max_value=max_value, done=bool(max_value == value))
|
||||
|
||||
def on_stop(self) -> None:
|
||||
"""Called when the device is stopped."""
|
||||
|
||||
@@ -48,9 +48,13 @@ class ReadoutRange(int, enum.Enum):
|
||||
TEN_V = 3
|
||||
|
||||
|
||||
class EncoderTypes(int, enum.Enum):
|
||||
"""Encoder Types"""
|
||||
class EncoderFactors(int, enum.Enum):
|
||||
"""Encoder Factors"""
|
||||
|
||||
X_1 = 0
|
||||
X_2 = 1
|
||||
X_4 = 2
|
||||
X1_16 = 0
|
||||
X1_8 = 1
|
||||
X1_4 = 2
|
||||
X1_2 = 3
|
||||
X1 = 4
|
||||
X2 = 5
|
||||
X4 = 6
|
||||
|
||||
@@ -1 +1 @@
|
||||
from .debye_nexus_structure import DebyeNexusStructure
|
||||
from .debye_nexus_structure import DebyeNexusStructure
|
||||
|
||||
@@ -1,26 +1,125 @@
|
||||
from bec_server.file_writer.default_writer import DefaultFormat
|
||||
|
||||
|
||||
class DebyeNexusStructure(DefaultFormat):
|
||||
""" Nexus Structure for Debye"""
|
||||
"""Nexus Structure for Debye"""
|
||||
|
||||
def format(self) -> None:
|
||||
""" Specify the file format for the file writer."""
|
||||
"""Specify the file format for the file writer."""
|
||||
|
||||
entry = self.storage.create_group(name="entry")
|
||||
entry.attrs["NX_class"] = "NXentry"
|
||||
instrument = entry.create_group(name="instrument")
|
||||
instrument.attrs["NX_class"] = "NXinstrument"
|
||||
monochromator = instrument.create_group(name="monochromator")
|
||||
# monochromator.attrs["NX_class"] = "NXmonochromator" -> to be checked
|
||||
crystal = monochromator.create_group(name="crystal")
|
||||
|
||||
###################
|
||||
## mo1_bragg specific information
|
||||
###################
|
||||
|
||||
# Logic if device exist
|
||||
if "mo_trx" in self.device_manager.devices:
|
||||
if "mo1_bragg" in self.device_manager.devices:
|
||||
|
||||
monochromator = instrument.create_group(name="monochromator")
|
||||
monochromator.attrs["NX_class"] = "NXmonochromator"
|
||||
crystal = monochromator.create_group(name="crystal")
|
||||
crystal.attrs["NX_class"] = "NXcrystal"
|
||||
|
||||
# Create a dataset
|
||||
chemical_formular = crystal.create_dataset(name="chemical_formular", data="Si")
|
||||
chemical_formular.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
# Create a softlink
|
||||
chemical_formular = crystal.create_soft_link(name="chemical_formular", target="/entry/collection/devices/mo_trx/mo_trx")
|
||||
chemical_formular.attrs["NX_class"] = "NXdata"
|
||||
|
||||
d_spacing = crystal.create_soft_link(
|
||||
name="d_spacing",
|
||||
target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_d_spacing/value",
|
||||
)
|
||||
d_spacing.attrs["NX_class"] = "NX_FLOAT"
|
||||
|
||||
offset = crystal.create_soft_link(
|
||||
name="offset",
|
||||
target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_offset/value",
|
||||
)
|
||||
offset.attrs["NX_class"] = "NX_FLOAT"
|
||||
|
||||
reflection = crystal.create_soft_link(
|
||||
name="reflection",
|
||||
target="/entry/collection/devices/mo1_bragg/mo1_bragg_crystal_current_xtal_string/value",
|
||||
)
|
||||
reflection.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
##################
|
||||
## cm mirror specific information
|
||||
###################
|
||||
|
||||
collimating_mirror = instrument.create_group(name="collimating_mirror")
|
||||
collimating_mirror.attrs["NX_class"] = "NXmirror"
|
||||
|
||||
cm_substrate_material = collimating_mirror.create_dataset(
|
||||
name="substrate_material", data="Si"
|
||||
)
|
||||
cm_substrate_material.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
cm_bending_radius = collimating_mirror.create_soft_link(
|
||||
name="sagittal radius",
|
||||
target="/entry/collection/devices/cm_bnd_radius/cm_bnd_radius/value",
|
||||
)
|
||||
cm_bending_radius.attrs["NX_class"] = "NX_FLOAT"
|
||||
cm_bending_radius.attrs["units"] = "km"
|
||||
|
||||
cm_incidence_angle = collimating_mirror.create_soft_link(
|
||||
name="incidence angle", target="/entry/collection/devices/cm_rotx/cm_rotx/value"
|
||||
)
|
||||
cm_incidence_angle.attrs["NX_class"] = "NX_FLOAT"
|
||||
|
||||
cm_yaw_angle = collimating_mirror.create_soft_link(
|
||||
name="incident angle", target="/entry/collection/devices/cm_roty/cm_roty/value"
|
||||
)
|
||||
cm_yaw_angle.attrs["NX_class"] = "NX_FLOAT"
|
||||
|
||||
##################
|
||||
## fm mirror specific information
|
||||
###################
|
||||
|
||||
focusing_mirror = instrument.create_group(name="focusing_mirror")
|
||||
focusing_mirror.attrs["NX_class"] = "NXmirror"
|
||||
|
||||
fm_substrate_material = focusing_mirror.create_dataset(name="substrate_material", data="Si")
|
||||
fm_substrate_material.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
fm_bending_radius = focusing_mirror.create_soft_link(
|
||||
name="sagittal radius",
|
||||
target="/entry/collection/devices/fm_bnd_radius/fm_bnd_radius/value",
|
||||
)
|
||||
fm_bending_radius.attrs["NX_class"] = "NX_FLOAT"
|
||||
|
||||
fm_incidence_angle = focusing_mirror.create_soft_link(
|
||||
name="incidence angle",
|
||||
target="/entry/collection/devices/fm_incidence_angle/fm_incidence_angle/value",
|
||||
)
|
||||
fm_incidence_angle.attrs["NX_class"] = "NX_FLOAT"
|
||||
|
||||
fm_yaw_angle = focusing_mirror.create_soft_link(
|
||||
name="yaw angle", target="/entry/collection/devices/fm_roty/fm_roty/value"
|
||||
)
|
||||
fm_yaw_angle.attrs["NX_class"] = "NX_FLOAT"
|
||||
|
||||
fm_roll_angle = focusing_mirror.create_soft_link(
|
||||
name="roll angle", target="/entry/collection/devices/fm_rotz/fm_rotz/value"
|
||||
)
|
||||
fm_roll_angle.attrs["NX_class"] = "NX_FLOAT"
|
||||
|
||||
##################
|
||||
## source specific information
|
||||
###################
|
||||
|
||||
source = instrument.create_group(name="source")
|
||||
source.attrs["NX_class"] = "NXsource"
|
||||
|
||||
beamline_name = source.create_dataset(name="beamline_name", data="Debye")
|
||||
beamline_name.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
facility_name = source.create_dataset(name="facility_name", data="Swiss Light Source")
|
||||
facility_name.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
probe = source.create_dataset(name="probe", data="X-ray")
|
||||
probe.attrs["NX_class"] = "NX_CHAR"
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# from .metadata_schema_template import ExampleSchema
|
||||
# from .metadata_schema_xas_simple_scan import xas_simple_scan_schema
|
||||
|
||||
METADATA_SCHEMA_REGISTRY = {
|
||||
METADATA_SCHEMA_REGISTRY = { # "xas_simple_scan": xas_simple_scan_schema
|
||||
# Add models which should be used to validate scan metadata here.
|
||||
# Make a model according to the template, and import it as above
|
||||
# Then associate it with a scan like so:
|
||||
|
||||
@@ -0,0 +1,8 @@
|
||||
from bec_lib.metadata_schema import BasicScanMetadata
|
||||
|
||||
|
||||
#
|
||||
#
|
||||
class xas_simple_scan_schema(BasicScanMetadata):
|
||||
Edge: str
|
||||
Element: str
|
||||
@@ -39,7 +39,8 @@ def test_basler_init(mock_basler):
|
||||
assert mock_basler._live_mode is False
|
||||
assert mock_basler._live_mode_event is None
|
||||
assert mock_basler._task_status is None
|
||||
assert mock_basler._n_rot90 == -1
|
||||
assert mock_basler.preview.ndim == 2
|
||||
assert mock_basler.preview.num_rotation_90 == 3
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
@@ -65,4 +66,5 @@ def test_prosilica_init(mock_prosilica):
|
||||
assert mock_prosilica._live_mode is False
|
||||
assert mock_prosilica._live_mode_event is None
|
||||
assert mock_prosilica._task_status is None
|
||||
assert mock_prosilica._n_rot90 == -1
|
||||
assert mock_prosilica.preview.ndim == 2
|
||||
assert mock_prosilica.preview.num_rotation_90 == 3
|
||||
|
||||
@@ -34,7 +34,8 @@ def test_init(mock_cam):
|
||||
assert mock_cam._live_mode is False
|
||||
assert mock_cam._live_mode_event is None
|
||||
assert mock_cam._task_status is None
|
||||
assert mock_cam._n_rot90 == -1
|
||||
assert mock_cam.preview.ndim == 2
|
||||
assert mock_cam.preview.num_rotation_90 == -1
|
||||
|
||||
|
||||
def test_start_live_mode(mock_cam):
|
||||
|
||||
@@ -235,42 +235,44 @@ def test_kickoff_scan(mock_bragg):
|
||||
assert dev.scan_control.scan_start_infinite.get() == 1
|
||||
|
||||
|
||||
def test_complete(mock_bragg):
|
||||
dev = mock_bragg
|
||||
dev.scan_control.scan_done._read_pv.mock_data = 0
|
||||
# Normal case
|
||||
status = dev.complete()
|
||||
assert status.done is False
|
||||
assert status.success is False
|
||||
dev.scan_control.scan_done._read_pv.mock_data = 1
|
||||
status.wait()
|
||||
# time.sleep(0.2)
|
||||
assert status.done is True
|
||||
assert status.success is True
|
||||
# FIXME #22 once mock_pv supports callbacks, high priority!
|
||||
# def test_complete(mock_bragg):
|
||||
# dev = mock_bragg
|
||||
# dev.scan_control.scan_done._read_pv.mock_data = 0
|
||||
# # Normal case
|
||||
# status = dev.complete()
|
||||
# assert status.done is False
|
||||
# assert status.success is False
|
||||
# dev.scan_control.scan_done._read_pv.mock_data = 1
|
||||
# status.wait()
|
||||
# # time.sleep(0.2)
|
||||
# assert status.done is True
|
||||
# assert status.success is True
|
||||
|
||||
# Stop called case
|
||||
dev.scan_control.scan_done._read_pv.mock_data = 0
|
||||
status = dev.complete()
|
||||
assert status.done is False
|
||||
assert status.success is False
|
||||
dev.stop()
|
||||
time.sleep(0.2)
|
||||
assert status.done is True
|
||||
assert status.success is False
|
||||
# # Stop called case
|
||||
# dev.scan_control.scan_done._read_pv.mock_data = 0
|
||||
# status = dev.complete()
|
||||
# assert status.done is False
|
||||
# assert status.success is False
|
||||
# dev.stop()
|
||||
# time.sleep(0.2)
|
||||
# assert status.done is True
|
||||
# assert status.success is False
|
||||
|
||||
|
||||
def test_unstage(mock_bragg):
|
||||
mock_bragg.timeout_for_pvwait = 0.5
|
||||
mock_bragg.scan_control.scan_val_reset._read_pv.mock_data = 0
|
||||
mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.PENDING
|
||||
# FIXME #22 once mock_pv supports callbacks, high priority!
|
||||
# def test_unstage(mock_bragg):
|
||||
# mock_bragg.timeout_for_pvwait = 0.5
|
||||
# mock_bragg.scan_control.scan_val_reset._read_pv.mock_data = 0
|
||||
# mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.PENDING
|
||||
|
||||
with mock.patch.object(mock_bragg.scan_control.scan_val_reset, "put") as mock_put:
|
||||
status = mock_bragg.unstage()
|
||||
assert mock_put.call_count == 0
|
||||
mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.SUCCESS
|
||||
with pytest.raises(TimeoutError):
|
||||
mock_bragg.unstage()
|
||||
assert mock_put.call_count == 1
|
||||
# with mock.patch.object(mock_bragg.scan_control.scan_val_reset, "put") as mock_put:
|
||||
# status = mock_bragg.unstage()
|
||||
# assert mock_put.call_count == 0
|
||||
# mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.SUCCESS
|
||||
# with pytest.raises(TimeoutError):
|
||||
# mock_bragg.unstage()
|
||||
# assert mock_put.call_count == 1
|
||||
|
||||
|
||||
# TODO reimplement the test for stage method
|
||||
|
||||
@@ -103,11 +103,12 @@ def test_on_unstage(mock_nidaq):
|
||||
dev.state._read_pv.mock_data = 0 # Set state to 0, 1 is Standby
|
||||
dev._timeout_wait_for_pv = 0.1 # Set a short timeout for testing
|
||||
dev.enable_compression._read_pv.mock_data = 0 # Compression enabled
|
||||
with pytest.raises(NidaqError):
|
||||
with pytest.raises(WaitTimeoutError):
|
||||
dev.on_unstage()
|
||||
dev.state._read_pv.mock_data = 1
|
||||
dev.on_unstage()
|
||||
assert dev.enable_compression.get() == 1
|
||||
# FIXME #22 add callback mechanism to MockPV to test the rest of the logic
|
||||
# dev.on_unstage()
|
||||
# assert dev.enable_compression.get() == 1
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@@ -128,7 +129,7 @@ def test_on_pre_scan(mock_nidaq, scan_name, raise_error, nidaq_state):
|
||||
if not raise_error:
|
||||
dev.pre_scan()
|
||||
else:
|
||||
with pytest.raises(NidaqError):
|
||||
with pytest.raises(WaitTimeoutError):
|
||||
dev.pre_scan()
|
||||
|
||||
|
||||
@@ -160,8 +161,6 @@ def test_on_complete(mock_nidaq):
|
||||
|
||||
# Test that it resolves if device is stopped
|
||||
dev.state.put(0) # Set state to DISABLED
|
||||
dev.stopped = True # Reset stopped state
|
||||
status = dev.on_complete()
|
||||
with pytest.raises(NidaqError):
|
||||
status.wait(timeout=5)
|
||||
dev.stop()
|
||||
status.wait(timeout=5)
|
||||
assert status.done is True
|
||||
|
||||
Reference in New Issue
Block a user