feat: add nidaq_continuous_scan scan

This commit is contained in:
gac-x01da
2025-05-27 10:50:35 +02:00
committed by appel_c
parent 718a001a8a
commit 89cc27a8da
3 changed files with 166 additions and 14 deletions

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import time
from typing import TYPE_CHECKING, Literal, cast
from bec_lib.logger import bec_logger
@@ -325,6 +326,7 @@ class NidaqControl(Device):
stop_call = Cpt(EpicsSignal, suffix="NIDAQ-Stop", kind=Kind.config)
power = Cpt(EpicsSignal, suffix="NIDAQ-Power", kind=Kind.config)
heartbeat = Cpt(EpicsSignal, suffix="NIDAQ-Heartbeat", kind=Kind.config, auto_monitor=True)
time_left = Cpt(EpicsSignalRO, suffix="NIDAQ-TimeLeft", kind=Kind.config, auto_monitor=True)
ai_chans = Cpt(EpicsSignal, suffix="NIDAQ-AIChans", kind=Kind.config)
ci_chans = Cpt(EpicsSignal, suffix="NIDAQ-CIChans6614", kind=Kind.config)
@@ -344,6 +346,7 @@ class Nidaq(PSIDeviceBase, NidaqControl):
def __init__(self, prefix: str = "", *, name: str, scan_info: ScanInfo = None, **kwargs):
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self.scan_info : ScanInfo
self.timeout_wait_for_signal = 5 # put 5s firsts
self._timeout_wait_for_pv = 3 # 3s timeout for pv calls
self.valid_scan_names = [
@@ -351,6 +354,7 @@ class Nidaq(PSIDeviceBase, NidaqControl):
"xas_simple_scan_with_xrd",
"xas_advanced_scan",
"xas_advanced_scan_with_xrd",
"nidaq_continuous_scan",
]
########################################
@@ -503,6 +507,7 @@ class Nidaq(PSIDeviceBase, NidaqControl):
f"Device {self.name} has not been reached in state STANDBY, current state {NidaqState(self.state.get())}"
)
self.scan_duration.set(0).wait(timeout=self._timeout_wait_for_pv)
self.time_left.subscribe(self._progress_update, run=False)
def on_stage(self) -> DeviceStatus | StatusBase | None:
"""
@@ -522,8 +527,16 @@ class Nidaq(PSIDeviceBase, NidaqControl):
raise NidaqError(
f"Device {self.name} has not been reached in state STANDBY, current state {NidaqState(self.state.get())}"
)
self.scan_type.set(ScanType.TRIGGERED).wait(timeout=self._timeout_wait_for_pv)
self.scan_duration.set(0).wait(timeout=self._timeout_wait_for_pv)
# If scan is not part of the valid_scan_names,
if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
self.scan_type.set(ScanType.TRIGGERED).wait(timeout=self._timeout_wait_for_pv)
self.scan_duration.set(0).wait(timeout=self._timeout_wait_for_pv)
self.enable_compression.set(1).wait(timeout=self._timeout_wait_for_pv)
else:
self.scan_type.set(ScanType.CONTINUOUS).wait(timeout=self._timeout_wait_for_pv)
self.scan_duration.set(self.scan_info.msg.scan_parameters["scan_duration"]).wait(timeout=self._timeout_wait_for_pv)
self.enable_compression.set(self.scan_info.msg.scan_parameters["compression"]).wait(timeout=self._timeout_wait_for_pv)
self.stage_call.set(1).wait(timeout=self._timeout_wait_for_pv)
if not self.wait_for_condition(
@@ -534,21 +547,30 @@ class Nidaq(PSIDeviceBase, NidaqControl):
raise NidaqError(
f"Device {self.name} has not been reached in state STAGE, current state {NidaqState(self.state.get())}"
)
self.kickoff_call.set(1).wait(timeout=self._timeout_wait_for_pv)
if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
status = self.on_kickoff()
status.wait(timeout=self._timeout_wait_for_pv)
logger.info(f"Device {self.name} was staged: {NidaqState(self.state.get())}")
def on_kickoff(self) -> DeviceStatus | StatusBase:
""" Kickoff the Nidaq"""
status = self.kickoff_call.set(1)
return status
def on_unstage(self) -> DeviceStatus | StatusBase | None:
"""Called while unstaging the device. Check that the Nidaq goes into Standby"""
def _get_state():
return self.state.get() == NidaqState.STANDBY
# TODO We need to wait longer if rle is disabled
if not self.wait_for_condition(
condition=_get_state, timeout=self.timeout_wait_for_signal, check_stopped=False
):
raise NidaqError(
f"Device {self.name} has not been reached in state STANDBY, current state {NidaqState(self.state.get())}"
)
self.enable_compression.set(1).wait(self._timeout_wait_for_pv)
logger.info(f"Device {self.name} was unstaged: {NidaqState(self.state.get())}")
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
@@ -561,6 +583,12 @@ class Nidaq(PSIDeviceBase, NidaqControl):
"""
if not self._check_if_scan_name_is_valid():
return None
if self.scan_info.msg.scan_name == "nidaq_continuous_scan":
logger.info(
f"Device {self.name} ready to be kicked off for nidaq_continuous_scan"
)
return None
def _wait_for_state():
return self.state.get() == NidaqState.KICKOFF
@@ -587,18 +615,47 @@ class Nidaq(PSIDeviceBase, NidaqControl):
"""
if not self._check_if_scan_name_is_valid():
return None
self.on_stop()
# TODO check if this wait can be removed. We are waiting in unstage anyways which will always be called afterwards
# Wait for device to be stopped
status = self.wait_for_condition(
condition=lambda: self.state.get() == NidaqState.STANDBY,
check_stopped=True,
timeout=self.timeout_wait_for_signal,
)
return status
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
def _check_state(self) -> bool:
while True:
if self.stopped is True:
raise NidaqError(f"Device {self.name} was stopped")
if self.state.get() == NidaqState.STANDBY:
return
# if time.time() > timeout_time:
# raise TimeoutError(f"Device {self.name} ran into timeout")
time.sleep(0.1)
if self.scan_info.msg.scan_name != "nidaq_continuous_scan":
self.on_stop()
timeout = self.timeout_wait_for_signal
status = self.wait_for_condition(
condition=lambda: self.state.get() == NidaqState.STANDBY,
check_stopped=True,
timeout=timeout,
)
else:
status = self.task_handler.submit_task(task=_check_state, task_args=(self,))
return status
def _progress_update(self, value, **kwargs) -> None:
"""Callback method to update the scan progress, runs a callback
to SUB_PROGRESS subscribers, i.e. BEC.
Args:
value (int) : current progress value
"""
scan_duration = self.scan_info.msg.scan_parameters.get("scan_duration", None)
if not isinstance(scan_duration, (int, float)):
return
value = scan_duration - value
max_value = scan_duration
self._run_subs(
sub_type=self.SUB_PROGRESS,
value=value,
max_value=max_value,
done=bool(value == max_value),
)
def on_stop(self) -> None:
"""Called when the device is stopped."""

View File

@@ -4,3 +4,7 @@ from .mono_bragg_scans import (
XASSimpleScan,
XASSimpleScanWithXRD,
)
from .nidaq_cont_scan import (
NIDAQContinuousScan,
)

View File

@@ -0,0 +1,91 @@
"""This module contains the scan class for the nidaq of the Debye beamline for use in continuous mode."""
import time
from typing import Literal
import numpy as np
from bec_lib.device import DeviceBase
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans import AsyncFlyScanBase
logger = bec_logger.logger
class NIDAQContinuousScan(AsyncFlyScanBase):
"""Class for the nidaq continuous scan (without mono)"""
scan_name = "nidaq_continuous_scan"
scan_type = "fly"
scan_report_hint = "device_progress"
required_kwargs = []
use_scan_progress_report = False
pre_move = False
gui_config = {
"Scan Parameters": ["scan_duration"],
"Data Compression" : ["compression"],
}
def __init__(
self,
scan_duration: float,
daq: DeviceBase = "nidaq",
compression: bool = False,
**kwargs,
):
""" The NIDAQ continuous scan is used to measure with the NIDAQ without moving the
monochromator or any other motor. The NIDAQ thus runs in continuous mode, with a
set scan_duration.
Args:
scan_duration (float): Duration of the scan.
daq (DeviceBase, optional): DAQ device to be used for the scan.
Defaults to "nidaq".
Examples:
>>> scans.nidaq_continuous_scan(scan_duration=10)
"""
super().__init__(**kwargs)
self.scan_duration = scan_duration
self.daq = daq
self.start_time = 0
self.primary_readout_cycle = 1
self.scan_parameters["scan_duration"] = scan_duration
self.scan_parameters["compression"] = compression
def update_readout_priority(self):
"""Ensure that NIDAQ is not monitored for any quick EXAFS."""
super().update_readout_priority()
self.readout_priority["async"].append("nidaq")
def prepare_positions(self):
"""Prepare the positions for the scan."""
yield None
def pre_scan(self):
"""Pre Scan action."""
self.start_time = time.time()
# Ensure parent class pre_scan actions to be called.
yield from super().pre_scan()
def scan_report_instructions(self):
"""
Return the instructions for the scan report.
"""
yield from self.stubs.scan_report_instruction({"device_progress": [self.daq]})
def scan_core(self):
"""Run the scan core.
Kickoff the acquisition of the NIDAQ wait for the completion of the scan.
"""
kickoff_status = yield from self.stubs.kickoff(device=self.daq)
kickoff_status.wait(timeout=5) # wait for proper kickoff of device
complete_status = yield from self.stubs.complete(device=self.daq, wait=False)
while not complete_status.done:
# Readout monitored devices
yield from self.stubs.read(group="monitored", point_id=self.point_id)
time.sleep(self.primary_readout_cycle)
self.point_id += 1
self.num_pos = self.point_id