diff --git a/ophyd_devices/epics/devices/falcon_csaxs.py b/ophyd_devices/epics/devices/falcon_csaxs.py index 4a5bd1b..510759e 100644 --- a/ophyd_devices/epics/devices/falcon_csaxs.py +++ b/ophyd_devices/epics/devices/falcon_csaxs.py @@ -5,8 +5,11 @@ import time from typing import List from bec_lib.core.devicemanager import DeviceStatus -from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV, Component as Cpt, Device +from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV, Component as Cpt from ophyd.mca import EpicsMCARecord +from ophyd import DetectorBase, Device +from ophyd import ADComponent as ADCpt + from bec_lib.core.file_utils import FileWriterMixin from bec_lib.core import MessageEndpoints, BECMessage @@ -17,6 +20,8 @@ from ophyd_devices.utils import bec_utils logger = bec_logger.logger +FALCON_MIN_READOUT = 3e-3 + class FalconError(Exception): """Base class for exceptions in this module.""" @@ -24,12 +29,19 @@ class FalconError(Exception): pass -class FalconTimeoutError(Exception): +class FalconTimeoutError(FalconError): """Raised when the Falcon does not respond in time during unstage.""" pass +class DeviceClassInitError(FalconError): + """Raised when initiation of the device class fails, + due to missing device manager or not started in sim_mode.""" + + pass + + class DetectorState(int, enum.Enum): """Detector states for Falcon detector""" @@ -160,30 +172,46 @@ class FalconCsaxs(Device): **kwargs, ) if device_manager is None and not sim_mode: - raise FalconError("Add DeviceManager to initialization or init with sim_mode=True") + raise DeviceClassInitError( + f"No device manager for device: {name}, and not started sim_mode: {sim_mode}. Add DeviceManager to initialization or init with sim_mode=True" + ) + self.sim_mode = sim_mode self._stopped = False self.name = name - self.wait_for_connection() - # Spin up connections for simulation or BEC mode + self.service_cfg = None + self.scaninfo = None + self.filewriter = None + self.readout_time_min = FALCON_MIN_READOUT + self._value_pixel_per_buffer = None + self.readout_time = None + self.wait_for_connection(all_signals=True) if not sim_mode: - from bec_lib.core.bec_service import SERVICE_CONFIG - + self._update_service_config() self.device_manager = device_manager - self._producer = self.device_manager.producer - self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"] else: - base_path = f"/sls/X12SA/data/{self.scaninfo.username}/Data10/" - self._producer = bec_utils.MockProducer() - self.device_manager = bec_utils.MockDeviceManager() - self.scaninfo = BecScaninfoMixin(device_manager, sim_mode) - self.scaninfo.load_scan_metadata() - self.service_cfg = {"base_path": base_path} - - self.scaninfo = BecScaninfoMixin(device_manager, sim_mode) - self.scaninfo.load_scan_metadata() - self.filewriter = FileWriterMixin(self.service_cfg) + self.device_manager = bec_utils.DMMock() + base_path = kwargs["basepath"] if "basepath" in kwargs else "~/Data10/" + self.service_cfg = {"base_path": os.path.expanduser(base_path)} + self._producer = self.device_manager.producer + self._update_scaninfo() + self._update_filewriter() self._init() + def _update_filewriter(self) -> None: + """Update filewriter with service config""" + self.filewriter = FileWriterMixin(self.service_cfg) + + def _update_scaninfo(self) -> None: + """Update scaninfo from BecScaninfoMixing + This depends on device manager and operation/sim_mode + """ + self.scaninfo = BecScaninfoMixin(self.device_manager, self.sim_mode) + self.scaninfo.load_scan_metadata() + + def _update_service_config(self) -> None: + """Update service config from BEC service config""" + self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"] + def _init(self) -> None: """Initialize detector, filewriter and set default parameters""" self._default_parameter()