refactor: refactore falcon __init__

This commit is contained in:
appel_c 2023-11-09 09:50:16 +01:00
parent 793796638a
commit 38db08c877

View File

@ -5,8 +5,11 @@ import time
from typing import List
from bec_lib.core.devicemanager import DeviceStatus
from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV, Component as Cpt, Device
from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV, Component as Cpt
from ophyd.mca import EpicsMCARecord
from ophyd import DetectorBase, Device
from ophyd import ADComponent as ADCpt
from bec_lib.core.file_utils import FileWriterMixin
from bec_lib.core import MessageEndpoints, BECMessage
@ -17,6 +20,8 @@ from ophyd_devices.utils import bec_utils
logger = bec_logger.logger
FALCON_MIN_READOUT = 3e-3
class FalconError(Exception):
"""Base class for exceptions in this module."""
@ -24,12 +29,19 @@ class FalconError(Exception):
pass
class FalconTimeoutError(Exception):
class FalconTimeoutError(FalconError):
"""Raised when the Falcon does not respond in time during unstage."""
pass
class DeviceClassInitError(FalconError):
"""Raised when initiation of the device class fails,
due to missing device manager or not started in sim_mode."""
pass
class DetectorState(int, enum.Enum):
"""Detector states for Falcon detector"""
@ -160,30 +172,46 @@ class FalconCsaxs(Device):
**kwargs,
)
if device_manager is None and not sim_mode:
raise FalconError("Add DeviceManager to initialization or init with sim_mode=True")
raise DeviceClassInitError(
f"No device manager for device: {name}, and not started sim_mode: {sim_mode}. Add DeviceManager to initialization or init with sim_mode=True"
)
self.sim_mode = sim_mode
self._stopped = False
self.name = name
self.wait_for_connection()
# Spin up connections for simulation or BEC mode
self.service_cfg = None
self.scaninfo = None
self.filewriter = None
self.readout_time_min = FALCON_MIN_READOUT
self._value_pixel_per_buffer = None
self.readout_time = None
self.wait_for_connection(all_signals=True)
if not sim_mode:
from bec_lib.core.bec_service import SERVICE_CONFIG
self._update_service_config()
self.device_manager = device_manager
self._producer = self.device_manager.producer
self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"]
else:
base_path = f"/sls/X12SA/data/{self.scaninfo.username}/Data10/"
self._producer = bec_utils.MockProducer()
self.device_manager = bec_utils.MockDeviceManager()
self.scaninfo = BecScaninfoMixin(device_manager, sim_mode)
self.scaninfo.load_scan_metadata()
self.service_cfg = {"base_path": base_path}
self.scaninfo = BecScaninfoMixin(device_manager, sim_mode)
self.scaninfo.load_scan_metadata()
self.filewriter = FileWriterMixin(self.service_cfg)
self.device_manager = bec_utils.DMMock()
base_path = kwargs["basepath"] if "basepath" in kwargs else "~/Data10/"
self.service_cfg = {"base_path": os.path.expanduser(base_path)}
self._producer = self.device_manager.producer
self._update_scaninfo()
self._update_filewriter()
self._init()
def _update_filewriter(self) -> None:
"""Update filewriter with service config"""
self.filewriter = FileWriterMixin(self.service_cfg)
def _update_scaninfo(self) -> None:
"""Update scaninfo from BecScaninfoMixing
This depends on device manager and operation/sim_mode
"""
self.scaninfo = BecScaninfoMixin(self.device_manager, self.sim_mode)
self.scaninfo.load_scan_metadata()
def _update_service_config(self) -> None:
"""Update service config from BEC service config"""
self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"]
def _init(self) -> None:
"""Initialize detector, filewriter and set default parameters"""
self._default_parameter()