diff --git a/debye_bec/scans/__init__.py b/debye_bec/scans/__init__.py index bdd703c..d59f41e 100644 --- a/debye_bec/scans/__init__.py +++ b/debye_bec/scans/__init__.py @@ -1,4 +1,3 @@ -# from .nidaq_cont_scan import NIDAQContinuousScan from .nidaq_continuous_scan import NidaqContinuousScan from .xas_simple_scan import ( XasAdvancedScan, diff --git a/debye_bec/scans/nidaq_cont_scan.py b/debye_bec/scans/nidaq_cont_scan.py deleted file mode 100644 index e36e4b3..0000000 --- a/debye_bec/scans/nidaq_cont_scan.py +++ /dev/null @@ -1,84 +0,0 @@ -"""This module contains the scan class for the nidaq of the Debye beamline for use in continuous mode.""" - -import time -from typing import Literal - -import numpy as np -from bec_lib.device import DeviceBase -from bec_lib.logger import bec_logger -from bec_server.scan_server.scans import AsyncFlyScanBase - -logger = bec_logger.logger - - -class NIDAQContinuousScan(AsyncFlyScanBase): - """Class for the nidaq continuous scan (without mono)""" - - scan_name = "nidaq_continuous_scan" - scan_type = "fly" - scan_report_hint = "device_progress" - required_kwargs = [] - use_scan_progress_report = False - pre_move = False - gui_config = {"Scan Parameters": ["scan_duration"], "Data Compression": ["compression"]} - - def __init__( - self, scan_duration: float, daq: DeviceBase = "nidaq", compression: bool = False, **kwargs - ): - """The NIDAQ continuous scan is used to measure with the NIDAQ without moving the - monochromator or any other motor. The NIDAQ thus runs in continuous mode, with a - set scan_duration. - - Args: - scan_duration (float): Duration of the scan. - daq (DeviceBase, optional): DAQ device to be used for the scan. - Defaults to "nidaq". - Examples: - >>> scans.nidaq_continuous_scan(scan_duration=10) - """ - super().__init__(**kwargs) - self.scan_duration = scan_duration - self.daq = daq - self.start_time = 0 - self.monitored_readout_cycle = 1 - self.scan_parameters["scan_duration"] = scan_duration - self.scan_parameters["compression"] = compression - - def update_readout_priority(self): - """Ensure that NIDAQ is not monitored for any quick EXAFS.""" - super().update_readout_priority() - self.readout_priority["async"].append("nidaq") - - def prepare_positions(self): - """Prepare the positions for the scan.""" - yield None - - def pre_scan(self): - """Pre Scan action.""" - - self.start_time = time.time() - # Ensure parent class pre_scan actions to be called. - yield from super().pre_scan() - - def scan_report_instructions(self): - """ - Return the instructions for the scan report. - """ - yield from self.stubs.scan_report_instruction({"device_progress": [self.daq]}) - - def scan_core(self): - """Run the scan core. - Kickoff the acquisition of the NIDAQ wait for the completion of the scan. - """ - kickoff_status = yield from self.stubs.kickoff(device=self.daq) - kickoff_status.wait(timeout=5) # wait for proper kickoff of device - - complete_status = yield from self.stubs.complete(device=self.daq, wait=False) - - while not complete_status.done: - # Readout monitored devices - yield from self.stubs.read(group="monitored", point_id=self.point_id) - time.sleep(self.monitored_readout_cycle) - self.point_id += 1 - - self.num_pos = self.point_id