diff --git a/csaxs_bec/devices/epics/falcon_csaxs.py b/csaxs_bec/devices/epics/falcon_csaxs.py index 962eb9f..3c7bec0 100644 --- a/csaxs_bec/devices/epics/falcon_csaxs.py +++ b/csaxs_bec/devices/epics/falcon_csaxs.py @@ -1,15 +1,17 @@ +"""Falcon Sitoro detector class for cSAXS beamline.""" + import enum import os import threading +from typing import Literal +from bec_lib.file_utils import get_full_path from bec_lib.logger import bec_logger from ophyd import Component as Cpt -from ophyd import Device, EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV -from ophyd.mca import EpicsMCARecord -from ophyd_devices.interfaces.base_classes.psi_detector_base import ( - CustomDetectorMixin, - PSIDetectorBase, -) +from ophyd_devices import CompareStatus, FileEventSignal +from ophyd_devices.devices.areadetector.plugins import HDF5Plugin_V35 as HDF5Plugin +from ophyd_devices.devices.dxp import EpicsDXPFalcon, EpicsMCARecord, Falcon +from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase logger = bec_logger.logger @@ -18,15 +20,11 @@ class FalconError(Exception): """Base class for exceptions in this module.""" -class FalconTimeoutError(FalconError): - """Raised when the Falcon does not respond in time.""" - - -class DetectorState(enum.IntEnum): +class ACQUIRESTATUS(enum.IntEnum): """Detector states for Falcon detector""" DONE = 0 - ACQUIRING = 1 + ACQUIRING = 1 # or Capturing class TriggerSource(enum.IntEnum): @@ -44,238 +42,55 @@ class MappingSource(enum.IntEnum): MAPPING = 1 -class EpicsDXPFalcon(Device): - """ - DXP parameters for Falcon detector +class FalconControl(Falcon): + """Falcon Control class at cSAXS. prefix: 'X12SA-SITORO:'""" - Base class to map EPICS PVs from DXP parameters to ophyd signals. + dxp = Cpt(EpicsDXPFalcon, "dxp1:") + mca = Cpt(EpicsMCARecord, "mca1") + hdf5 = Cpt(HDF5Plugin, "HDF1:") + + +class FalconcSAXS(PSIDeviceBase, FalconControl): + """ + Falcon Sitoro detector for CSAXS + + + class attributes: + dxp (EpicsDXPFalcon) : DXP parameters for Falcon detector + mca (EpicsMCARecord) : MCA parameters for Falcon detector + hdf5 (FalconHDF5Plugins) : HDF5 parameters for Falcon detector + MIN_READOUT (float) : Minimum readout time for the detector """ - elapsed_live_time = Cpt(EpicsSignal, "ElapsedLiveTime") - elapsed_real_time = Cpt(EpicsSignal, "ElapsedRealTime") - elapsed_trigger_live_time = Cpt(EpicsSignal, "ElapsedTriggerLiveTime") + # specify minimum readout time for detector + MIN_READOUT = 3e-3 + _pv_timeout = 3 # Timeout for PV operations in seconds - # Energy Filter PVs - energy_threshold = Cpt(EpicsSignalWithRBV, "DetectionThreshold") - min_pulse_separation = Cpt(EpicsSignalWithRBV, "MinPulsePairSeparation") - detection_filter = Cpt(EpicsSignalWithRBV, "DetectionFilter", string=True) - scale_factor = Cpt(EpicsSignalWithRBV, "ScaleFactor") - risetime_optimisation = Cpt(EpicsSignalWithRBV, "RisetimeOptimization") - - # Misc PVs - detector_polarity = Cpt(EpicsSignalWithRBV, "DetectorPolarity") - decay_time = Cpt(EpicsSignalWithRBV, "DecayTime") - - current_pixel = Cpt(EpicsSignalRO, "CurrentPixel") - - -class FalconHDF5Plugins(Device): - """ - HDF5 parameters for Falcon detector - - Base class to map EPICS PVs from HDF5 Plugin to ophyd signals. - """ - - capture = Cpt(EpicsSignalWithRBV, "Capture") - enable = Cpt(EpicsSignalWithRBV, "EnableCallbacks", string=True, kind="config") - xml_file_name = Cpt(EpicsSignalWithRBV, "XMLFileName", string=True, kind="config") - lazy_open = Cpt(EpicsSignalWithRBV, "LazyOpen", string=True, doc="0='No' 1='Yes'") - temp_suffix = Cpt(EpicsSignalWithRBV, "TempSuffix", string=True) - file_path = Cpt(EpicsSignalWithRBV, "FilePath", string=True, kind="config") - file_name = Cpt(EpicsSignalWithRBV, "FileName", string=True, kind="config") - file_template = Cpt(EpicsSignalWithRBV, "FileTemplate", string=True, kind="config") - num_capture = Cpt(EpicsSignalWithRBV, "NumCapture", kind="config") - file_write_mode = Cpt(EpicsSignalWithRBV, "FileWriteMode", kind="config") - queue_size = Cpt(EpicsSignalWithRBV, "QueueSize", kind="config") - array_counter = Cpt(EpicsSignalWithRBV, "ArrayCounter", kind="config") - - -class FalconSetup(CustomDetectorMixin): - """ - Falcon setup class for cSAXS - - Parent class: CustomDetectorMixin - - """ - - def __init__(self, *args, parent: Device = None, **kwargs) -> None: - super().__init__(*args, parent=parent, **kwargs) - self._lock = threading.RLock() + file_event = Cpt(FileEventSignal, name="file_event") def on_init(self) -> None: - """Initialize Falcon detector""" - self.initialize_default_parameter() - self.initialize_detector() - self.initialize_detector_backend() + """Initialize Falcon Sitoro detector""" + self._lock = threading.RLock() + self._readout_time = self.MIN_READOUT + self._value_pixel_per_buffer = 20 + self._queue_size = 2000 + self._full_path = "" - def initialize_default_parameter(self) -> None: + def on_connected(self): """ - Set default parameters for Falcon - - This will set: - - readout (float): readout time in seconds - - value_pixel_per_buffer (int): number of spectra in buffer of Falcon Sitoro - + Setup Falcon Sitoro detector default parameters once signals are connected """ - self.parent.value_pixel_per_buffer = 20 - self.update_readout_time() - - def update_readout_time(self) -> None: - """Set readout time for Eiger9M detector""" - readout_time = ( - self.parent.scaninfo.readout_time - if hasattr(self.parent.scaninfo, "readout_time") - else self.parent.MIN_READOUT - ) - self.parent.readout_time = max(readout_time, self.parent.MIN_READOUT) - - def initialize_detector(self) -> None: - """Initialize Falcon detector""" - self.stop_detector() - self.stop_detector_backend() + self._initialize_detector() + self._initialize_detector_backend() self.set_trigger( mapping_mode=MappingSource.MAPPING, trigger_source=TriggerSource.GATE, ignore_gate=0 ) - # 1 Realtime - self.parent.preset_mode.put(1) - # 0 Normal, 1 Inverted - self.parent.input_logic_polarity.put(0) - # 0 Manual 1 Auto - self.parent.auto_pixels_per_buffer.put(0) - # Sets the number of pixels/spectra in the buffer - self.parent.pixels_per_buffer.put(self.parent.value_pixel_per_buffer) - - def initialize_detector_backend(self) -> None: - """Initialize the detector backend for Falcon.""" - self.parent.hdf5.enable.put(1) - # file location of h5 layout for cSAXS - self.parent.hdf5.xml_file_name.put("layout.xml") - # TODO Check if lazy open is needed and wanted! - self.parent.hdf5.lazy_open.put(1) - self.parent.hdf5.temp_suffix.put("") - # size of queue for number of spectra allowed in the buffer, if too small at high throughput, data is lost - self.parent.hdf5.queue_size.put(2000) - # Segmentation into Spectra within EPICS, 1 is activate, 0 is deactivate - self.parent.nd_array_mode.put(1) - - def on_stage(self) -> None: - """Prepare detector and backend for acquisition""" - self.prepare_detector() - self.prepare_data_backend() - self.publish_file_location(done=False, successful=False) - self.arm_acquisition() - - def prepare_detector(self) -> None: - """Prepare detector for acquisition""" - self.set_trigger( - mapping_mode=MappingSource.MAPPING, trigger_source=TriggerSource.GATE, ignore_gate=0 - ) - self.parent.preset_real.put(self.parent.scaninfo.exp_time) - self.parent.pixels_per_run.put( - int(self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger) - ) - - def prepare_data_backend(self) -> None: - """Prepare data backend for acquisition""" - self.parent.filepath.set( - self.parent.filewriter.compile_full_filename(f"{self.parent.name}.h5") - ).wait() - file_path, file_name = os.path.split(self.parent.filepath.get()) - self.parent.hdf5.file_path.put(file_path) - self.parent.hdf5.file_name.put(file_name) - self.parent.hdf5.file_template.put("%s%s") - self.parent.hdf5.num_capture.put( - int(self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger) - ) - self.parent.hdf5.file_write_mode.put(2) - # Reset spectrum counter in filewriter, used for indexing & identifying missing triggers - self.parent.hdf5.array_counter.put(0) - # Start file writing - self.parent.hdf5.capture.put(1) - - def arm_acquisition(self) -> None: - """Arm detector for acquisition""" - self.parent.start_all.put(1) - signal_conditions = [ - ( - lambda: self.parent.state.read()[self.parent.state.name]["value"], - DetectorState.ACQUIRING, - ) - ] - if not self.wait_for_signals( - signal_conditions=signal_conditions, - timeout=self.parent.TIMEOUT_FOR_SIGNALS, - check_stopped=True, - all_signals=False, - ): - raise FalconTimeoutError( - f"Failed to arm the acquisition. Detector state {signal_conditions[0][0]}" - ) - - def on_unstage(self) -> None: - """Unstage detector and backend""" - pass - - def on_complete(self) -> None: - """Complete detector and backend""" - self.finished(timeout=self.parent.TIMEOUT_FOR_SIGNALS) - self.publish_file_location(done=True, successful=True) - - def on_stop(self) -> None: - """Stop detector and backend""" - self.stop_detector() - self.stop_detector_backend() - - def stop_detector(self) -> None: - """Stops detector""" - - self.parent.stop_all.put(1) - self.parent.erase_all.put(1) - - signal_conditions = [ - (lambda: self.parent.state.read()[self.parent.state.name]["value"], DetectorState.DONE) - ] - - if not self.wait_for_signals( - signal_conditions=signal_conditions, - timeout=self.parent.TIMEOUT_FOR_SIGNALS - self.parent.TIMEOUT_FOR_SIGNALS // 2, - all_signals=False, - ): - # Retry stop detector and wait for remaining time - raise FalconTimeoutError( - f"Failed to stop detector, timeout with state {signal_conditions[0][0]}" - ) - - def stop_detector_backend(self) -> None: - """Stop the detector backend""" - self.parent.hdf5.capture.put(0) - - def finished(self, timeout: int = 5) -> None: - """Check if scan finished succesfully""" - with self._lock: - total_frames = int( - self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger - ) - signal_conditions = [ - (self.parent.dxp.current_pixel.get, total_frames), - (self.parent.hdf5.array_counter.get, total_frames), - ] - if not self.wait_for_signals( - signal_conditions=signal_conditions, - timeout=timeout, - check_stopped=True, - all_signals=True, - ): - logger.debug( - f"Falcon missed a trigger: received trigger {self.parent.dxp.current_pixel.get()}," - f" send data {self.parent.hdf5.array_counter.get()} from total_frames" - f" {total_frames}" - ) - self.stop_detector() - self.stop_detector_backend() def set_trigger( - self, mapping_mode: MappingSource, trigger_source: TriggerSource, ignore_gate: int = 0 + self, + mapping_mode: MappingSource, + trigger_source: TriggerSource, + ignore_gate: Literal[0, 1] = 0, ) -> None: """ Set triggering mode for detector @@ -287,63 +102,144 @@ class FalconSetup(CustomDetectorMixin): """ mapping = int(mapping_mode) - trigger = trigger_source - self.parent.collect_mode.put(mapping) - self.parent.pixel_advance_mode.put(trigger) - self.parent.ignore_gate.put(ignore_gate) + trigger = int(trigger_source) + self.collect_mode.put(mapping) + self.pixel_advance_mode.put(trigger) + self.ignore_gate.put(ignore_gate) + def _initialize_detector(self) -> None: + """Initialize Falcon detector""" + self.stop_detector() + self.stop_detector_backend() + self.set_trigger( + mapping_mode=MappingSource.MAPPING, trigger_source=TriggerSource.GATE, ignore_gate=0 + ) -class FalconcSAXS(PSIDetectorBase): - """ - Falcon Sitoro detector for CSAXS + # 1 Realtime + self.preset_mode.put(1) - Parent class: PSIDetectorBase + # 0 Normal, 1 Inverted + self.input_logic_polarity.put(0) - class attributes: - custom_prepare_cls (FalconSetup) : Custom detector setup class for cSAXS, - inherits from CustomDetectorMixin - PSIDetectorBase.set_min_readout (float) : Minimum readout time for the detector - dxp (EpicsDXPFalcon) : DXP parameters for Falcon detector - mca (EpicsMCARecord) : MCA parameters for Falcon detector - hdf5 (FalconHDF5Plugins) : HDF5 parameters for Falcon detector - MIN_READOUT (float) : Minimum readout time for the detector - """ + # 0 Manual 1 Auto + self.auto_pixels_per_buffer.put(0) - # Specify which functions are revealed to the user in BEC client - USER_ACCESS = ["describe"] + # Sets the number of pixels/spectra in the buffer + self.pixels_per_buffer.put(self._value_pixel_per_buffer) - # specify Setup class - custom_prepare_cls = FalconSetup - # specify minimum readout time for detector - MIN_READOUT = 3e-3 - TIMEOUT_FOR_SIGNALS = 5 + def _initialize_detector_backend(self) -> None: + """Initialize the detector backend for Falcon.""" + # Enable HDF5 plugin + self.hdf5.enable.put(1) - # specify class attributes - dxp = Cpt(EpicsDXPFalcon, "dxp1:") - mca = Cpt(EpicsMCARecord, "mca1") - hdf5 = Cpt(FalconHDF5Plugins, "HDF1:") + # Use layout.xml file for cSAXS Falcon. FIXME:Should be checked if IOC runs on different host. + self.hdf5.xml_file_name.put("layout.xml") - stop_all = Cpt(EpicsSignal, "StopAll") - erase_all = Cpt(EpicsSignal, "EraseAll") - start_all = Cpt(EpicsSignal, "StartAll") - state = Cpt(EpicsSignal, "Acquiring") - preset_mode = Cpt(EpicsSignal, "PresetMode") # 0 No preset 1 Real time 2 Events 3 Triggers - preset_real = Cpt(EpicsSignal, "PresetReal") - preset_events = Cpt(EpicsSignal, "PresetEvents") - preset_triggers = Cpt(EpicsSignal, "PresetTriggers") - triggers = Cpt(EpicsSignalRO, "MaxTriggers", lazy=True) - events = Cpt(EpicsSignalRO, "MaxEvents", lazy=True) - input_count_rate = Cpt(EpicsSignalRO, "MaxInputCountRate", lazy=True) - output_count_rate = Cpt(EpicsSignalRO, "MaxOutputCountRate", lazy=True) - collect_mode = Cpt(EpicsSignal, "CollectMode") # 0 MCA spectra, 1 MCA mapping - pixel_advance_mode = Cpt(EpicsSignal, "PixelAdvanceMode") - ignore_gate = Cpt(EpicsSignal, "IgnoreGate") - input_logic_polarity = Cpt(EpicsSignal, "InputLogicPolarity") - auto_pixels_per_buffer = Cpt(EpicsSignal, "AutoPixelsPerBuffer") - pixels_per_buffer = Cpt(EpicsSignal, "PixelsPerBuffer") - pixels_per_run = Cpt(EpicsSignal, "PixelsPerRun") - nd_array_mode = Cpt(EpicsSignal, "NDArrayMode") + # TODO Check if lazy open is needed and wanted! + self.hdf5.lazy_open.put(1) + self.hdf5.temp_suffix.put("") + + # Size of the queue for the number of spectra allowed in the buffer. If too small, data is lost at high throughput + self.hdf5.queue_size.put(self._queue_size) + + # Set nd_array mode to 1: This means segmentation into Spectra within EPICS, 1 is activate, 0 is deactivate + self.nd_array_mode.put(1) + + def on_stage(self): + """ + This method is called when the detector is staged for acquisition. + We use the information in scan_info.msg about the upcoming scan to set all relevant parameters on the detector. + """ + # Calculate relevant parameters + num_points = self.scan_info.msg.num_points + frames_per_trigger = self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1) + overall_frames = int(num_points * frames_per_trigger) + exp_time = self.scan_info.exp_time + self._full_path = get_full_path(self.scan_info.msg, self.name) + + # Check that exposure time is larger than readout time + readout_time = max( + self.scan_info.msg.scan_parameters.get("readout_time", self.MIN_READOUT), + self.MIN_READOUT, + ) + if exp_time < readout_time: + raise ValueError( + f"Exposure time {exp_time} is less than minimum readout time {readout_time}" + ) + + # TODO: Add h5_entries for linking the Falcon NEXUS entries with the master file + self.file_event.put(file_path=self._full_path, done=False, successful=False) + + self.preset_real.put(exp_time) + self.pixels_per_run.put(overall_frames) + + # Prepare detector backend PVs + file_path, file_name = os.path.split(self._full_path) + self.hdf5.file_path.put(file_path) + self.hdf5.file_name.put(file_name) + self.hdf5.file_template.put("%s%s") + self.hdf5.num_capture.put(overall_frames) + self.hdf5.file_write_mode.put(2) + # Reset spectrum counter in filewriter, used for indexing & identifying missing triggers + self.hdf5.array_counter.put(0) + + # Start file writing + self.hdf5.capture.put(1) + # Start the acquisition + self.start_all.put(1) + + def on_pre_scan(self): + """ + Method for actions just before the scan starts. + """ + status_camera = CompareStatus( + self.cam.acquire_busy, ACQUIRESTATUS.DONE, timeout=self._pv_timeout + ) + status_writer = CompareStatus( + self.hdf.capture, ACQUIRESTATUS.ACQUIRING, timeout=self._pv_timeout + ) + # Logical combine of statuses + status = status_camera & status_writer + self.cancel_on_stop(status) + return status + + def _complete_callback(self, status: CompareStatus) -> None: + """Callback for when the device completes a scan.""" + # FIXME Add proper h5 entries once checked + if status.success: + self.file_event.put( + file_path=self._full_path, # pylint: disable:protected-access + done=True, + successful=True, + ) + else: + self.file_event.put( + file_path=self._full_path, # pylint: disable:protected-access + done=True, + successful=False, + ) + + def on_complete(self) -> None: + """Complete detector and backend""" + # Calculate relevant parameters + num_points = self.scan_info.msg.num_points + frames_per_trigger = self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1) + overall_frames = int(num_points * frames_per_trigger) + + status_detector = CompareStatus(self.dxp.current_pixel, overall_frames, run=True) + status_backend = CompareStatus(self.hdf5.array_counter, overall_frames, run=True) + + status = status_detector & status_backend + self.cancel_on_stop(status) + status.add_callback(self._complete_callback) + return status + + def on_stop(self) -> None: + """Stop detector and backend""" + self.stop_all.put(1) + self.hdf5.capture.put(0) + self.erase_all.put(1) if __name__ == "__main__": - falcon = FalconcSAXS(name="falcon", prefix="X12SA-SITORO:", sim_mode=True) + falcon = FalconcSAXS(name="falcon", prefix="X12SA-SITORO:") diff --git a/tests/tests_devices/test_falcon_csaxs.py b/tests/tests_devices/test_falcon_csaxs.py index cc1f537..18a7b8a 100644 --- a/tests/tests_devices/test_falcon_csaxs.py +++ b/tests/tests_devices/test_falcon_csaxs.py @@ -1,6 +1,7 @@ # pylint: skip-file import os import threading +from typing import Generator from unittest import mock import ophyd @@ -8,80 +9,74 @@ import pytest from bec_lib import messages from bec_lib.endpoints import MessageEndpoints from bec_server.device_server.tests.utils import DMMock -from ophyd_devices.tests.utils import MockPV +from ophyd_devices.tests.utils import patched_device -from csaxs_bec.devices.epics.falcon_csaxs import FalconcSAXS, FalconTimeoutError -from csaxs_bec.devices.tests_utils.utils import patch_dual_pvs +from csaxs_bec.devices.epics.falcon_csaxs import FalconcSAXS @pytest.fixture(scope="function") -def mock_det(): - name = "falcon" - prefix = "X12SA-SITORO:" +def mock_det() -> Generator[FalconcSAXS, None, None]: + """Fixture to mock the FalconcSAXS device.""" + name = "mcs_csaxs" + prefix = "X12SA-MCS-CSAXS:" dm = DMMock() - with mock.patch.object(dm, "connector"): - with ( - mock.patch( - "ophyd_devices.interfaces.base_classes.bec_device_base.FileWriter" - ) as filemixin, - mock.patch( - "ophyd_devices.interfaces.base_classes.psi_detector_base.PSIDetectorBase._update_service_config" - ) as mock_service_config, - ): - with mock.patch.object(ophyd, "cl") as mock_cl: - mock_cl.get_pv = MockPV - mock_cl.thread_class = threading.Thread - with mock.patch.object(FalconcSAXS, "_init"): - det = FalconcSAXS(name=name, prefix=prefix, device_manager=dm) - patch_dual_pvs(det) - det.TIMEOUT_FOR_SIGNALS = 0.1 - yield det + with patched_device( + FalconcSAXS, + name="falcon", + prefix="X12SA-SITORO:", + device_manager=dm, + _mock_pv_initial_value=0, + ) as dev: + try: + yield dev + finally: + dev.destroy() -@pytest.mark.parametrize( - "trigger_source, mapping_source, ignore_gate, pixels_per_buffer, detector_state," - " expected_exception", - [(1, 1, 0, 20, 0, False), (1, 1, 0, 20, 1, True)], -) -# TODO rewrite this one, write test for init_detector, init_filewriter is tested -def test_init_detector( - mock_det, - trigger_source, - mapping_source, - ignore_gate, - pixels_per_buffer, - detector_state, - expected_exception, -): - """Test the _init function: +# @pytest.mark.parametrize( +# "trigger_source, mapping_source, ignore_gate, pixels_per_buffer, detector_state," +# " expected_exception", +# [(1, 1, 0, 20, 0, False), (1, 1, 0, 20, 1, True)], +# ) +# # TODO rewrite this one, write test for init_detector, init_filewriter is tested +# def test_init_detector( +# mock_det, +# trigger_source, +# mapping_source, +# ignore_gate, +# pixels_per_buffer, +# detector_state, +# expected_exception, +# ): +# """Test the _init function: - This includes testing the functions: - - _init_detector - - _stop_det - - _set_trigger - --> Testing the filewriter is done in test_init_filewriter +# This includes testing the functions: +# - _init_detector +# - _stop_det +# - _set_trigger +# --> Testing the filewriter is done in test_init_filewriter - Validation upon setting the correct PVs +# Validation upon setting the correct PVs - """ - mock_det.value_pixel_per_buffer = pixels_per_buffer - mock_det.state._read_pv.mock_data = detector_state - if expected_exception: - with pytest.raises(FalconTimeoutError): - mock_det.timeout = 0.1 - mock_det.custom_prepare.initialize_detector() - else: - mock_det.custom_prepare.initialize_detector() - assert mock_det.state.get() == detector_state - assert mock_det.collect_mode.get() == mapping_source - assert mock_det.pixel_advance_mode.get() == trigger_source - assert mock_det.ignore_gate.get() == ignore_gate +# """ +# mock_det.value_pixel_per_buffer = pixels_per_buffer +# mock_det.state._read_pv.mock_data = detector_state +# if expected_exception: +# with pytest.raises(FalconTimeoutError): +# mock_det.timeout = 0.1 +# mock_det.custom_prepare.initialize_detector() +# else: +# mock_det.custom_prepare.initialize_detector() +# assert mock_det.state.get() == detector_state +# assert mock_det.collect_mode.get() == mapping_source +# assert mock_det.pixel_advance_mode.get() == trigger_source +# assert mock_det.ignore_gate.get() == ignore_gate - assert mock_det.preset_mode.get() == 1 - assert mock_det.erase_all.get() == 1 - assert mock_det.input_logic_polarity.get() == 0 - assert mock_det.auto_pixels_per_buffer.get() == 0 - assert mock_det.pixels_per_buffer.get() == pixels_per_buffer +# assert mock_det.preset_mode.get() == 1 +# assert mock_det.erase_all.get() == 1 +# assert mock_det.input_logic_polarity.get() == 0 +# assert mock_det.auto_pixels_per_buffer.get() == 0 +# assert mock_det.pixels_per_buffer.get() == pixels_per_buffer @pytest.mark.parametrize( @@ -89,12 +84,12 @@ def test_init_detector( ) def test_update_readout_time(mock_det, readout_time, expected_value): if readout_time is None: - mock_det.custom_prepare.update_readout_time() - assert mock_det.readout_time == expected_value + mock_det._update_readout_time() + assert mock_det._readout_time == expected_value else: - mock_det.scaninfo.readout_time = readout_time - mock_det.custom_prepare.update_readout_time() - assert mock_det.readout_time == expected_value + mock_det.scan_info.readout_time = readout_time + mock_det._update_readout_time() + assert mock_det._readout_time == expected_value def test_initialize_default_parameter(mock_det): @@ -109,17 +104,15 @@ def test_initialize_default_parameter(mock_det): @pytest.mark.parametrize( "scaninfo", [ - ( - { - "eacc": "e12345", - "num_points": 500, - "frames_per_trigger": 1, - "exp_time": 0.1, - "filepath": "test.h5", - "scan_id": "123", - "mokev": 12.4, - } - ) + { + "eacc": "e12345", + "num_points": 500, + "frames_per_trigger": 1, + "exp_time": 0.1, + "filepath": "test.h5", + "scan_id": "123", + "mokev": 12.4, + } ], ) def test_stage(mock_det, scaninfo):