mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2026-01-17 15:49:21 +01:00
refactor: fixed tests and mocks for refactor init
This commit is contained in:
@@ -16,6 +16,7 @@ from std_daq_client import StdDaqClient
|
|||||||
from bec_lib.core import BECMessage, MessageEndpoints, threadlocked
|
from bec_lib.core import BECMessage, MessageEndpoints, threadlocked
|
||||||
from bec_lib.core.file_utils import FileWriterMixin
|
from bec_lib.core.file_utils import FileWriterMixin
|
||||||
from bec_lib.core import bec_logger
|
from bec_lib.core import bec_logger
|
||||||
|
from bec_lib.core.bec_service import SERVICE_CONFIG
|
||||||
|
|
||||||
from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin
|
from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin
|
||||||
from ophyd_devices.utils import bec_utils
|
from ophyd_devices.utils import bec_utils
|
||||||
@@ -132,30 +133,41 @@ class Eiger9mCsaxs(DetectorBase):
|
|||||||
)
|
)
|
||||||
if device_manager is None and not sim_mode:
|
if device_manager is None and not sim_mode:
|
||||||
raise EigerError("Add DeviceManager to initialization or init with sim_mode=True")
|
raise EigerError("Add DeviceManager to initialization or init with sim_mode=True")
|
||||||
|
self.sim_mode = sim_mode
|
||||||
# TODO check if threadlock is needed for unstage
|
# TODO check if threadlock is needed for unstage
|
||||||
self._lock = threading.RLock()
|
self._lock = threading.RLock()
|
||||||
self._stopped = False
|
self._stopped = False
|
||||||
self.name = name
|
self.name = name
|
||||||
self.service_cfg = None
|
self.service_cfg = None
|
||||||
self.std_client = None
|
self.std_client = None
|
||||||
|
self.scaninfo = None
|
||||||
|
self.filewriter = None
|
||||||
self.wait_for_connection(all_signals=True)
|
self.wait_for_connection(all_signals=True)
|
||||||
if not sim_mode:
|
if not sim_mode:
|
||||||
self._update_service_config()
|
self._update_service_config()
|
||||||
self.device_manager = device_manager
|
self.device_manager = device_manager
|
||||||
else:
|
else:
|
||||||
self.device_manager = bec_utils.DMMock()
|
self.device_manager = bec_utils.DMMock()
|
||||||
base_path = f"~/Data10/"
|
base_path = kwargs["basepath"] if "basepath" in kwargs else "~/Data10/"
|
||||||
self.service_cfg = {"base_path": os.path.expanduser(base_path)}
|
self.service_cfg = {"base_path": os.path.expanduser(base_path)}
|
||||||
self._producer = self.device_manager.producer
|
self._producer = self.device_manager.producer
|
||||||
self.scaninfo = BecScaninfoMixin(device_manager, sim_mode)
|
self._update_scaninfo()
|
||||||
self.scaninfo.load_scan_metadata()
|
self._update_filewriter()
|
||||||
self.filewriter = FileWriterMixin(self.service_cfg)
|
|
||||||
self._init()
|
self._init()
|
||||||
|
|
||||||
def _update_service_config(self) -> None:
|
def _update_filewriter(self) -> None:
|
||||||
from bec_lib.core.bec_service import SERVICE_CONFIG
|
"""Update filewriter with service config"""
|
||||||
|
self.filewriter = FileWriterMixin(self.service_cfg)
|
||||||
|
|
||||||
|
def _update_scaninfo(self) -> None:
|
||||||
|
"""Update scaninfo from BecScaninfoMixing
|
||||||
|
This depends on device manager and operation/sim_mode
|
||||||
|
"""
|
||||||
|
self.scaninfo = BecScaninfoMixin(self.device_manager, self.sim_mode)
|
||||||
|
self.scaninfo.load_scan_metadata()
|
||||||
|
|
||||||
|
def _update_service_config(self) -> None:
|
||||||
|
"""Update service config from BEC service config"""
|
||||||
self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"]
|
self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"]
|
||||||
|
|
||||||
# TODO function for abstract class?
|
# TODO function for abstract class?
|
||||||
|
|||||||
@@ -97,23 +97,20 @@ def mock_det():
|
|||||||
dm = DMMock()
|
dm = DMMock()
|
||||||
# dm.add_device("mokev", value=12.4)
|
# dm.add_device("mokev", value=12.4)
|
||||||
with mock.patch.object(dm, "producer"):
|
with mock.patch.object(dm, "producer"):
|
||||||
with mock.patch(
|
with mock.patch.object(
|
||||||
"ophyd_devices.epics.devices.eiger9m_csaxs.BecScaninfoMixin"
|
Eiger9mCsaxs, "_update_service_config"
|
||||||
) as mixin, mock.patch(
|
) as mock_update_service_config, mock.patch(
|
||||||
"ophyd_devices.epics.devices.eiger9m_csaxs.FileWriterMixin"
|
"ophyd_devices.epics.devices.eiger9m_csaxs.FileWriterMixin"
|
||||||
) as filemixin, mock.patch(
|
) as filemixin:
|
||||||
"ophyd_devices.epics.devices.eiger9m_csaxs.Eiger9mCsaxs._update_service_config"
|
|
||||||
) as mock_service_config:
|
|
||||||
with mock.patch.object(Eiger9mCsaxs, "_init"):
|
with mock.patch.object(Eiger9mCsaxs, "_init"):
|
||||||
yield Eiger9mCsaxs(name=name, prefix=prefix, device_manager=dm, sim_mode=sim_mode)
|
yield Eiger9mCsaxs(name=name, prefix=prefix, device_manager=dm, sim_mode=sim_mode)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
"trigger_source, stopped, detector_state, sim_mode, scan_status_msg, expected_exception",
|
"trigger_source, detector_state, sim_mode, scan_status_msg, expected_exception",
|
||||||
[
|
[
|
||||||
(
|
(
|
||||||
2,
|
2,
|
||||||
True,
|
|
||||||
1,
|
1,
|
||||||
True,
|
True,
|
||||||
BECMessage.ScanStatusMessage(
|
BECMessage.ScanStatusMessage(
|
||||||
@@ -131,36 +128,34 @@ def mock_det():
|
|||||||
"frames_per_trigger": 1,
|
"frames_per_trigger": 1,
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
|
True,
|
||||||
|
),
|
||||||
|
(
|
||||||
|
2,
|
||||||
|
0,
|
||||||
|
False,
|
||||||
|
BECMessage.ScanStatusMessage(
|
||||||
|
scanID="1",
|
||||||
|
status={},
|
||||||
|
info={
|
||||||
|
"RID": "mockrid1111",
|
||||||
|
"queueID": "mockqueueID111",
|
||||||
|
"scan_number": 1,
|
||||||
|
"exp_time": 0.012,
|
||||||
|
"num_points": 500,
|
||||||
|
"readout_time": 0.003,
|
||||||
|
"scan_type": "fly",
|
||||||
|
"num_lines": 0.012,
|
||||||
|
"frames_per_trigger": 1,
|
||||||
|
},
|
||||||
|
),
|
||||||
False,
|
False,
|
||||||
),
|
),
|
||||||
# (
|
|
||||||
# 2,
|
|
||||||
# False,
|
|
||||||
# 0,
|
|
||||||
# False,
|
|
||||||
# BECMessage.ScanStatusMessage(
|
|
||||||
# scanID="1",
|
|
||||||
# status={},
|
|
||||||
# info={
|
|
||||||
# "RID": "mockrid1111",
|
|
||||||
# "queueID": "mockqueueID111",
|
|
||||||
# "scan_number": 1,
|
|
||||||
# "exp_time": 0.012,
|
|
||||||
# "num_points": 500,
|
|
||||||
# "readout_time": 0.003,
|
|
||||||
# "scan_type": "fly",
|
|
||||||
# "num_lines": 0.012,
|
|
||||||
# "frames_per_trigger": 1,
|
|
||||||
# },
|
|
||||||
# ),
|
|
||||||
# True,
|
|
||||||
# ),
|
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
# TODO rewrite this one, write test for init_detector, init_filewriter is tested
|
# TODO rewrite this one, write test for init_detector, init_filewriter is tested
|
||||||
def test_init(
|
def test_init(
|
||||||
trigger_source,
|
trigger_source,
|
||||||
stopped,
|
|
||||||
detector_state,
|
detector_state,
|
||||||
sim_mode,
|
sim_mode,
|
||||||
scan_status_msg,
|
scan_status_msg,
|
||||||
@@ -180,54 +175,34 @@ def test_init(
|
|||||||
"""
|
"""
|
||||||
name = "eiger"
|
name = "eiger"
|
||||||
prefix = "X12SA-ES-EIGER9M:"
|
prefix = "X12SA-ES-EIGER9M:"
|
||||||
# sim_mode = sim_mode
|
sim_mode = sim_mode
|
||||||
dm = DMMock()
|
dm = DMMock()
|
||||||
# dm.add_device("mokev", value=12.4)
|
with mock.patch.object(dm, "producer") as producer, mock.patch.object(
|
||||||
with mock.patch.object(dm, "producer") as producer:
|
Eiger9mCsaxs, "_init_filewriter"
|
||||||
with mock.patch(
|
) as mock_init_fw, mock.patch.object(
|
||||||
"ophyd_devices.epics.devices.eiger9m_csaxs.FileWriterMixin"
|
Eiger9mCsaxs, "_update_scaninfo"
|
||||||
) as filemixin, mock.patch(
|
) as mock_update_scaninfo, mock.patch.object(
|
||||||
"ophyd_devices.epics.devices.eiger9m_csaxs.Eiger9mCsaxs._update_service_config"
|
Eiger9mCsaxs, "_update_filewriter"
|
||||||
) as mock_service_config:
|
) as mock_update_filewriter, mock.patch.object(
|
||||||
with mock.patch.object(Eiger9mCsaxs, "_init_filewriter") as mock_init_fw:
|
Eiger9mCsaxs, "_update_service_config"
|
||||||
producer.get.return_value = scan_status_msg.dumps()
|
) as mock_update_service_config:
|
||||||
if sim_mode:
|
mock_det = Eiger9mCsaxs(name=name, prefix=prefix, device_manager=dm, sim_mode=sim_mode)
|
||||||
mock_det = Eiger9mCsaxs(
|
mock_det.cam.detector_state.put(detector_state)
|
||||||
name=name, prefix=prefix, device_manager=dm, sim_mode=sim_mode
|
if expected_exception:
|
||||||
)
|
with pytest.raises(Exception):
|
||||||
mock_det.cam.detector_state.put(detector_state)
|
mock_det._init()
|
||||||
mock_det._stopped = stopped
|
mock_init_fw.assert_called_once()
|
||||||
if expected_exception:
|
else:
|
||||||
with pytest.raises(Exception):
|
mock_det._init() # call the method you want to test
|
||||||
mock_det._init()
|
assert mock_det.cam.acquire.get() == 0
|
||||||
mock_init_fw.assert_called_once()
|
assert mock_det.cam.detector_state.get() == detector_state
|
||||||
else:
|
assert mock_det.cam.trigger_mode.get() == trigger_source
|
||||||
mock_det._init() # call the method you want to test
|
mock_init_fw.assert_called()
|
||||||
assert mock_det.cam.acquire.get() == 0
|
mock_update_scaninfo.assert_called_once()
|
||||||
assert mock_det.cam.detector_state.get() == detector_state
|
mock_update_filewriter.assert_called_once()
|
||||||
assert mock_det.cam.trigger_mode.get() == trigger_source
|
mock_update_service_config.assert_called_once()
|
||||||
mock_init_fw.assert_called()
|
|
||||||
assert mock_init_fw.call_count == 2
|
assert mock_init_fw.call_count == 2
|
||||||
else:
|
|
||||||
with mock.patch(
|
|
||||||
"ophyd_devices.epics.devices.eiger9m_csaxs.BecScaninfoMixin"
|
|
||||||
) as mixin:
|
|
||||||
mock_det = Eiger9mCsaxs(
|
|
||||||
name=name, prefix=prefix, device_manager=dm, sim_mode=sim_mode
|
|
||||||
)
|
|
||||||
mock_det.cam.detector_state.put(detector_state)
|
|
||||||
mock_det._stopped = stopped
|
|
||||||
if expected_exception:
|
|
||||||
with pytest.raises(Exception):
|
|
||||||
mock_det._init()
|
|
||||||
mock_init_fw.assert_called_once()
|
|
||||||
else:
|
|
||||||
mock_det._init() # call the method you want to test
|
|
||||||
assert mock_det.cam.acquire.get() == 0
|
|
||||||
assert mock_det.cam.detector_state.get() == detector_state
|
|
||||||
assert mock_det.cam.trigger_mode.get() == trigger_source
|
|
||||||
mock_init_fw.assert_called()
|
|
||||||
assert mock_init_fw.call_count == 2
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
@@ -425,7 +400,9 @@ def test_prep_file_writer(mock_det, scaninfo, daq_status, expected_exception):
|
|||||||
mock_det, "_filepath_exists"
|
mock_det, "_filepath_exists"
|
||||||
) as mock_file_path_exists, mock.patch.object(
|
) as mock_file_path_exists, mock.patch.object(
|
||||||
mock_det, "_stop_file_writer"
|
mock_det, "_stop_file_writer"
|
||||||
) as mock_stop_file_writer:
|
) as mock_stop_file_writer, mock.patch.object(
|
||||||
|
mock_det, "scaninfo"
|
||||||
|
) as mock_scaninfo:
|
||||||
# mock_det = eiger_factory(name, prefix, sim_mode)
|
# mock_det = eiger_factory(name, prefix, sim_mode)
|
||||||
mock_det.std_client = mock_std_daq
|
mock_det.std_client = mock_std_daq
|
||||||
mock_std_daq.start_writer_async.return_value = None
|
mock_std_daq.start_writer_async.return_value = None
|
||||||
|
|||||||
Reference in New Issue
Block a user