mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2025-06-24 11:41:09 +02:00
refactor: add documentation, clean up init function and unify classes
This commit is contained in:
@ -83,6 +83,7 @@ class Eiger9mCsaxs(DetectorBase):
|
|||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
# Specify which functions are revealed to the user in BEC client
|
||||||
USER_ACCESS = [
|
USER_ACCESS = [
|
||||||
"describe",
|
"describe",
|
||||||
]
|
]
|
||||||
@ -102,6 +103,18 @@ class Eiger9mCsaxs(DetectorBase):
|
|||||||
sim_mode=False,
|
sim_mode=False,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
):
|
):
|
||||||
|
"""Initialize the Eiger9M detector
|
||||||
|
Args:
|
||||||
|
#TODO add here the parameters for kind, read_attrs, configuration_attrs, parent
|
||||||
|
prefix (str): PV prefix (X12SA-ES-EIGER9M:)
|
||||||
|
name (str): 'eiger'
|
||||||
|
kind (str):
|
||||||
|
read_attrs (list):
|
||||||
|
configuration_attrs (list):
|
||||||
|
parent (object):
|
||||||
|
device_manager (object): BEC device manager
|
||||||
|
sim_mode (bool): simulation mode to start the detector without BEC, e.g. from ipython shell
|
||||||
|
"""
|
||||||
super().__init__(
|
super().__init__(
|
||||||
prefix=prefix,
|
prefix=prefix,
|
||||||
name=name,
|
name=name,
|
||||||
@ -111,13 +124,16 @@ class Eiger9mCsaxs(DetectorBase):
|
|||||||
parent=parent,
|
parent=parent,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
)
|
)
|
||||||
self._stopped = False
|
|
||||||
self._lock = threading.RLock()
|
|
||||||
if device_manager is None and not sim_mode:
|
if device_manager is None and not sim_mode:
|
||||||
raise EigerError("Add DeviceManager to initialization or init with sim_mode=True")
|
raise EigerError("Add DeviceManager to initialization or init with sim_mode=True")
|
||||||
|
|
||||||
|
# Not sure if this is needed, comment it for now!
|
||||||
|
#self._lock = threading.RLock()
|
||||||
|
self._stopped = False
|
||||||
self.name = name
|
self.name = name
|
||||||
self.wait_for_connection() # Make sure to be connected before talking to PVs
|
self.wait_for_connection()
|
||||||
|
# Spin up connections for simulation or BEC mode
|
||||||
|
# TODO check if sim_mode still works. Is it needed? I believe filewriting might be handled properly
|
||||||
if not sim_mode:
|
if not sim_mode:
|
||||||
from bec_lib.core.bec_service import SERVICE_CONFIG
|
from bec_lib.core.bec_service import SERVICE_CONFIG
|
||||||
|
|
||||||
@ -125,17 +141,17 @@ class Eiger9mCsaxs(DetectorBase):
|
|||||||
self._producer = self.device_manager.producer
|
self._producer = self.device_manager.producer
|
||||||
self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"]
|
self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"]
|
||||||
else:
|
else:
|
||||||
|
base_path = f"/sls/X12SA/data/{self.scaninfo.username}/Data10/"
|
||||||
self._producer = bec_utils.MockProducer()
|
self._producer = bec_utils.MockProducer()
|
||||||
self.device_manager = bec_utils.MockDeviceManager()
|
self.device_manager = bec_utils.MockDeviceManager()
|
||||||
self.scaninfo = BecScaninfoMixin(device_manager, sim_mode)
|
self.scaninfo = BecScaninfoMixin(device_manager, sim_mode)
|
||||||
self.scaninfo.load_scan_metadata()
|
self.scaninfo.load_scan_metadata()
|
||||||
self.service_cfg = {"base_path": f"/sls/X12SA/data/{self.scaninfo.username}/Data10/"}
|
self.service_cfg = {"base_path": base_path}
|
||||||
|
|
||||||
self.scaninfo = BecScaninfoMixin(device_manager, sim_mode)
|
self.scaninfo = BecScaninfoMixin(device_manager, sim_mode)
|
||||||
self.scaninfo.load_scan_metadata()
|
self.scaninfo.load_scan_metadata()
|
||||||
# TODO
|
|
||||||
self.filepath = ""
|
|
||||||
|
|
||||||
self.filewriter = FileWriterMixin(self.service_cfg)
|
self.filewriter = FileWriterMixin(self.service_cfg)
|
||||||
|
|
||||||
self.reduce_readout = 1e-3 # 3 ms
|
self.reduce_readout = 1e-3 # 3 ms
|
||||||
self.triggermode = 0 # 0 : internal, scan must set this if hardware triggered
|
self.triggermode = 0 # 0 : internal, scan must set this if hardware triggered
|
||||||
self._init_eiger9m()
|
self._init_eiger9m()
|
||||||
|
@ -77,29 +77,40 @@ class FalconHDF5Plugins(Device):
|
|||||||
|
|
||||||
|
|
||||||
class FalconCsaxs(Device):
|
class FalconCsaxs(Device):
|
||||||
"""FalxonX1 with HDF5 writer"""
|
"""Falcon Sitoro detector for CSAXS
|
||||||
|
|
||||||
|
Parent class: Device
|
||||||
|
Device classes: EpicsDXPFalcon dxp1:, EpicsMCARecord mca1, FalconHDF5Plugins HDF1:
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
name str: 'falcon'
|
||||||
|
prefix (str): PV prefix ("X12SA-SITORO:)
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Specify which functions are revealed to the user in BEC client
|
||||||
|
USER_ACCESS = [
|
||||||
|
"describe",
|
||||||
|
]
|
||||||
|
|
||||||
dxp = Cpt(EpicsDXPFalcon, "dxp1:")
|
dxp = Cpt(EpicsDXPFalcon, "dxp1:")
|
||||||
mca = Cpt(EpicsMCARecord, "mca1")
|
mca = Cpt(EpicsMCARecord, "mca1")
|
||||||
hdf5 = Cpt(FalconHDF5Plugins, "HDF1:")
|
hdf5 = Cpt(FalconHDF5Plugins, "HDF1:")
|
||||||
|
|
||||||
# Control
|
# specify Epics PVs for Falcon
|
||||||
|
# TODO consider moving this outside of this class!
|
||||||
stop_all = Cpt(EpicsSignal, "StopAll")
|
stop_all = Cpt(EpicsSignal, "StopAll")
|
||||||
erase_all = Cpt(EpicsSignal, "EraseAll")
|
erase_all = Cpt(EpicsSignal, "EraseAll")
|
||||||
start_all = Cpt(EpicsSignal, "StartAll")
|
start_all = Cpt(EpicsSignal, "StartAll")
|
||||||
state = Cpt(EpicsSignal, "Acquiring")
|
state = Cpt(EpicsSignal, "Acquiring")
|
||||||
# Preset options
|
|
||||||
preset_mode = Cpt(EpicsSignal, "PresetMode") # 0 No preset 1 Real time 2 Events 3 Triggers
|
preset_mode = Cpt(EpicsSignal, "PresetMode") # 0 No preset 1 Real time 2 Events 3 Triggers
|
||||||
preset_real = Cpt(EpicsSignal, "PresetReal")
|
preset_real = Cpt(EpicsSignal, "PresetReal")
|
||||||
preset_events = Cpt(EpicsSignal, "PresetEvents")
|
preset_events = Cpt(EpicsSignal, "PresetEvents")
|
||||||
preset_triggers = Cpt(EpicsSignal, "PresetTriggers")
|
preset_triggers = Cpt(EpicsSignal, "PresetTriggers")
|
||||||
# read-only diagnostics
|
|
||||||
triggers = Cpt(EpicsSignalRO, "MaxTriggers", lazy=True)
|
triggers = Cpt(EpicsSignalRO, "MaxTriggers", lazy=True)
|
||||||
events = Cpt(EpicsSignalRO, "MaxEvents", lazy=True)
|
events = Cpt(EpicsSignalRO, "MaxEvents", lazy=True)
|
||||||
input_count_rate = Cpt(EpicsSignalRO, "MaxInputCountRate", lazy=True)
|
input_count_rate = Cpt(EpicsSignalRO, "MaxInputCountRate", lazy=True)
|
||||||
output_count_rate = Cpt(EpicsSignalRO, "MaxOutputCountRate", lazy=True)
|
output_count_rate = Cpt(EpicsSignalRO, "MaxOutputCountRate", lazy=True)
|
||||||
|
|
||||||
# Mapping control
|
|
||||||
collect_mode = Cpt(EpicsSignal, "CollectMode") # 0 MCA spectra, 1 MCA mapping
|
collect_mode = Cpt(EpicsSignal, "CollectMode") # 0 MCA spectra, 1 MCA mapping
|
||||||
pixel_advance_mode = Cpt(EpicsSignal, "PixelAdvanceMode")
|
pixel_advance_mode = Cpt(EpicsSignal, "PixelAdvanceMode")
|
||||||
ignore_gate = Cpt(EpicsSignal, "IgnoreGate")
|
ignore_gate = Cpt(EpicsSignal, "IgnoreGate")
|
||||||
@ -109,8 +120,6 @@ class FalconCsaxs(Device):
|
|||||||
pixels_per_run = Cpt(EpicsSignal, "PixelsPerRun")
|
pixels_per_run = Cpt(EpicsSignal, "PixelsPerRun")
|
||||||
nd_array_mode = Cpt(EpicsSignal, "NDArrayMode")
|
nd_array_mode = Cpt(EpicsSignal, "NDArrayMode")
|
||||||
|
|
||||||
# HDF5
|
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
prefix="",
|
prefix="",
|
||||||
@ -124,6 +133,18 @@ class FalconCsaxs(Device):
|
|||||||
sim_mode=False,
|
sim_mode=False,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
):
|
):
|
||||||
|
"""Initialize Falcon detector
|
||||||
|
Args:
|
||||||
|
#TODO add here the parameters for kind, read_attrs, configuration_attrs, parent
|
||||||
|
prefix (str): PV prefix ("X12SA-SITORO:)
|
||||||
|
name (str): 'falcon'
|
||||||
|
kind (str):
|
||||||
|
read_attrs (list):
|
||||||
|
configuration_attrs (list):
|
||||||
|
parent (object):
|
||||||
|
device_manager (object): BEC device manager
|
||||||
|
sim_mode (bool): simulation mode to start the detector without BEC, e.g. from ipython shell
|
||||||
|
"""
|
||||||
super().__init__(
|
super().__init__(
|
||||||
prefix=prefix,
|
prefix=prefix,
|
||||||
name=name,
|
name=name,
|
||||||
@ -137,7 +158,8 @@ class FalconCsaxs(Device):
|
|||||||
raise FalconError("Add DeviceManager to initialization or init with sim_mode=True")
|
raise FalconError("Add DeviceManager to initialization or init with sim_mode=True")
|
||||||
self._stopped = False
|
self._stopped = False
|
||||||
self.name = name
|
self.name = name
|
||||||
self.wait_for_connection() # Make sure to be connected before talking to PVs
|
self.wait_for_connection()
|
||||||
|
# Spin up connections for simulation or BEC mode
|
||||||
if not sim_mode:
|
if not sim_mode:
|
||||||
from bec_lib.core.bec_service import SERVICE_CONFIG
|
from bec_lib.core.bec_service import SERVICE_CONFIG
|
||||||
|
|
||||||
@ -145,15 +167,21 @@ class FalconCsaxs(Device):
|
|||||||
self._producer = self.device_manager.producer
|
self._producer = self.device_manager.producer
|
||||||
self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"]
|
self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"]
|
||||||
else:
|
else:
|
||||||
|
base_path = f"/sls/X12SA/data/{self.scaninfo.username}/Data10/"
|
||||||
self._producer = bec_utils.MockProducer()
|
self._producer = bec_utils.MockProducer()
|
||||||
self.device_manager = bec_utils.MockDeviceManager()
|
self.device_manager = bec_utils.MockDeviceManager()
|
||||||
self.scaninfo = BecScaninfoMixin(device_manager, sim_mode)
|
self.scaninfo = BecScaninfoMixin(device_manager, sim_mode)
|
||||||
self.scaninfo.load_scan_metadata()
|
self.scaninfo.load_scan_metadata()
|
||||||
self.service_cfg = {"base_path": f"/sls/X12SA/data/{self.scaninfo.username}/Data10/"}
|
self.service_cfg = {"base_path": base_path}
|
||||||
|
|
||||||
self.scaninfo = BecScaninfoMixin(device_manager, sim_mode)
|
self.scaninfo = BecScaninfoMixin(device_manager, sim_mode)
|
||||||
self.scaninfo.load_scan_metadata()
|
self.scaninfo.load_scan_metadata()
|
||||||
self.filewriter = FileWriterMixin(self.service_cfg)
|
self.filewriter = FileWriterMixin(self.service_cfg)
|
||||||
|
self._init_detector()
|
||||||
|
|
||||||
|
def _init_detector(self) -> None:
|
||||||
|
"""Set up detector parameters, init detector, init filewriter
|
||||||
|
"""
|
||||||
self.readout = 0.003 # 3 ms
|
self.readout = 0.003 # 3 ms
|
||||||
self._value_pixel_per_buffer = 20 # 16
|
self._value_pixel_per_buffer = 20 # 16
|
||||||
self._clean_up()
|
self._clean_up()
|
||||||
@ -162,6 +190,7 @@ class FalconCsaxs(Device):
|
|||||||
|
|
||||||
def _clean_up(self) -> None:
|
def _clean_up(self) -> None:
|
||||||
"""Clean up"""
|
"""Clean up"""
|
||||||
|
#TODO clarify when to use put and when to use set!
|
||||||
self.hdf5.capture.put(0)
|
self.hdf5.capture.put(0)
|
||||||
self.stop_all.put(1)
|
self.stop_all.put(1)
|
||||||
self.erase_all.put(1)
|
self.erase_all.put(1)
|
||||||
|
@ -78,6 +78,11 @@ class PilatusCsaxs(DetectorBase):
|
|||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
# Specify which functions are revealed to the user in BEC client
|
||||||
|
USER_ACCESS = [
|
||||||
|
"describe",
|
||||||
|
]
|
||||||
|
|
||||||
cam = ADCpt(SlsDetectorCam, "cam1:")
|
cam = ADCpt(SlsDetectorCam, "cam1:")
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
@ -93,6 +98,18 @@ class PilatusCsaxs(DetectorBase):
|
|||||||
sim_mode=False,
|
sim_mode=False,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
):
|
):
|
||||||
|
"""Initialize the Pilatus detector
|
||||||
|
Args:
|
||||||
|
#TODO add here the parameters for kind, read_attrs, configuration_attrs, parent
|
||||||
|
prefix (str): PV prefix ("X12SA-ES-PILATUS300K:)
|
||||||
|
name (str): 'pilatus_2'
|
||||||
|
kind (str):
|
||||||
|
read_attrs (list):
|
||||||
|
configuration_attrs (list):
|
||||||
|
parent (object):
|
||||||
|
device_manager (object): BEC device manager
|
||||||
|
sim_mode (bool): simulation mode to start the detector without BEC, e.g. from ipython shell
|
||||||
|
"""
|
||||||
super().__init__(
|
super().__init__(
|
||||||
prefix=prefix,
|
prefix=prefix,
|
||||||
name=name,
|
name=name,
|
||||||
@ -106,7 +123,8 @@ class PilatusCsaxs(DetectorBase):
|
|||||||
raise PilatusError("Add DeviceManager to initialization or init with sim_mode=True")
|
raise PilatusError("Add DeviceManager to initialization or init with sim_mode=True")
|
||||||
|
|
||||||
self.name = name
|
self.name = name
|
||||||
self.wait_for_connection() # Make sure to be connected before talking to PVs
|
self.wait_for_connection()
|
||||||
|
# Spin up connections for simulation or BEC mode
|
||||||
if not sim_mode:
|
if not sim_mode:
|
||||||
from bec_lib.core.bec_service import SERVICE_CONFIG
|
from bec_lib.core.bec_service import SERVICE_CONFIG
|
||||||
|
|
||||||
@ -114,16 +132,17 @@ class PilatusCsaxs(DetectorBase):
|
|||||||
self._producer = self.device_manager.producer
|
self._producer = self.device_manager.producer
|
||||||
self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"]
|
self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"]
|
||||||
else:
|
else:
|
||||||
|
base_path = f"/sls/X12SA/data/{self.scaninfo.username}/Data10/"
|
||||||
self._producer = bec_utils.MockProducer()
|
self._producer = bec_utils.MockProducer()
|
||||||
self.device_manager = bec_utils.MockDeviceManager()
|
self.device_manager = bec_utils.MockDeviceManager()
|
||||||
self.scaninfo = BecScaninfoMixin(device_manager, sim_mode)
|
self.scaninfo = BecScaninfoMixin(device_manager, sim_mode)
|
||||||
self.scaninfo.load_scan_metadata()
|
self.scaninfo.load_scan_metadata()
|
||||||
self.service_cfg = {"base_path": f"/sls/X12SA/data/{self.scaninfo.username}/Data10/"}
|
self.service_cfg = {"base_path": base_path}
|
||||||
|
|
||||||
self.scaninfo = BecScaninfoMixin(device_manager, sim_mode)
|
self.scaninfo = BecScaninfoMixin(device_manager, sim_mode)
|
||||||
self.filepath_h5 = ""
|
self.scaninfo.load_scan_metadata()
|
||||||
|
|
||||||
self.filewriter = FileWriterMixin(self.service_cfg)
|
self.filewriter = FileWriterMixin(self.service_cfg)
|
||||||
|
|
||||||
self.readout = 1e-3 # 3 ms
|
self.readout = 1e-3 # 3 ms
|
||||||
|
|
||||||
# TODO maybe needed
|
# TODO maybe needed
|
||||||
|
Reference in New Issue
Block a user