feat(cont_grid): add continuous grid scan #222

Merged
appel_c merged 13 commits from scans_v4_fly_grid_scan into main 2026-06-16 16:25:31 +02:00
19 changed files with 801 additions and 271 deletions
+86 -59
View File
@@ -203,46 +203,52 @@ ccm_energy:
######################## SMARACT STAGES ##################################
##########################################################################
xbpm2x:
description: X-ray beam position monitor 1 in OPbox
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
deviceConfig:
axis_Id: A
host: x12sa-eb-smaract-mcs-03.psi.ch
limits:
- -200
- 200
port: 5000
sign: 1
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
init_position: 22.5
bl_smar_stage: 0
# xbpm2x:
# description: X-ray beam position monitor 1 in OPbox
# deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
# deviceConfig:
# axis_Id: A
# host: x12sa-eb-smaract-mcs-03.psi.ch
# limits:
# - -200
# - 200
# port: 5000
# sign: 1
# enabled: true
# onFailure: retry
# readOnly: false
# readoutPriority: baseline
# connectionTimeout: 20
# userParameter:
# init_position: 22.5
# in_position: -1.5
# # bl_smar_stage to use csaxs reference method. assign number according to axis channel
# bl_smar_stage: 0
# xbpm2y:
# description: X-ray beam position monitor 1 in OPbox
# deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
# deviceConfig:
# axis_Id: B
# host: x12sa-eb-smaract-mcs-03.psi.ch
# limits:
# - -200
# - 200
# port: 5000
# sign: 1
# enabled: true
# onFailure: retry
# readOnly: false
# readoutPriority: baseline
# connectionTimeout: 20
# userParameter:
# in_position: -1.0
# # bl_smar_stage to use csaxs reference method. assign number according to axis channel
# bl_smar_stage: 1
xbpm2y:
description: X-ray beam position monitor 1 in OPbox
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
deviceConfig:
axis_Id: B
host: x12sa-eb-smaract-mcs-03.psi.ch
limits:
- -200
- 200
port: 5000
sign: 1
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 1
scinx:
description: scintillator in OPbox
@@ -265,26 +271,47 @@ scinx:
init_position: -23
bl_smar_stage: 2
# poly:
# description: polarizer holder in OPbox
# deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
# deviceConfig:
# axis_Id: D
# host: x12sa-eb-smaract-mcs-03.psi.ch
# limits:
# - -200
# - 200
# port: 5000
# sign: 1
# enabled: true
# onFailure: retry
# readOnly: false
# readoutPriority: baseline
# connectionTimeout: 20
# userParameter:
# # bl_smar_stage to use csaxs reference method. assign number according to axis channel
# init_position: -23
# bl_smar_stage: 3
poly:
description: polarizer holder in OPbox
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
deviceConfig:
axis_Id: B
host: x12sa-eb-smaract-mcs-03.psi.ch
limits:
- -200
- 200
port: 5000
sign: 1
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
init_position: -23
bl_smar_stage: 1
polrot:
description: rotation of crytal of the polarizer
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
deviceConfig:
axis_Id: A
host: x12sa-eb-smaract-mcs-03.psi.ch
limits:
- -200
- 200
port: 5000
sign: 1
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in_position: -1.0
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 0
# dmm1_trx_readback_example: # This is the same template as for i.e. bpm4i
# description: 'This is an example of a read-only Epics signal'
+3 -132
View File
@@ -25,136 +25,7 @@ detectors:
#lamni:
# - !include ./ptycho_lamni.yaml
#user setup:
# - !include ./user_setup.yaml
user setup:
- !include ./user_setup.yaml
eyex:
description: Owis motor stage samx
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES2-ES01
motor_resolution: 0.00125
base_velocity: 0.0625
velocity: 10
backlash_distance: 0.125
acceleration: 0.2
user_offset_dir: 0
deviceTags:
- cSAXS
- owis_samx
onFailure: buffer
enabled: true
readoutPriority: baseline
softwareTrigger: false
eyey:
description: Owis motor stage samx
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES2-ES02
motor_resolution: 0.00125
base_velocity: 0.0625
velocity: 10
backlash_distance: 0.125
acceleration: 0.2
user_offset_dir: 0
deviceTags:
- cSAXS
- owis_samx
onFailure: buffer
enabled: true
readoutPriority: baseline
softwareTrigger: false
samx:
description: Owis motor stage samx
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES2-ES18
motor_resolution: 0.000125
base_velocity: 0.00625
velocity: 1
backlash_distance: 0.0125
acceleration: 0.2
user_offset_dir: 0
deviceTags:
- cSAXS
- owis_samx
onFailure: buffer
enabled: true
readoutPriority: baseline
softwareTrigger: false
samy:
description: Owis motor stage samx
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES2-ES19
motor_resolution: 0.000125
base_velocity: 0.00625
velocity: 1
backlash_distance: 0.0125
acceleration: 0.2
user_offset_dir: 0
deviceTags:
- cSAXS
- owis_samx
onFailure: buffer
enabled: true
readoutPriority: baseline
softwareTrigger: false
# eye_cam:
# description: Camera Microscope
# deviceClass: csaxs_bec.devices.ids_cameras.ids_camera.IDSCamera
# deviceConfig:
# camera_id: 1
# bits_per_pixel: 8
# num_rotation_90: 1
# transpose: false
# force_monochrome: false
# m_n_colormode: 1
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: async
smarx:
description: sample position x with smaract
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
deviceConfig:
axis_Id: G
host: x12sa-eb-smaract-mcs-05.psi.ch
limits:
- -200
- 200
port: 5000
sign: 1
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
#init_position: 0
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 6
smary:
description: sample position y with smaract
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
deviceConfig:
axis_Id: H
host: x12sa-eb-smaract-mcs-05.psi.ch
limits:
- -200
- 200
port: 5000
sign: 1
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
#init_position: 0
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 7
+128 -9
View File
@@ -2,14 +2,133 @@
############################################################
##################### EPS ##################################
############################################################
x12saEPS:
description: X12SA EPS info and control
deviceClass: csaxs_bec.devices.epics.eps.EPS
deviceConfig: {}
enabled: true
eyex:
description: Owis motor stage samx
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES2-ES01
motor_resolution: 0.00125
base_velocity: 0.0625
velocity: 10
backlash_distance: 0.125
acceleration: 0.2
user_offset_dir: 0
deviceTags:
- cSAXS
- owis_samx
onFailure: buffer
enabled: true
readoutPriority: baseline
softwareTrigger: false
eyey:
description: Owis motor stage samx
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES2-ES02
motor_resolution: 0.00125
base_velocity: 0.0625
velocity: 10
backlash_distance: 0.125
acceleration: 0.2
user_offset_dir: 0
deviceTags:
- cSAXS
- owis_samx
onFailure: buffer
enabled: true
readoutPriority: baseline
softwareTrigger: false
samx:
description: Owis motor stage samx
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES2-ES18
motor_resolution: 0.000125
base_velocity: 0.00625
velocity: 1
backlash_distance: 0.0125
acceleration: 0.2
user_offset_dir: 0
deviceTags:
- cSAXS
- owis_samx
onFailure: buffer
enabled: true
readoutPriority: baseline
softwareTrigger: false
samy:
description: Owis motor stage samx
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES2-ES19
motor_resolution: 0.000125
base_velocity: 0.00625
velocity: 1
backlash_distance: 0.0125
acceleration: 0.2
user_offset_dir: 0
deviceTags:
- cSAXS
- owis_samx
onFailure: buffer
enabled: true
readoutPriority: baseline
softwareTrigger: false
# eye_cam:
# description: Camera Microscope
# deviceClass: csaxs_bec.devices.ids_cameras.ids_camera.IDSCamera
# deviceConfig:
# camera_id: 1
# bits_per_pixel: 8
# num_rotation_90: 1
# transpose: false
# force_monochrome: false
# m_n_colormode: 1
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: async
smarx:
description: sample position x with smaract
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
deviceConfig:
axis_Id: G
host: x12sa-eb-smaract-mcs-05.psi.ch
limits:
- -200
- 200
port: 5000
sign: 1
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
readoutPriority: baseline
connectionTimeout: 20
userParameter:
#init_position: 0
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 6
smary:
description: sample position y with smaract
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
deviceConfig:
axis_Id: H
host: x12sa-eb-smaract-mcs-05.psi.ch
limits:
- -200
- 200
port: 5000
sign: 1
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
#init_position: 0
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 7
@@ -37,6 +37,7 @@ import traceback
from typing import TYPE_CHECKING
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from ophyd import Component as Cpt
from ophyd import EpicsSignalRO, Kind
from ophyd_devices import CompareStatus, DeviceStatus, StatusBase, TransitionStatus
@@ -55,6 +56,7 @@ from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import
StatusBitsCompareStatus,
)
from csaxs_bec.devices.epics.mcs_card.mcs_card_csaxs import ACQUIRING
from csaxs_bec.devices.utils.utils import fetch_scan_info
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import DeviceManagerBase, ScanInfo
@@ -133,7 +135,7 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
device_manager (DeviceManagerBase | None, optional): Device manager. Defaults to None.
"""
USER_ACCESS = ["keep_shutter_open_during_scan", "set_trigger"]
USER_ACCESS = ["keep_shutter_open_during_scan", "set_trigger", "get_shutter_to_open_delay"]
# TODO Consider using the 'fsh' device instead.
fast_shutter_readback = Cpt(
@@ -172,6 +174,7 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
self._poll_thread_poll_loop_done = threading.Event()
self._poll_thread_kill_event = threading.Event()
self._poll_thread.start()
self.scan_parameters: ScanServerScanInfo | None = None
# pylint: disable=attribute-defined-outside-init
def on_connected(self) -> None:
@@ -231,6 +234,10 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
self.burst_mode.put(0)
time.sleep(0.02)
def get_shutter_to_open_delay(self) -> float:
"""Get the current delay that is set to open the shutter before the exposure time."""
return self._shutter_to_open_delay
def keep_shutter_open_during_scan(self, open: True) -> None:
"""
Method to configure the delay generator for keeping the shutter open during a scans.
@@ -266,6 +273,7 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
- We set the delay pairs ef to be triggered after the shutter closes with a width of 1us to trigger the MCS card.
- Finally, we add a short sleep to ensure that the IOC and DDG HW process the values properly.
"""
self.scan_parameters = fetch_scan_info(self.scan_info)
start_time = time.time()
########################################
@@ -300,8 +308,8 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
### Setup timing for burst and delays ###
#########################################
frames_per_trigger = self.scan_info.msg.scan_parameters["frames_per_trigger"]
exp_time = self.scan_info.msg.scan_parameters["exp_time"]
frames_per_trigger = self.scan_parameters.frames_per_trigger
exp_time = self.scan_parameters.exp_time
# Burst Period DDG1
# Set burst_period to shutter width
@@ -340,7 +348,7 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
# Trigger extra pulse for MCS OR gate
# f = e + 1us
# e has refernce to d, f has reference to e
if self.scan_info.msg.scan_type == "fly":
if self.scan_parameters.scan_type == "hardware_triggered":
self.set_delay_pairs(channel="ef", delay=0, width=0)
else:
self.set_delay_pairs(channel="ef", delay=1e-6, width=1e-6)
@@ -371,7 +379,7 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
# counters are forwarded to BEC. Once the flag is set, we create a TransitionStatus DONE->ACQUIRING
# and start the acquisition through erase_start.put(1). Finally, we wait for the card to go to ACQUIRING state.
mcs._omit_mca_callbacks.clear() # pylint: disable=protected-access
status_acquiring = TransitionStatus(mcs.acquiring, [ACQUIRING.DONE, ACQUIRING.ACQUIRING])
status_acquiring = CompareStatus(mcs.acquiring, ACQUIRING.ACQUIRING)
self.cancel_on_stop(status_acquiring)
mcs.erase_start.put(1)
@@ -494,9 +502,7 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
"""
if timeout is None:
# Default timeout of 5 seconds + exposure time * frames_per_trigger
timeout = 5 + self.scan_info.msg.scan_parameters.get(
"exp_time", 0.1
) * self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1)
timeout = 5 + self.scan_parameters.exp_time * self.scan_parameters.frames_per_trigger
# Callback to cancel the status if the device is stopped
def cancel_cb(status: CompareStatus) -> None:
@@ -562,7 +568,11 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
# to indicate when the burst cycle is done. If no mcs card is available
# the fallback is to use the polling of the DDG
mcs = self.device_manager.devices.get("mcs", None)
if mcs is None or mcs.enabled is False or self.scan_info.msg.scan_type == "fly":
if (
mcs is None
or mcs.enabled is False
or self.scan_parameters.scan_type == "hardware_triggered"
):
self._poll_thread_poll_loop_done.wait(timeout=1)
logger.warning("Did not find mcs card with name 'mcs' in current session")
time.sleep(0.02)
@@ -577,7 +587,12 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
# NOTE Timeout of 3s should be plenty, any longer wait should checked. If this happens to crash
# an acquisition regularly with a WaitTimeoutError, the timeout can be increased but it should
# be investigated why the EPICS interface is slow to respond.
status_mcs.wait(timeout=3)
try:
status_mcs.wait(timeout=3)
except Exception as exc:
logger.warning(f"MCS did not go to Acquiring within 3s. Retrying erase_start {exc}")
mcs.erase_start.put(1)
status_mcs.wait(timeout=3)
status = TransitionStatus(mcs.acquiring, [ACQUIRING.ACQUIRING, ACQUIRING.DONE])
logger.debug(f"Finished preparing mcs card {time.time()-start_time}")
@@ -25,6 +25,7 @@ Burst mode is enabled:
import time
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from ophyd_devices import DeviceStatus, StatusBase
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
@@ -39,6 +40,7 @@ from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import
DelayGeneratorCSAXS,
LiteralChannels,
)
from csaxs_bec.devices.utils.utils import fetch_scan_info
logger = bec_logger.logger
@@ -111,6 +113,10 @@ class DDG2(PSIDeviceBase, DelayGeneratorCSAXS):
distributes the trigger to the detectors. The DDG2 is triggered by the DDG1 through the EXT/EN channel.
"""
def on_init(self) -> None:
"""Initialize the device"""
self.scan_parameters: ScanServerScanInfo | None = None
# pylint: disable=attribute-defined-outside-init
def on_connected(self) -> None:
"""
@@ -168,6 +174,7 @@ class DDG2(PSIDeviceBase, DelayGeneratorCSAXS):
through the EXT/EN channel.
"""
start_time = time.time()
self.scan_parameters = fetch_scan_info(self.scan_info)
########################################
### Burst mode settings ################
########################################
@@ -180,8 +187,8 @@ class DDG2(PSIDeviceBase, DelayGeneratorCSAXS):
if self.burst_delay.get() != 0:
self.burst_delay.put(0)
exp_time = self.scan_info.msg.scan_parameters["exp_time"]
frames_per_trigger = self.scan_info.msg.scan_parameters["frames_per_trigger"]
exp_time = self.scan_parameters.exp_time
frames_per_trigger = self.scan_parameters.frames_per_trigger
# NOTE Check if the exposure time is longer than all readout times.
# Raise a ValueError if requested exposure time is too short.
@@ -20,9 +20,10 @@ from typing import TYPE_CHECKING, Callable, Literal
import numpy as np
from bec_lib.logger import bec_logger
from ophyd.utils.errors import WaitTimeoutError
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from ophyd import Component as Cpt
from ophyd import EpicsSignalRO, Kind
from ophyd.utils.errors import WaitTimeoutError
from ophyd_devices import (
AsyncMultiSignal,
CompareStatus,
@@ -43,6 +44,7 @@ from csaxs_bec.devices.epics.mcs_card.mcs_card import (
READMODE,
MCSCard,
)
from csaxs_bec.devices.utils.utils import fetch_scan_info
@contextmanager
@@ -168,6 +170,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
self._mca_counter_index: int = 0
self._current_data: dict[str, dict[Literal["value", "timestamp"], list[int] | float]] = {}
self._omit_mca_callbacks: threading.Event = threading.Event()
self.scan_parameters: ScanServerScanInfo | None = None
def on_connected(self):
"""
@@ -315,6 +318,11 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
value: New value of the signal.
"""
try:
if (
self._num_lines > 1 and old_value > value
): # This indicates that we have moved to the next line in a cont scan
self._current_line += 1
value = value + (self._current_line - 1) * self.scan_parameters.frames_per_trigger
scan_done = bool(value == self._num_total_triggers)
self.progress.put(value=value, max_value=self._num_total_triggers, done=scan_done)
except Exception:
@@ -335,6 +343,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
'_start_monitor_async_data_emission', '_scan_done_callbacks', and '_current_data'.
"""
start_time = time.time()
self.scan_parameters = fetch_scan_info(self.scan_info)
# NOTE: If for some reason, the card is still acquiring, we need to stop it first
# This should never happen as the card is properly stopped during unstage
@@ -364,14 +373,16 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
#####################################
### Setup Acquisition Parameters ###
#####################################
triggers = self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1)
num_points = self.scan_info.msg.num_points
triggers = self.scan_parameters.frames_per_trigger
num_points = self.scan_parameters.num_points
self._num_total_triggers = triggers * num_points
self._num_lines = self.scan_parameters.additional_scan_parameters.get("num_lines", 1)
self._current_line = 1
self._acquisition_group = "monitored" if triggers == 1 else "burst_group"
self.preset_real.set(0).wait(timeout=self._pv_timeout)
if self.scan_info.msg.scan_type == "step":
if self.scan_parameters.scan_type == "software_triggered":
self.num_use_all.set(triggers).wait(timeout=self._pv_timeout)
elif self.scan_info.msg.scan_type == "fly":
elif self.scan_parameters.scan_type == "hardware_triggered":
self.num_use_all.set(self._num_total_triggers).wait(timeout=self._pv_timeout)
# Clear any previous data, just to be sure
@@ -395,7 +406,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
self._omit_mca_callbacks.clear()
# For a fly scan we need to start the mcs card ourselves
if self.scan_info.msg.scan_type == "fly":
if self.scan_parameters.scan_type == "hardware_triggered":
self.erase_start.put(1)
logger.info(f"MCS Card {self.name} on_stage completed in {time.time() - start_time:.3f}s.")
@@ -406,7 +417,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
that the card is properly started for fly scans. For step scans, this will be handled by the DDG,
so no action is required here.
"""
if self.scan_info.msg.scan_type == "fly":
if self.scan_parameters.scan_type == "hardware_triggered":
status_acquiring = CompareStatus(self.acquiring, ACQUIRING.ACQUIRING)
self.cancel_on_stop(status_acquiring)
return status_acquiring
@@ -444,11 +455,11 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
while self._start_monitor_async_data_emission.wait():
try:
if (
hasattr(self.scan_info.msg, "num_points")
and self.scan_info.msg.num_points is not None
hasattr(self.scan_parameters, "num_points")
and self.scan_parameters.num_points is not None
):
if self.scan_info.msg.scan_type == "step":
if self._current_data_index == self.scan_info.msg.num_points:
if self.scan_parameters.scan_type == "software_triggered":
if self._current_data_index == self.scan_parameters.num_points:
for callback in self._scan_done_callbacks:
callback(exception=None)
else:
@@ -513,13 +524,14 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
# that the acquisition finishes on the card and that data is emitted to BEC. If the acquisition
# was already finished (i.e. normal step scan sends 1 extra pulse per burst cycle), this will
# not have any effect as the card will already be in DONE state and signal.
if self.scan_info.msg.scan_type == "fly":
if self.scan_parameters.scan_type == "hardware_triggered":
expected_points = int(
self.scan_info.msg.num_points
* self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1)
)
status = CompareStatus(self.current_channel, expected_points-1, operation_success=">=")
self.scan_parameters.num_points * self.scan_parameters.frames_per_trigger
)
status = CompareStatus(
self.current_channel, expected_points - 1, operation_success=">="
)
try:
status.wait(timeout=5)
except WaitTimeoutError:
+7 -5
View File
@@ -39,6 +39,7 @@ from typing import TYPE_CHECKING, Literal
import yaml
from bec_lib.file_utils import get_full_path
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from jfjoch_client.models.dataset_settings import DatasetSettings
from jfjoch_client.models.detector_settings import DetectorSettings
from jfjoch_client.models.detector_state import DetectorState
@@ -52,6 +53,7 @@ from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from csaxs_bec.devices.jungfraujoch.jungfrau_joch_client import JungfrauJochClient
from csaxs_bec.devices.jungfraujoch.jungfraujoch_preview import JungfrauJochPreview
from csaxs_bec.devices.utils.utils import fetch_scan_info
if TYPE_CHECKING: # pragma no cover
from bec_lib.devicemanager import ScanInfo
@@ -117,6 +119,7 @@ class Eiger(PSIDeviceBase):
self._wait_for_on_complete = 20 # seconds
if self.device_manager is not None:
self.device_manager: DeviceManagerDS
self.scan_parameters: ScanServerScanInfo | None = None
def _preview_callback(self, message: dict) -> None:
"""
@@ -263,26 +266,25 @@ class Eiger(PSIDeviceBase):
def on_stage(self) -> DeviceStatus | None:
"""
Hook called when staging the device. Information about the upcoming scan can be accessed from the scan_info object.
scan_msg = self.scan_info.msg
"""
start_time = time.time()
scan_msg = self.scan_info.msg
self.scan_parameters = fetch_scan_info(self.scan_info)
# TODO: Check mono energy from device in BEC
# Setting incident energy in keV
incident_energy = 12.0
# Setting up exp_time and num_triggers acquisition parameter
exp_time = scan_msg.scan_parameters.get("exp_time", 0)
exp_time = self.scan_parameters.exp_time
if exp_time <= self._readout_time: # Exp_time must be at least the readout time
raise ValueError(
f"Value error on device {self.name}: Exposure time {exp_time}s is less than readout time {self._readout_time}s."
)
self._num_triggers = int(
scan_msg.num_points * scan_msg.scan_parameters["frames_per_trigger"]
self.scan_parameters.num_points * self.scan_parameters.frames_per_trigger
)
# Setting up the full path for file writing
self._full_path = get_full_path(scan_msg, name=f"{self.name}_master")
self._full_path = get_full_path(self.scan_info.msg, name=f"{self.name}_master")
self._full_path = os.path.abspath(os.path.expanduser(self._full_path))
# Inform BEC about upcoming file event
+1 -1
View File
@@ -21,7 +21,7 @@ if TYPE_CHECKING: # pragma no cover
from bec_server.device_server.device_server import DeviceManagerDS
EIGER9M_READOUT_TIME_US = 500e-6 # 500 microseconds in s
DETECTOR_NAME = "EIGER 9M" # "EIGER 9M""
DETECTOR_NAME = "EIGER 8.5M (tmp)" # "EIGER 9M" # "EIGER 9M""
# pylint:disable=invalid-name
+10 -5
View File
@@ -3,9 +3,12 @@
import time
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from ophyd_devices import StatusBase
from ophyd_devices.devices.panda_box.panda_box import PandaBox, PandaState
from csaxs_bec.devices.utils.utils import fetch_scan_info
logger = bec_logger.logger
@@ -20,16 +23,19 @@ class PandaBoxCSAXS(PandaBox):
super().on_init()
self._acquisition_group = "burst"
self._timeout_on_completed = 10
self.scan_parameters: ScanServerScanInfo | None = None
def on_stage(self):
self.scan_parameters = fetch_scan_info(self.scan_info)
start_time = time.time()
super().on_stage()
# TODO, adjust as seen fit.
# Adjust the acquisition group based on scan parameters if needed
if self.scan_info.msg.scan_type == "fly":
if self.scan_parameters.scan_type == "hardware_triggered":
self._acquisition_group = "fly"
elif self.scan_info.msg.scan_type == "step":
if self.scan_info.msg.scan_parameters["frames_per_trigger"] == 1:
elif self.scan_parameters.scan_type == "software_triggered":
if self.scan_parameters.frames_per_trigger == 1:
self._acquisition_group = "monitored"
else:
self._acquisition_group = "burst"
@@ -44,8 +50,7 @@ class PandaBoxCSAXS(PandaBox):
start_time = time.monotonic()
try:
expected_points = int(
self.scan_info.msg.num_points
* self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1)
self.scan_parameters.num_points * self.scan_parameters.frames_per_trigger
)
while captured < expected_points:
ret = self.send_raw("*PCAP.CAPTURED?")
@@ -3,9 +3,12 @@
import time
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from ophyd_devices import StatusBase
from ophyd_devices.devices.panda_box.panda_box import PandaBox, PandaState
from csaxs_bec.devices.utils.utils import fetch_scan_info
logger = bec_logger.logger
@@ -16,16 +19,18 @@ class PandaBoxOMNY(PandaBox):
super().on_init()
self._acquisition_group = "burst"
self._timeout_on_completed = 10
self.scan_parameters: ScanServerScanInfo | None = None
def on_stage(self):
start_time = time.time()
super().on_stage()
self.scan_parameters = fetch_scan_info(self.scan_info)
# TODO, adjust as seen fit.
# Adjust the acquisition group based on scan parameters if needed
if self.scan_info.msg.scan_type == "fly":
if self.scan_parameters.scan_type == "hardware_triggered":
self._acquisition_group = "fly"
elif self.scan_info.msg.scan_type == "step":
if self.scan_info.msg.scan_parameters["frames_per_trigger"] == 1:
elif self.scan_parameters.scan_type == "software_triggered":
if self.scan_parameters.frames_per_trigger == 1:
self._acquisition_group = "monitored"
else:
self._acquisition_group = "burst"
@@ -40,8 +45,7 @@ class PandaBoxOMNY(PandaBox):
start_time = time.monotonic()
try:
expected_points = int(
self.scan_info.msg.num_points
* self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1)
self.scan_parameters.num_points * self.scan_parameters.frames_per_trigger
)
while captured < expected_points:
ret = self.send_raw("*PCAP.CAPTURED?")
View File
+27
View File
@@ -0,0 +1,27 @@
"""Utility functions for the devices."""
from copy import deepcopy
import numpy as np
from bec_lib.devicemanager import ScanInfo
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from pydantic import ValidationError
def fetch_scan_info(scan_info: ScanInfo) -> ScanServerScanInfo:
"""Fetch the scan parameters from the scan_info object and return them as a ScanServerScanInfo object."""
info = scan_info.msg.info
if isinstance(info["positions"], list):
info["positions"] = np.array(info["positions"])
info["num_monitored_readouts"] = scan_info.msg.num_monitored_readouts
try:
msg = ScanServerScanInfo.model_validate(info)
except ValidationError: # This means we have an old scan_info object.
info = deepcopy(info)
# We need to convert a few parameters manually.
info["scan_type"] = (
"hardware_triggered" if info["scan_type"] == "fly" else "software_triggered"
)
msg = ScanServerScanInfo.model_validate(info)
return msg
+1
View File
@@ -3,4 +3,5 @@ from .jungfrau_joch_scan import JungfrauJochTestScan
from .LamNIFermatScan import LamNIFermatScan, LamNIMoveToScanCenter
from .omny_fermat_scan import OMNYFermatScan
from .owis_grid import OwisGrid
from .scans_v4.cont_grid import ContGrid
from .sgalil_grid import SgalilGrid
+415
View File
@@ -0,0 +1,415 @@
"""
Continuous grid scan with 2-axis. The scan requires the fast axis to properly implement base velocity as well as high velocity and high acceleration time
Scan procedure:
- prepare_scan
- open_scan
- stage
- pre_scan
- scan_core
- at_each_point (optionally called by scan_core)
- post_scan
- unstage
- close_scan
- on_exception (called if any exception is raised during the scan)
"""
from __future__ import annotations
import time
from typing import Annotated, TypedDict
import numpy as np
from bec_lib.device import DeviceBase
from bec_lib.logger import bec_logger
from bec_lib.scan_args import DefaultArgType, ScanArgument
from bec_server.scan_server.errors import ScanAbortion
from bec_server.scan_server.scans import position_generators
from bec_server.scan_server.scans.scan_base import ScanBase, ScanType
from bec_server.scan_server.scans.scan_modifier import scan_hook
from csaxs_bec.scans.scan_customization.scan_components import CsaxsBecScanComponents
logger = bec_logger.logger
class ContinuousMotorParameter(TypedDict):
"""TypedDict for the parameters related to the continuous motor, which are needed to properly restore the motor state after the scan."""
original_velocity: float | None
original_acceleration: float | None
target_velocity: float | None
base_velocity: float | None
acc_time: float | None
premove_distance: float | None
shutter_open_delay: float | None
num_lines: int | None
class ContGrid(ScanBase):
# Scan Type: Hardware triggered or software triggered?
# If the main trigger and readout logic is done within the at_each_point method in scan_core, choose SOFTWARE_TRIGGERED.
# If the main trigger and readout logic is implemented on a device that is simply kicked off in this scan, choose HARDWARE_TRIGGERED.
# This primarily serves as information for devices: The device may need to react differently if a software trigger is expected
# for every point.
scan_type = ScanType.SOFTWARE_TRIGGERED
# Scan name: This is the name of the scan, e.g. "line_scan". This is used for display purposes and to identify the scan type in user interfaces.
# Choose a descriptive name that does not conflict with existing scan names.
# It must be a valid Python identifier, that is, it can only contain letters, numbers, and underscores, and must not start with a number.
scan_name = "cont_grid"
gui_config = {
"Scan Parameters": [
"fast_axis",
"fast_start",
"fast_end",
"fast_step_size",
"stepper_axis",
"stepper_start",
"stepper_stop",
"stepper_step_size",
],
"Acquisition Parameters": ["exp_time", "relative", "fast_axis_always_in_pos_dir"],
}
def __init__(
self,
fast_axis: Annotated[
DeviceBase,
ScanArgument(display_name="Fast Axis", description="Axis with continuous motion."),
],
fast_start: Annotated[
float,
ScanArgument(
display_name="Fast Start",
description="Start position for measurement points of the fast axis.",
),
],
fast_end: Annotated[
float,
ScanArgument(
display_name="Fast End",
description="End position for measurement points of the fast axis.",
),
],
fast_step_size: Annotated[
float,
ScanArgument(
display_name="Fast Step Size",
description="Step size for points of the continuous motion axis.",
),
],
stepper_axis: Annotated[
DeviceBase,
ScanArgument(
display_name="Step Axis",
description="Step axis of the grid scan, stepping through the lines.",
),
],
stepper_start: Annotated[
float,
ScanArgument(display_name="Step Start", description="Start position of the step axis."),
],
stepper_stop: Annotated[
float,
ScanArgument(display_name="Step Stop", description="End position of the step axis."),
],
stepper_step_size: Annotated[
float,
ScanArgument(
display_name="Step Step Size",
description="Step size of the step axis in units of the motor.",
),
],
exp_time: DefaultArgType.ExposureTime,
relative: DefaultArgType.Relative = False,
fast_axis_always_in_pos_dir: bool = True,
**kwargs,
):
"""
Continuous grid scan with 2-axis. The scan requires the fast axis to properly implement base velocity as well as high velocity and high acceleration time
Args:
fast_axis (DeviceBase): Axis with continuous motion.
fast_start (float): Start position for measurement points of the fast axis.
fast_end (float): End position for measurement points of the fast axis.
fast_step_size (float): Step size for points of the continuous motion axis.
stepper_axis (DeviceBase): Step axis of the grid scan, stepping through the lines.
stepper_start (float): Start position of the step axis.
stepper_stop (float): End position of the step axis.
stepper_step_size (float): Step size of the step axis in units of the motor.
relative (bool): Whether the positions are relative to the current position. Default is False, i.e. absolute positions.
exp_time (float): Exposure time in seconds
fast_axis_always_in_pos_dir (bool): Whether to always scan in the positive direction, default is True.
Returns:
ScanReport
"""
super().__init__(**kwargs)
self.components = CsaxsBecScanComponents(self)
self._baseline_readout_status = None
self.fast_axis = fast_axis
self.fast_axis_always_in_pos_dir = fast_axis_always_in_pos_dir
if self.fast_axis_always_in_pos_dir and (fast_end < fast_start): # Switch if needed
fast_start, fast_end = fast_end, fast_start
self.fast_start = fast_start
self.fast_end = fast_end
self.fast_step_size = fast_step_size
self.stepper_axis = stepper_axis
self.stepper_start = stepper_start
self.stepper_stop = stepper_stop
self.stepper_step_size = stepper_step_size
self.relative = relative
self.exp_time = exp_time
self.ddg1 = self.device_manager.devices["ddg1"]
self.mcs = self.device_manager.devices["mcs"]
self.motors = [fast_axis, stepper_axis]
self._cont_motor_params: ContinuousMotorParameter = ContinuousMotorParameter()
self.update_scan_info(relative=relative, exp_time=exp_time)
@scan_hook
def prepare_scan(self):
"""
Prepare the scan. This can include any steps that need to be executed
before the scan is opened, such as preparing the positions (if not done already)
or setting up the devices.
"""
self._check_motor_inputs()
frames_per_trigger = int(
np.ceil(np.abs(self.fast_end - self.fast_start) / self.fast_step_size)
)
self._cont_motor_params["num_lines"] = int(
np.ceil(np.abs(self.stepper_stop - self.stepper_start) / self.stepper_step_size)
)
positions = position_generators.nd_grid_positions(
[
(self.fast_start, self.fast_end, frames_per_trigger),
(self.stepper_start, self.stepper_stop, self._cont_motor_params["num_lines"]),
],
snaked=False,
)
# Count only the end point of each line as a valid position, as the fast axis is continuously moving and only triggered at
# the beginning of the line moving to the end point.
self.positions = positions[(frames_per_trigger - 1) :: frames_per_trigger, :]
# Get device specific parameters
self._fetch_device_params()
# Adjust relative positions if needed
if self.relative:
self.start_positions = self.components.get_start_positions(self.motors)
self.positions += self.start_positions
self.fast_start += self.start_positions[0]
self.fast_end += self.start_positions[0]
self.stepper_start += self.start_positions[1]
self.stepper_stop += self.start_positions[1]
# Adjust premove
self.fast_start -= self._cont_motor_params["premove_distance"]
self.fast_end += self._cont_motor_params["premove_distance"]
self.actions.set_device_readout_priority(self.motors, priority="monitored")
self.update_scan_info(
positions=self.positions,
num_points=len(self.positions),
frames_per_trigger=frames_per_trigger,
computed_positions=positions,
num_lines=self._cont_motor_params["num_lines"],
)
self.actions.add_scan_report_instruction_device_progress(self.mcs)
self._baseline_readout_status = self.actions.read_baseline_devices(wait=False)
self._premove_motor_status = self.actions.set(
self.motors, [self.fast_start, self.stepper_start], wait=False
)
@scan_hook
def open_scan(self):
"""
Open the scan.
This step must call self.actions.open_scan() to ensure that a new scan is
opened. Make sure to prepare the scan metadata before, either in
prepare_scan() or in open_scan() itself and call self.update_scan_info(...)
to update the scan metadata if needed.
"""
self.actions.open_scan()
@scan_hook
def stage(self):
"""
Stage the devices for the upcoming scan. The stage logic is typically
implemented on the device itself (i.e. by the device's stage method).
However, if there are any additional steps that need to be executed before
staging the devices, they can be implemented here.
"""
self.actions.stage_all_devices()
@scan_hook
def pre_scan(self):
"""
Pre-scan steps to be executed before the main scan logic.
This is typically the last chance to prepare the devices before the core scan
logic is executed. For example, this is a good place to initialize time-criticial
devices, e.g. devices that have a short timeout.
The pre-scan logic is typically implemented on the device itself.
"""
self._premove_motor_status.wait()
self.actions.pre_scan_all_devices()
@scan_hook
def scan_core(self):
"""
Core scan logic to be executed during the scan.
This is where the main scan logic should be implemented.
"""
# Only use every second position, at each point will use
for line_index in range(self._cont_motor_params["num_lines"]):
self.actions.set(
self.motors, [self.fast_start, self.positions[line_index][1]], wait=True
)
self.at_each_point(motors=[self.fast_axis], positions=np.array([self.fast_end]))
self._restore_motor_properties()
@scan_hook
def at_each_point(
self,
motors: list[str | DeviceBase],
positions: np.ndarray,
last_positions: np.ndarray | None = None,
):
"""
Logic to be executed at each point of the scan.
This is where the main scan logic should be implemented, e.g. triggering the readout devices.
The at_each_point logic is typically implemented on the device itself.
"""
self.fast_axis.velocity.set(self._cont_motor_params["target_velocity"]).wait(timeout=5)
self.fast_axis.acceleration.set(self._cont_motor_params["acc_time"]).wait(timeout=5)
move_status = self.actions.set(motors, positions, wait=False)
time.sleep(self._cont_motor_params["acc_time"])
trigger_status = self.ddg1.trigger()
while not move_status.done:
self.actions.read_monitored_devices(wait=True)
try:
move_status.wait(timeout=0.5)
except TimeoutError:
continue
try:
trigger_status.wait(timeout=2)
except TimeoutError as exc:
raise ScanAbortion(
f"Status for delay generator trigger {self.ddg1.name} did not resolve after 2 seconds. "
) from exc
@scan_hook
def post_scan(self):
"""
Post-scan steps to be executed after the main scan logic.
"""
self._restore_motor_properties()
status = self.actions.complete_all_devices(wait=False)
if self.relative:
self.components.move_and_wait(self.motors, self.start_positions)
status.wait()
@scan_hook
def unstage(self):
"""Unstage the scan by executing post-scan steps."""
self.actions.unstage_all_devices()
@scan_hook
def close_scan(self):
"""Close the scan."""
if self._baseline_readout_status is not None:
self._baseline_readout_status.wait()
self.actions.close_scan()
self.actions.check_for_unchecked_statuses()
@scan_hook
def on_exception(self, exception: Exception):
"""
Handle exceptions that occur during the scan.
This is a good place to implement any cleanup logic that needs to be executed in case of an exception,
such as returning the devices to a safe state or moving the motors back to their starting position.
"""
self._restore_motor_properties()
if self.relative:
self.components.move_and_wait(self.motors, self.start_positions)
#######################################################
######### Helper methods for the scan logic ###########
#######################################################
# Implement scan-specific helper methods below.
def _check_motor_inputs(self):
"""
Check the motor inputs for validity, e.g. whether the start and stop positions are not too close for the given step size.
"""
if np.isclose(self.fast_start, self.fast_end, atol=self.fast_step_size):
raise ScanAbortion(
f"Fast stop {self.fast_end} and fast start {self.fast_start} positions are too close for the given step size {self.fast_step_size}."
)
if np.isclose(self.stepper_start, self.stepper_stop, atol=self.stepper_step_size):
raise ScanAbortion(
f"Stepper stop {self.stepper_stop} and stepper start {self.stepper_start} positions are too close for the given step size {self.stepper_step_size}."
)
def _restore_motor_properties(self):
vel = self._cont_motor_params["original_velocity"]
acc = self._cont_motor_params["original_acceleration"]
if vel is not None:
self.fast_axis.velocity.put(vel)
if acc is not None:
self.fast_axis.acceleration.put(acc)
def _fetch_device_params(self):
self._cont_motor_params["shutter_open_delay"] = self.ddg1.get_shutter_to_open_delay()
self._cont_motor_params["original_acceleration"] = self.fast_axis.acceleration.get()
self._cont_motor_params["original_velocity"] = self.fast_axis.velocity.get()
self._cont_motor_params["base_velocity"] = self.fast_axis.base_velocity.get()
target_vel = self.fast_step_size / self.exp_time
if target_vel > self._cont_motor_params["original_velocity"]:
raise ScanAbortion(
f"Requested velocity of {target_vel} exceeds maximum velocity {self._cont_motor_params['original_velocity']} of motor {self.fast_axis.name}."
)
if target_vel < self._cont_motor_params["base_velocity"]:
raise ScanAbortion(
f"Requested velocity of {target_vel} is below base velocity {self._cont_motor_params['base_velocity']}."
)
accceleration_time = (
(target_vel - self._cont_motor_params["base_velocity"])
/ (
self._cont_motor_params["original_velocity"]
- self._cont_motor_params["base_velocity"]
)
* self._cont_motor_params["original_acceleration"]
)
self._cont_motor_params["target_velocity"] = target_vel
self._cont_motor_params["acc_time"] = accceleration_time
if self._cont_motor_params["acc_time"] < self._cont_motor_params["shutter_open_delay"]:
extra_distance = (
self._cont_motor_params["target_velocity"]
* self._cont_motor_params["shutter_open_delay"]
)
# Adjust acc time to account for shutter delay
self._cont_motor_params["acc_time"] += self._cont_motor_params["shutter_open_delay"]
else:
extra_distance = 0
self._cont_motor_params["premove_distance"] = self._compute_premove_distance(
additional_distance=extra_distance
)
def _compute_premove_distance(self, additional_distance: float) -> float:
return (
0.5
* (
self._cont_motor_params["target_velocity"]
+ self._cont_motor_params["base_velocity"]
)
* self._cont_motor_params["acc_time"]
+ additional_distance * self._cont_motor_params["target_velocity"]
)
@@ -43,6 +43,7 @@ from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import
DelayGeneratorCSAXS,
)
from csaxs_bec.devices.epics.mcs_card.mcs_card_csaxs import MCSCardCSAXS
from csaxs_bec.devices.utils.utils import fetch_scan_info
############################
### Test Delay Generator ###
@@ -280,8 +281,8 @@ def test_ddg1_stage(mock_ddg1: DDG1):
mock_ddg1.burst_delay.put(5) # Non-default, should be reset on stage
mock_ddg1.burst_count.put(10) # Non-default, should be reset on stage
mock_ddg1.scan_info.msg.scan_parameters["exp_time"] = exp_time
mock_ddg1.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger
mock_ddg1.scan_info.msg.info["exp_time"] = exp_time
mock_ddg1.scan_info.msg.info["frames_per_trigger"] = frames_per_trigger
mock_ddg1.fast_shutter_control._read_pv.mock_data = 0 # Simulate shutter control
mock_ddg1.stage()
@@ -315,7 +316,7 @@ def test_ddg1_stage(mock_ddg1: DDG1):
shutter_width, exp_time * frames_per_trigger
) # Shutter to open delay is not added as shutter is kept open
# Simulate fly scan, so no extra trigger for MCS card.
mock_ddg1.scan_info.msg.scan_type = "fly"
mock_ddg1.scan_info.msg.info["scan_type"] = "hardware_triggered"
mock_ddg1.stage()
# Shutter channel cd
assert np.isclose(mock_ddg1.cd.delay.get(), 0)
@@ -339,6 +340,11 @@ def test_ddg1_on_trigger(mock_ddg1: DDG1):
# Make sure DDG is setup in default state through on_connected
ddg.on_connected()
ddg.scan_info.msg.info["scan_type"] = (
"software_triggered" # Simulate fly scan, so no extra trigger for MCS card.
)
ddg.scan_parameters = fetch_scan_info(ddg.scan_info)
# Check that poll thread is running and run event is not set
assert ddg._poll_thread.is_alive()
assert not ddg._poll_thread_run_event.is_set()
@@ -514,8 +520,8 @@ def test_ddg2_on_stage(mock_ddg2: DDG2):
exp_time = 0.1
frames_per_trigger = 10
ddg.on_connected()
ddg.scan_info.msg.scan_parameters["exp_time"] = exp_time
ddg.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger
ddg.scan_info.msg.info["exp_time"] = exp_time
ddg.scan_info.msg.info["frames_per_trigger"] = frames_per_trigger
# Set non-default burst mode settings
ddg.burst_mode.put(0)
@@ -538,7 +544,7 @@ def test_ddg2_on_stage(mock_ddg2: DDG2):
ddg.unstage() # Reset staged state for next test
exp_time_short = 2e-4 # too short exposure time
with pytest.raises(ValueError):
ddg.scan_info.msg.scan_parameters["exp_time"] = exp_time_short
ddg.scan_info.msg.info["exp_time"] = exp_time_short
ddg.stage()
+14 -1
View File
@@ -8,6 +8,7 @@ from unittest import mock
import numpy as np
import pytest
from bec_lib.messages import FileMessage, ScanStatusMessage
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from jfjoch_client.models.broker_status import BrokerStatus
from jfjoch_client.models.dataset_settings import DatasetSettings
from jfjoch_client.models.detector_list import DetectorList
@@ -49,10 +50,19 @@ def mock_scan_info(request, tmpdir):
"frames_per_trigger": frames_per_trigger,
"system_config": {},
},
info={"file_components": (f"{tmpdir}/data/S00000/S000001", "h5")},
info=ScanServerScanInfo(
scan_name=scan_name,
scan_id="test_id",
positions=np.array([[0, 0], [1, 1], [2, 2]]),
exp_time=exp_time,
frames_per_trigger=frames_per_trigger,
system_config={},
num_points=num_points,
).model_dump(),
num_points=num_points,
scan_name=scan_name,
)
scan_info.info.update({"file_components": (f"{tmpdir}/data/S00000/S000001", "h5")})
yield scan_info
@@ -171,6 +181,9 @@ def test_eiger_1_5m_on_connected(eiger_1_5m, detector_list, detector_state):
assert mock_jfj_preview_client.start.call_count == 1
@pytest.mark.skip(
reason="This test is currently disabled because the Eiger 9M config is replaced by the 8_5M config."
)
@pytest.mark.parametrize("detector_state", ["Idle", "Inactive"])
def test_eiger_9m_on_connected(eiger_9m, detector_list, detector_state):
"""Test the on_connected logic of the Eiger detector."""
+6 -3
View File
@@ -25,6 +25,7 @@ from csaxs_bec.devices.epics.mcs_card.mcs_card import (
MCSCard,
)
from csaxs_bec.devices.epics.mcs_card.mcs_card_csaxs import MCSCardCSAXS
from csaxs_bec.devices.utils.utils import fetch_scan_info
@pytest.fixture(scope="function")
@@ -121,8 +122,8 @@ def test_mcs_card_csaxs_stage(mock_mcs_csaxs: MCSCardCSAXS):
mcs = mock_mcs_csaxs
triggers = 5
num_points = 10
mcs.scan_info.msg.scan_parameters["frames_per_trigger"] = triggers
mcs.scan_info.msg.num_points = num_points
mcs.scan_info.msg.info["frames_per_trigger"] = triggers
mcs.scan_info.msg.info["num_points"] = num_points
# Simulate that the MCS card is still acquiring, and that current channel is !=0
mcs.current_channel._read_pv.mock_data = 2 # Simulate that current channel is not zero
@@ -171,6 +172,7 @@ def test_mcs_card_csaxs_complete_and_stop(mock_mcs_csaxs: MCSCardCSAXS):
II. Acquisition completes normally
"""
mcs = mock_mcs_csaxs
mcs.scan_parameters = fetch_scan_info(mcs.scan_info)
mcs.acquiring._read_pv.mock_data = ACQUIRING.ACQUIRING
# Make sure that device on_connected has been called which starts the monitoring thread
mcs.on_connected()
@@ -197,7 +199,8 @@ def test_mcs_card_csaxs_complete_and_stop(mock_mcs_csaxs: MCSCardCSAXS):
#######################
mcs._current_data_index = 0
mcs.scan_info.msg.num_points = 10
mcs.scan_info.msg.info["num_points"] = 10
mcs.scan_parameters = fetch_scan_info(mcs.scan_info)
mcs.acquiring._read_pv.mock_data = ACQUIRING.ACQUIRING
st = mcs.complete()
+19 -16
View File
@@ -10,6 +10,7 @@ from ophyd import Staged
from csaxs_bec.devices.panda_box.panda_box import PandaBoxCSAXS
from csaxs_bec.devices.panda_box.panda_box_omny import PandaBoxOMNY
from csaxs_bec.devices.utils.utils import fetch_scan_info
@pytest.fixture
@@ -65,18 +66,18 @@ def test_panda_omny(panda_omny):
@pytest.mark.parametrize(
"scan_type, frames_per_trigger, expected_acquisition_group",
[
("fly", 1, "fly"),
("fly", 5, "fly"),
("step", 10, "burst"),
("step", 1, "monitored"), # Default case
("hardware_triggered", 1, "fly"),
("hardware_triggered", 5, "fly"),
("software_triggered", 10, "burst"),
("software_triggered", 1, "monitored"), # Default case
],
)
def test_panda_omny_stage(panda_omny, scan_type, frames_per_trigger, expected_acquisition_group):
# Check that the stage signal is present and has the correct PV
assert len(panda_omny._status_callbacks) == 0
panda_omny.scan_info.msg.scan_type = scan_type
panda_omny.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger
panda_omny.scan_info.msg.info["scan_type"] = scan_type
panda_omny.scan_info.msg.info["frames_per_trigger"] = frames_per_trigger
panda_omny.stage()
assert panda_omny._acquisition_group == expected_acquisition_group
@@ -85,8 +86,9 @@ def test_panda_omny_stage(panda_omny, scan_type, frames_per_trigger, expected_ac
def test_panda_omny_complete(panda_omny):
"""Test the on_complete method of the PandaBoxCSAXS device."""
panda_omny.scan_info.msg.num_points = 1
panda_omny.scan_info.msg.scan_parameters["frames_per_trigger"] = 1
panda_omny.scan_info.msg.info["num_points"] = 1
panda_omny.scan_info.msg.info["frames_per_trigger"] = 1
panda_omny.scan_parameters = fetch_scan_info(panda_omny.scan_info)
panda_omny._timeout_on_completed = 0.5 # Set a short timeout for testing
@@ -133,18 +135,18 @@ def test_panda_csaxs(panda_csaxs):
@pytest.mark.parametrize(
"scan_type, frames_per_trigger, expected_acquisition_group",
[
("fly", 1, "fly"),
("fly", 5, "fly"),
("step", 10, "burst"),
("step", 1, "monitored"), # Default case
("hardware_triggered", 1, "fly"),
("hardware_triggered", 5, "fly"),
("software_triggered", 10, "burst"),
("software_triggered", 1, "monitored"), # Default case
],
)
def test_panda_csaxs_stage(panda_csaxs, scan_type, frames_per_trigger, expected_acquisition_group):
"""Test the on_stage method of the PandaBoxCSAXS device for different scan types and frames per trigger."""
assert len(panda_csaxs._status_callbacks) == 0
panda_csaxs.scan_info.msg.scan_type = scan_type
panda_csaxs.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger
panda_csaxs.scan_info.msg.info["scan_type"] = scan_type
panda_csaxs.scan_info.msg.info["frames_per_trigger"] = frames_per_trigger
panda_csaxs.stage()
assert panda_csaxs._acquisition_group == expected_acquisition_group
@@ -153,8 +155,9 @@ def test_panda_csaxs_stage(panda_csaxs, scan_type, frames_per_trigger, expected_
def test_panda_csaxs_complete(panda_csaxs):
"""Test the on_complete method of the PandaBoxCSAXS device."""
panda_csaxs.scan_info.msg.num_points = 1
panda_csaxs.scan_info.msg.scan_parameters["frames_per_trigger"] = 1
panda_csaxs.scan_info.msg.info["num_points"] = 1
panda_csaxs.scan_info.msg.info["frames_per_trigger"] = 1
panda_csaxs.scan_parameters = fetch_scan_info(panda_csaxs.scan_info)
panda_csaxs._timeout_on_completed = 0.5 # Set a short timeout for testing