wip falcon
This commit is contained in:
@@ -1,15 +1,17 @@
|
||||
"""Falcon Sitoro detector class for cSAXS beamline."""
|
||||
|
||||
import enum
|
||||
import os
|
||||
import threading
|
||||
from typing import Literal
|
||||
|
||||
from bec_lib.file_utils import get_full_path
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import Device, EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
|
||||
from ophyd.mca import EpicsMCARecord
|
||||
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
|
||||
CustomDetectorMixin,
|
||||
PSIDetectorBase,
|
||||
)
|
||||
from ophyd_devices import CompareStatus, FileEventSignal
|
||||
from ophyd_devices.devices.areadetector.plugins import HDF5Plugin_V35 as HDF5Plugin
|
||||
from ophyd_devices.devices.dxp import EpicsDXPFalcon, EpicsMCARecord, Falcon
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
@@ -18,15 +20,11 @@ class FalconError(Exception):
|
||||
"""Base class for exceptions in this module."""
|
||||
|
||||
|
||||
class FalconTimeoutError(FalconError):
|
||||
"""Raised when the Falcon does not respond in time."""
|
||||
|
||||
|
||||
class DetectorState(enum.IntEnum):
|
||||
class ACQUIRESTATUS(enum.IntEnum):
|
||||
"""Detector states for Falcon detector"""
|
||||
|
||||
DONE = 0
|
||||
ACQUIRING = 1
|
||||
ACQUIRING = 1 # or Capturing
|
||||
|
||||
|
||||
class TriggerSource(enum.IntEnum):
|
||||
@@ -44,238 +42,55 @@ class MappingSource(enum.IntEnum):
|
||||
MAPPING = 1
|
||||
|
||||
|
||||
class EpicsDXPFalcon(Device):
|
||||
"""
|
||||
DXP parameters for Falcon detector
|
||||
class FalconControl(Falcon):
|
||||
"""Falcon Control class at cSAXS. prefix: 'X12SA-SITORO:'"""
|
||||
|
||||
Base class to map EPICS PVs from DXP parameters to ophyd signals.
|
||||
dxp = Cpt(EpicsDXPFalcon, "dxp1:")
|
||||
mca = Cpt(EpicsMCARecord, "mca1")
|
||||
hdf5 = Cpt(HDF5Plugin, "HDF1:")
|
||||
|
||||
|
||||
class FalconcSAXS(PSIDeviceBase, FalconControl):
|
||||
"""
|
||||
Falcon Sitoro detector for CSAXS
|
||||
|
||||
|
||||
class attributes:
|
||||
dxp (EpicsDXPFalcon) : DXP parameters for Falcon detector
|
||||
mca (EpicsMCARecord) : MCA parameters for Falcon detector
|
||||
hdf5 (FalconHDF5Plugins) : HDF5 parameters for Falcon detector
|
||||
MIN_READOUT (float) : Minimum readout time for the detector
|
||||
"""
|
||||
|
||||
elapsed_live_time = Cpt(EpicsSignal, "ElapsedLiveTime")
|
||||
elapsed_real_time = Cpt(EpicsSignal, "ElapsedRealTime")
|
||||
elapsed_trigger_live_time = Cpt(EpicsSignal, "ElapsedTriggerLiveTime")
|
||||
# specify minimum readout time for detector
|
||||
MIN_READOUT = 3e-3
|
||||
_pv_timeout = 3 # Timeout for PV operations in seconds
|
||||
|
||||
# Energy Filter PVs
|
||||
energy_threshold = Cpt(EpicsSignalWithRBV, "DetectionThreshold")
|
||||
min_pulse_separation = Cpt(EpicsSignalWithRBV, "MinPulsePairSeparation")
|
||||
detection_filter = Cpt(EpicsSignalWithRBV, "DetectionFilter", string=True)
|
||||
scale_factor = Cpt(EpicsSignalWithRBV, "ScaleFactor")
|
||||
risetime_optimisation = Cpt(EpicsSignalWithRBV, "RisetimeOptimization")
|
||||
|
||||
# Misc PVs
|
||||
detector_polarity = Cpt(EpicsSignalWithRBV, "DetectorPolarity")
|
||||
decay_time = Cpt(EpicsSignalWithRBV, "DecayTime")
|
||||
|
||||
current_pixel = Cpt(EpicsSignalRO, "CurrentPixel")
|
||||
|
||||
|
||||
class FalconHDF5Plugins(Device):
|
||||
"""
|
||||
HDF5 parameters for Falcon detector
|
||||
|
||||
Base class to map EPICS PVs from HDF5 Plugin to ophyd signals.
|
||||
"""
|
||||
|
||||
capture = Cpt(EpicsSignalWithRBV, "Capture")
|
||||
enable = Cpt(EpicsSignalWithRBV, "EnableCallbacks", string=True, kind="config")
|
||||
xml_file_name = Cpt(EpicsSignalWithRBV, "XMLFileName", string=True, kind="config")
|
||||
lazy_open = Cpt(EpicsSignalWithRBV, "LazyOpen", string=True, doc="0='No' 1='Yes'")
|
||||
temp_suffix = Cpt(EpicsSignalWithRBV, "TempSuffix", string=True)
|
||||
file_path = Cpt(EpicsSignalWithRBV, "FilePath", string=True, kind="config")
|
||||
file_name = Cpt(EpicsSignalWithRBV, "FileName", string=True, kind="config")
|
||||
file_template = Cpt(EpicsSignalWithRBV, "FileTemplate", string=True, kind="config")
|
||||
num_capture = Cpt(EpicsSignalWithRBV, "NumCapture", kind="config")
|
||||
file_write_mode = Cpt(EpicsSignalWithRBV, "FileWriteMode", kind="config")
|
||||
queue_size = Cpt(EpicsSignalWithRBV, "QueueSize", kind="config")
|
||||
array_counter = Cpt(EpicsSignalWithRBV, "ArrayCounter", kind="config")
|
||||
|
||||
|
||||
class FalconSetup(CustomDetectorMixin):
|
||||
"""
|
||||
Falcon setup class for cSAXS
|
||||
|
||||
Parent class: CustomDetectorMixin
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, *args, parent: Device = None, **kwargs) -> None:
|
||||
super().__init__(*args, parent=parent, **kwargs)
|
||||
self._lock = threading.RLock()
|
||||
file_event = Cpt(FileEventSignal, name="file_event")
|
||||
|
||||
def on_init(self) -> None:
|
||||
"""Initialize Falcon detector"""
|
||||
self.initialize_default_parameter()
|
||||
self.initialize_detector()
|
||||
self.initialize_detector_backend()
|
||||
"""Initialize Falcon Sitoro detector"""
|
||||
self._lock = threading.RLock()
|
||||
self._readout_time = self.MIN_READOUT
|
||||
self._value_pixel_per_buffer = 20
|
||||
self._queue_size = 2000
|
||||
self._full_path = ""
|
||||
|
||||
def initialize_default_parameter(self) -> None:
|
||||
def on_connected(self):
|
||||
"""
|
||||
Set default parameters for Falcon
|
||||
|
||||
This will set:
|
||||
- readout (float): readout time in seconds
|
||||
- value_pixel_per_buffer (int): number of spectra in buffer of Falcon Sitoro
|
||||
|
||||
Setup Falcon Sitoro detector default parameters once signals are connected
|
||||
"""
|
||||
self.parent.value_pixel_per_buffer = 20
|
||||
self.update_readout_time()
|
||||
|
||||
def update_readout_time(self) -> None:
|
||||
"""Set readout time for Eiger9M detector"""
|
||||
readout_time = (
|
||||
self.parent.scaninfo.readout_time
|
||||
if hasattr(self.parent.scaninfo, "readout_time")
|
||||
else self.parent.MIN_READOUT
|
||||
)
|
||||
self.parent.readout_time = max(readout_time, self.parent.MIN_READOUT)
|
||||
|
||||
def initialize_detector(self) -> None:
|
||||
"""Initialize Falcon detector"""
|
||||
self.stop_detector()
|
||||
self.stop_detector_backend()
|
||||
self._initialize_detector()
|
||||
self._initialize_detector_backend()
|
||||
self.set_trigger(
|
||||
mapping_mode=MappingSource.MAPPING, trigger_source=TriggerSource.GATE, ignore_gate=0
|
||||
)
|
||||
# 1 Realtime
|
||||
self.parent.preset_mode.put(1)
|
||||
# 0 Normal, 1 Inverted
|
||||
self.parent.input_logic_polarity.put(0)
|
||||
# 0 Manual 1 Auto
|
||||
self.parent.auto_pixels_per_buffer.put(0)
|
||||
# Sets the number of pixels/spectra in the buffer
|
||||
self.parent.pixels_per_buffer.put(self.parent.value_pixel_per_buffer)
|
||||
|
||||
def initialize_detector_backend(self) -> None:
|
||||
"""Initialize the detector backend for Falcon."""
|
||||
self.parent.hdf5.enable.put(1)
|
||||
# file location of h5 layout for cSAXS
|
||||
self.parent.hdf5.xml_file_name.put("layout.xml")
|
||||
# TODO Check if lazy open is needed and wanted!
|
||||
self.parent.hdf5.lazy_open.put(1)
|
||||
self.parent.hdf5.temp_suffix.put("")
|
||||
# size of queue for number of spectra allowed in the buffer, if too small at high throughput, data is lost
|
||||
self.parent.hdf5.queue_size.put(2000)
|
||||
# Segmentation into Spectra within EPICS, 1 is activate, 0 is deactivate
|
||||
self.parent.nd_array_mode.put(1)
|
||||
|
||||
def on_stage(self) -> None:
|
||||
"""Prepare detector and backend for acquisition"""
|
||||
self.prepare_detector()
|
||||
self.prepare_data_backend()
|
||||
self.publish_file_location(done=False, successful=False)
|
||||
self.arm_acquisition()
|
||||
|
||||
def prepare_detector(self) -> None:
|
||||
"""Prepare detector for acquisition"""
|
||||
self.set_trigger(
|
||||
mapping_mode=MappingSource.MAPPING, trigger_source=TriggerSource.GATE, ignore_gate=0
|
||||
)
|
||||
self.parent.preset_real.put(self.parent.scaninfo.exp_time)
|
||||
self.parent.pixels_per_run.put(
|
||||
int(self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger)
|
||||
)
|
||||
|
||||
def prepare_data_backend(self) -> None:
|
||||
"""Prepare data backend for acquisition"""
|
||||
self.parent.filepath.set(
|
||||
self.parent.filewriter.compile_full_filename(f"{self.parent.name}.h5")
|
||||
).wait()
|
||||
file_path, file_name = os.path.split(self.parent.filepath.get())
|
||||
self.parent.hdf5.file_path.put(file_path)
|
||||
self.parent.hdf5.file_name.put(file_name)
|
||||
self.parent.hdf5.file_template.put("%s%s")
|
||||
self.parent.hdf5.num_capture.put(
|
||||
int(self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger)
|
||||
)
|
||||
self.parent.hdf5.file_write_mode.put(2)
|
||||
# Reset spectrum counter in filewriter, used for indexing & identifying missing triggers
|
||||
self.parent.hdf5.array_counter.put(0)
|
||||
# Start file writing
|
||||
self.parent.hdf5.capture.put(1)
|
||||
|
||||
def arm_acquisition(self) -> None:
|
||||
"""Arm detector for acquisition"""
|
||||
self.parent.start_all.put(1)
|
||||
signal_conditions = [
|
||||
(
|
||||
lambda: self.parent.state.read()[self.parent.state.name]["value"],
|
||||
DetectorState.ACQUIRING,
|
||||
)
|
||||
]
|
||||
if not self.wait_for_signals(
|
||||
signal_conditions=signal_conditions,
|
||||
timeout=self.parent.TIMEOUT_FOR_SIGNALS,
|
||||
check_stopped=True,
|
||||
all_signals=False,
|
||||
):
|
||||
raise FalconTimeoutError(
|
||||
f"Failed to arm the acquisition. Detector state {signal_conditions[0][0]}"
|
||||
)
|
||||
|
||||
def on_unstage(self) -> None:
|
||||
"""Unstage detector and backend"""
|
||||
pass
|
||||
|
||||
def on_complete(self) -> None:
|
||||
"""Complete detector and backend"""
|
||||
self.finished(timeout=self.parent.TIMEOUT_FOR_SIGNALS)
|
||||
self.publish_file_location(done=True, successful=True)
|
||||
|
||||
def on_stop(self) -> None:
|
||||
"""Stop detector and backend"""
|
||||
self.stop_detector()
|
||||
self.stop_detector_backend()
|
||||
|
||||
def stop_detector(self) -> None:
|
||||
"""Stops detector"""
|
||||
|
||||
self.parent.stop_all.put(1)
|
||||
self.parent.erase_all.put(1)
|
||||
|
||||
signal_conditions = [
|
||||
(lambda: self.parent.state.read()[self.parent.state.name]["value"], DetectorState.DONE)
|
||||
]
|
||||
|
||||
if not self.wait_for_signals(
|
||||
signal_conditions=signal_conditions,
|
||||
timeout=self.parent.TIMEOUT_FOR_SIGNALS - self.parent.TIMEOUT_FOR_SIGNALS // 2,
|
||||
all_signals=False,
|
||||
):
|
||||
# Retry stop detector and wait for remaining time
|
||||
raise FalconTimeoutError(
|
||||
f"Failed to stop detector, timeout with state {signal_conditions[0][0]}"
|
||||
)
|
||||
|
||||
def stop_detector_backend(self) -> None:
|
||||
"""Stop the detector backend"""
|
||||
self.parent.hdf5.capture.put(0)
|
||||
|
||||
def finished(self, timeout: int = 5) -> None:
|
||||
"""Check if scan finished succesfully"""
|
||||
with self._lock:
|
||||
total_frames = int(
|
||||
self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger
|
||||
)
|
||||
signal_conditions = [
|
||||
(self.parent.dxp.current_pixel.get, total_frames),
|
||||
(self.parent.hdf5.array_counter.get, total_frames),
|
||||
]
|
||||
if not self.wait_for_signals(
|
||||
signal_conditions=signal_conditions,
|
||||
timeout=timeout,
|
||||
check_stopped=True,
|
||||
all_signals=True,
|
||||
):
|
||||
logger.debug(
|
||||
f"Falcon missed a trigger: received trigger {self.parent.dxp.current_pixel.get()},"
|
||||
f" send data {self.parent.hdf5.array_counter.get()} from total_frames"
|
||||
f" {total_frames}"
|
||||
)
|
||||
self.stop_detector()
|
||||
self.stop_detector_backend()
|
||||
|
||||
def set_trigger(
|
||||
self, mapping_mode: MappingSource, trigger_source: TriggerSource, ignore_gate: int = 0
|
||||
self,
|
||||
mapping_mode: MappingSource,
|
||||
trigger_source: TriggerSource,
|
||||
ignore_gate: Literal[0, 1] = 0,
|
||||
) -> None:
|
||||
"""
|
||||
Set triggering mode for detector
|
||||
@@ -287,63 +102,144 @@ class FalconSetup(CustomDetectorMixin):
|
||||
|
||||
"""
|
||||
mapping = int(mapping_mode)
|
||||
trigger = trigger_source
|
||||
self.parent.collect_mode.put(mapping)
|
||||
self.parent.pixel_advance_mode.put(trigger)
|
||||
self.parent.ignore_gate.put(ignore_gate)
|
||||
trigger = int(trigger_source)
|
||||
self.collect_mode.put(mapping)
|
||||
self.pixel_advance_mode.put(trigger)
|
||||
self.ignore_gate.put(ignore_gate)
|
||||
|
||||
def _initialize_detector(self) -> None:
|
||||
"""Initialize Falcon detector"""
|
||||
self.stop_detector()
|
||||
self.stop_detector_backend()
|
||||
self.set_trigger(
|
||||
mapping_mode=MappingSource.MAPPING, trigger_source=TriggerSource.GATE, ignore_gate=0
|
||||
)
|
||||
|
||||
class FalconcSAXS(PSIDetectorBase):
|
||||
"""
|
||||
Falcon Sitoro detector for CSAXS
|
||||
# 1 Realtime
|
||||
self.preset_mode.put(1)
|
||||
|
||||
Parent class: PSIDetectorBase
|
||||
# 0 Normal, 1 Inverted
|
||||
self.input_logic_polarity.put(0)
|
||||
|
||||
class attributes:
|
||||
custom_prepare_cls (FalconSetup) : Custom detector setup class for cSAXS,
|
||||
inherits from CustomDetectorMixin
|
||||
PSIDetectorBase.set_min_readout (float) : Minimum readout time for the detector
|
||||
dxp (EpicsDXPFalcon) : DXP parameters for Falcon detector
|
||||
mca (EpicsMCARecord) : MCA parameters for Falcon detector
|
||||
hdf5 (FalconHDF5Plugins) : HDF5 parameters for Falcon detector
|
||||
MIN_READOUT (float) : Minimum readout time for the detector
|
||||
"""
|
||||
# 0 Manual 1 Auto
|
||||
self.auto_pixels_per_buffer.put(0)
|
||||
|
||||
# Specify which functions are revealed to the user in BEC client
|
||||
USER_ACCESS = ["describe"]
|
||||
# Sets the number of pixels/spectra in the buffer
|
||||
self.pixels_per_buffer.put(self._value_pixel_per_buffer)
|
||||
|
||||
# specify Setup class
|
||||
custom_prepare_cls = FalconSetup
|
||||
# specify minimum readout time for detector
|
||||
MIN_READOUT = 3e-3
|
||||
TIMEOUT_FOR_SIGNALS = 5
|
||||
def _initialize_detector_backend(self) -> None:
|
||||
"""Initialize the detector backend for Falcon."""
|
||||
# Enable HDF5 plugin
|
||||
self.hdf5.enable.put(1)
|
||||
|
||||
# specify class attributes
|
||||
dxp = Cpt(EpicsDXPFalcon, "dxp1:")
|
||||
mca = Cpt(EpicsMCARecord, "mca1")
|
||||
hdf5 = Cpt(FalconHDF5Plugins, "HDF1:")
|
||||
# Use layout.xml file for cSAXS Falcon. FIXME:Should be checked if IOC runs on different host.
|
||||
self.hdf5.xml_file_name.put("layout.xml")
|
||||
|
||||
stop_all = Cpt(EpicsSignal, "StopAll")
|
||||
erase_all = Cpt(EpicsSignal, "EraseAll")
|
||||
start_all = Cpt(EpicsSignal, "StartAll")
|
||||
state = Cpt(EpicsSignal, "Acquiring")
|
||||
preset_mode = Cpt(EpicsSignal, "PresetMode") # 0 No preset 1 Real time 2 Events 3 Triggers
|
||||
preset_real = Cpt(EpicsSignal, "PresetReal")
|
||||
preset_events = Cpt(EpicsSignal, "PresetEvents")
|
||||
preset_triggers = Cpt(EpicsSignal, "PresetTriggers")
|
||||
triggers = Cpt(EpicsSignalRO, "MaxTriggers", lazy=True)
|
||||
events = Cpt(EpicsSignalRO, "MaxEvents", lazy=True)
|
||||
input_count_rate = Cpt(EpicsSignalRO, "MaxInputCountRate", lazy=True)
|
||||
output_count_rate = Cpt(EpicsSignalRO, "MaxOutputCountRate", lazy=True)
|
||||
collect_mode = Cpt(EpicsSignal, "CollectMode") # 0 MCA spectra, 1 MCA mapping
|
||||
pixel_advance_mode = Cpt(EpicsSignal, "PixelAdvanceMode")
|
||||
ignore_gate = Cpt(EpicsSignal, "IgnoreGate")
|
||||
input_logic_polarity = Cpt(EpicsSignal, "InputLogicPolarity")
|
||||
auto_pixels_per_buffer = Cpt(EpicsSignal, "AutoPixelsPerBuffer")
|
||||
pixels_per_buffer = Cpt(EpicsSignal, "PixelsPerBuffer")
|
||||
pixels_per_run = Cpt(EpicsSignal, "PixelsPerRun")
|
||||
nd_array_mode = Cpt(EpicsSignal, "NDArrayMode")
|
||||
# TODO Check if lazy open is needed and wanted!
|
||||
self.hdf5.lazy_open.put(1)
|
||||
self.hdf5.temp_suffix.put("")
|
||||
|
||||
# Size of the queue for the number of spectra allowed in the buffer. If too small, data is lost at high throughput
|
||||
self.hdf5.queue_size.put(self._queue_size)
|
||||
|
||||
# Set nd_array mode to 1: This means segmentation into Spectra within EPICS, 1 is activate, 0 is deactivate
|
||||
self.nd_array_mode.put(1)
|
||||
|
||||
def on_stage(self):
|
||||
"""
|
||||
This method is called when the detector is staged for acquisition.
|
||||
We use the information in scan_info.msg about the upcoming scan to set all relevant parameters on the detector.
|
||||
"""
|
||||
# Calculate relevant parameters
|
||||
num_points = self.scan_info.msg.num_points
|
||||
frames_per_trigger = self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1)
|
||||
overall_frames = int(num_points * frames_per_trigger)
|
||||
exp_time = self.scan_info.exp_time
|
||||
self._full_path = get_full_path(self.scan_info.msg, self.name)
|
||||
|
||||
# Check that exposure time is larger than readout time
|
||||
readout_time = max(
|
||||
self.scan_info.msg.scan_parameters.get("readout_time", self.MIN_READOUT),
|
||||
self.MIN_READOUT,
|
||||
)
|
||||
if exp_time < readout_time:
|
||||
raise ValueError(
|
||||
f"Exposure time {exp_time} is less than minimum readout time {readout_time}"
|
||||
)
|
||||
|
||||
# TODO: Add h5_entries for linking the Falcon NEXUS entries with the master file
|
||||
self.file_event.put(file_path=self._full_path, done=False, successful=False)
|
||||
|
||||
self.preset_real.put(exp_time)
|
||||
self.pixels_per_run.put(overall_frames)
|
||||
|
||||
# Prepare detector backend PVs
|
||||
file_path, file_name = os.path.split(self._full_path)
|
||||
self.hdf5.file_path.put(file_path)
|
||||
self.hdf5.file_name.put(file_name)
|
||||
self.hdf5.file_template.put("%s%s")
|
||||
self.hdf5.num_capture.put(overall_frames)
|
||||
self.hdf5.file_write_mode.put(2)
|
||||
# Reset spectrum counter in filewriter, used for indexing & identifying missing triggers
|
||||
self.hdf5.array_counter.put(0)
|
||||
|
||||
# Start file writing
|
||||
self.hdf5.capture.put(1)
|
||||
# Start the acquisition
|
||||
self.start_all.put(1)
|
||||
|
||||
def on_pre_scan(self):
|
||||
"""
|
||||
Method for actions just before the scan starts.
|
||||
"""
|
||||
status_camera = CompareStatus(
|
||||
self.cam.acquire_busy, ACQUIRESTATUS.DONE, timeout=self._pv_timeout
|
||||
)
|
||||
status_writer = CompareStatus(
|
||||
self.hdf.capture, ACQUIRESTATUS.ACQUIRING, timeout=self._pv_timeout
|
||||
)
|
||||
# Logical combine of statuses
|
||||
status = status_camera & status_writer
|
||||
self.cancel_on_stop(status)
|
||||
return status
|
||||
|
||||
def _complete_callback(self, status: CompareStatus) -> None:
|
||||
"""Callback for when the device completes a scan."""
|
||||
# FIXME Add proper h5 entries once checked
|
||||
if status.success:
|
||||
self.file_event.put(
|
||||
file_path=self._full_path, # pylint: disable:protected-access
|
||||
done=True,
|
||||
successful=True,
|
||||
)
|
||||
else:
|
||||
self.file_event.put(
|
||||
file_path=self._full_path, # pylint: disable:protected-access
|
||||
done=True,
|
||||
successful=False,
|
||||
)
|
||||
|
||||
def on_complete(self) -> None:
|
||||
"""Complete detector and backend"""
|
||||
# Calculate relevant parameters
|
||||
num_points = self.scan_info.msg.num_points
|
||||
frames_per_trigger = self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1)
|
||||
overall_frames = int(num_points * frames_per_trigger)
|
||||
|
||||
status_detector = CompareStatus(self.dxp.current_pixel, overall_frames, run=True)
|
||||
status_backend = CompareStatus(self.hdf5.array_counter, overall_frames, run=True)
|
||||
|
||||
status = status_detector & status_backend
|
||||
self.cancel_on_stop(status)
|
||||
status.add_callback(self._complete_callback)
|
||||
return status
|
||||
|
||||
def on_stop(self) -> None:
|
||||
"""Stop detector and backend"""
|
||||
self.stop_all.put(1)
|
||||
self.hdf5.capture.put(0)
|
||||
self.erase_all.put(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
falcon = FalconcSAXS(name="falcon", prefix="X12SA-SITORO:", sim_mode=True)
|
||||
falcon = FalconcSAXS(name="falcon", prefix="X12SA-SITORO:")
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
# pylint: skip-file
|
||||
import os
|
||||
import threading
|
||||
from typing import Generator
|
||||
from unittest import mock
|
||||
|
||||
import ophyd
|
||||
@@ -8,80 +9,74 @@ import pytest
|
||||
from bec_lib import messages
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_server.device_server.tests.utils import DMMock
|
||||
from ophyd_devices.tests.utils import MockPV
|
||||
from ophyd_devices.tests.utils import patched_device
|
||||
|
||||
from csaxs_bec.devices.epics.falcon_csaxs import FalconcSAXS, FalconTimeoutError
|
||||
from csaxs_bec.devices.tests_utils.utils import patch_dual_pvs
|
||||
from csaxs_bec.devices.epics.falcon_csaxs import FalconcSAXS
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def mock_det():
|
||||
name = "falcon"
|
||||
prefix = "X12SA-SITORO:"
|
||||
def mock_det() -> Generator[FalconcSAXS, None, None]:
|
||||
"""Fixture to mock the FalconcSAXS device."""
|
||||
name = "mcs_csaxs"
|
||||
prefix = "X12SA-MCS-CSAXS:"
|
||||
dm = DMMock()
|
||||
with mock.patch.object(dm, "connector"):
|
||||
with (
|
||||
mock.patch(
|
||||
"ophyd_devices.interfaces.base_classes.bec_device_base.FileWriter"
|
||||
) as filemixin,
|
||||
mock.patch(
|
||||
"ophyd_devices.interfaces.base_classes.psi_detector_base.PSIDetectorBase._update_service_config"
|
||||
) as mock_service_config,
|
||||
):
|
||||
with mock.patch.object(ophyd, "cl") as mock_cl:
|
||||
mock_cl.get_pv = MockPV
|
||||
mock_cl.thread_class = threading.Thread
|
||||
with mock.patch.object(FalconcSAXS, "_init"):
|
||||
det = FalconcSAXS(name=name, prefix=prefix, device_manager=dm)
|
||||
patch_dual_pvs(det)
|
||||
det.TIMEOUT_FOR_SIGNALS = 0.1
|
||||
yield det
|
||||
with patched_device(
|
||||
FalconcSAXS,
|
||||
name="falcon",
|
||||
prefix="X12SA-SITORO:",
|
||||
device_manager=dm,
|
||||
_mock_pv_initial_value=0,
|
||||
) as dev:
|
||||
try:
|
||||
yield dev
|
||||
finally:
|
||||
dev.destroy()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"trigger_source, mapping_source, ignore_gate, pixels_per_buffer, detector_state,"
|
||||
" expected_exception",
|
||||
[(1, 1, 0, 20, 0, False), (1, 1, 0, 20, 1, True)],
|
||||
)
|
||||
# TODO rewrite this one, write test for init_detector, init_filewriter is tested
|
||||
def test_init_detector(
|
||||
mock_det,
|
||||
trigger_source,
|
||||
mapping_source,
|
||||
ignore_gate,
|
||||
pixels_per_buffer,
|
||||
detector_state,
|
||||
expected_exception,
|
||||
):
|
||||
"""Test the _init function:
|
||||
# @pytest.mark.parametrize(
|
||||
# "trigger_source, mapping_source, ignore_gate, pixels_per_buffer, detector_state,"
|
||||
# " expected_exception",
|
||||
# [(1, 1, 0, 20, 0, False), (1, 1, 0, 20, 1, True)],
|
||||
# )
|
||||
# # TODO rewrite this one, write test for init_detector, init_filewriter is tested
|
||||
# def test_init_detector(
|
||||
# mock_det,
|
||||
# trigger_source,
|
||||
# mapping_source,
|
||||
# ignore_gate,
|
||||
# pixels_per_buffer,
|
||||
# detector_state,
|
||||
# expected_exception,
|
||||
# ):
|
||||
# """Test the _init function:
|
||||
|
||||
This includes testing the functions:
|
||||
- _init_detector
|
||||
- _stop_det
|
||||
- _set_trigger
|
||||
--> Testing the filewriter is done in test_init_filewriter
|
||||
# This includes testing the functions:
|
||||
# - _init_detector
|
||||
# - _stop_det
|
||||
# - _set_trigger
|
||||
# --> Testing the filewriter is done in test_init_filewriter
|
||||
|
||||
Validation upon setting the correct PVs
|
||||
# Validation upon setting the correct PVs
|
||||
|
||||
"""
|
||||
mock_det.value_pixel_per_buffer = pixels_per_buffer
|
||||
mock_det.state._read_pv.mock_data = detector_state
|
||||
if expected_exception:
|
||||
with pytest.raises(FalconTimeoutError):
|
||||
mock_det.timeout = 0.1
|
||||
mock_det.custom_prepare.initialize_detector()
|
||||
else:
|
||||
mock_det.custom_prepare.initialize_detector()
|
||||
assert mock_det.state.get() == detector_state
|
||||
assert mock_det.collect_mode.get() == mapping_source
|
||||
assert mock_det.pixel_advance_mode.get() == trigger_source
|
||||
assert mock_det.ignore_gate.get() == ignore_gate
|
||||
# """
|
||||
# mock_det.value_pixel_per_buffer = pixels_per_buffer
|
||||
# mock_det.state._read_pv.mock_data = detector_state
|
||||
# if expected_exception:
|
||||
# with pytest.raises(FalconTimeoutError):
|
||||
# mock_det.timeout = 0.1
|
||||
# mock_det.custom_prepare.initialize_detector()
|
||||
# else:
|
||||
# mock_det.custom_prepare.initialize_detector()
|
||||
# assert mock_det.state.get() == detector_state
|
||||
# assert mock_det.collect_mode.get() == mapping_source
|
||||
# assert mock_det.pixel_advance_mode.get() == trigger_source
|
||||
# assert mock_det.ignore_gate.get() == ignore_gate
|
||||
|
||||
assert mock_det.preset_mode.get() == 1
|
||||
assert mock_det.erase_all.get() == 1
|
||||
assert mock_det.input_logic_polarity.get() == 0
|
||||
assert mock_det.auto_pixels_per_buffer.get() == 0
|
||||
assert mock_det.pixels_per_buffer.get() == pixels_per_buffer
|
||||
# assert mock_det.preset_mode.get() == 1
|
||||
# assert mock_det.erase_all.get() == 1
|
||||
# assert mock_det.input_logic_polarity.get() == 0
|
||||
# assert mock_det.auto_pixels_per_buffer.get() == 0
|
||||
# assert mock_det.pixels_per_buffer.get() == pixels_per_buffer
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@@ -89,12 +84,12 @@ def test_init_detector(
|
||||
)
|
||||
def test_update_readout_time(mock_det, readout_time, expected_value):
|
||||
if readout_time is None:
|
||||
mock_det.custom_prepare.update_readout_time()
|
||||
assert mock_det.readout_time == expected_value
|
||||
mock_det._update_readout_time()
|
||||
assert mock_det._readout_time == expected_value
|
||||
else:
|
||||
mock_det.scaninfo.readout_time = readout_time
|
||||
mock_det.custom_prepare.update_readout_time()
|
||||
assert mock_det.readout_time == expected_value
|
||||
mock_det.scan_info.readout_time = readout_time
|
||||
mock_det._update_readout_time()
|
||||
assert mock_det._readout_time == expected_value
|
||||
|
||||
|
||||
def test_initialize_default_parameter(mock_det):
|
||||
@@ -109,17 +104,15 @@ def test_initialize_default_parameter(mock_det):
|
||||
@pytest.mark.parametrize(
|
||||
"scaninfo",
|
||||
[
|
||||
(
|
||||
{
|
||||
"eacc": "e12345",
|
||||
"num_points": 500,
|
||||
"frames_per_trigger": 1,
|
||||
"exp_time": 0.1,
|
||||
"filepath": "test.h5",
|
||||
"scan_id": "123",
|
||||
"mokev": 12.4,
|
||||
}
|
||||
)
|
||||
{
|
||||
"eacc": "e12345",
|
||||
"num_points": 500,
|
||||
"frames_per_trigger": 1,
|
||||
"exp_time": 0.1,
|
||||
"filepath": "test.h5",
|
||||
"scan_id": "123",
|
||||
"mokev": 12.4,
|
||||
}
|
||||
],
|
||||
)
|
||||
def test_stage(mock_det, scaninfo):
|
||||
|
||||
Reference in New Issue
Block a user