diff --git a/ophyd_devices/epics/devices/bec_scaninfo_mixin.py b/ophyd_devices/epics/devices/bec_scaninfo_mixin.py index 5adf6ef..188e30f 100644 --- a/ophyd_devices/epics/devices/bec_scaninfo_mixin.py +++ b/ophyd_devices/epics/devices/bec_scaninfo_mixin.py @@ -6,23 +6,70 @@ from bec_lib.core import bec_logger logger = bec_logger.logger +class Bec_Info_Msg_Mock: + def __init__( + self, + mockrid: str = "mockrid1111", + mockqueueid: str = "mockqueueID111", + scan_number: int = 1, + exp_time: float = 12e-3, + num_points: int = 500, + readout_time: float = 3e-3, + scan_type: str = "fly", + num_lines: int = 1, + frames_per_trigger: int = 1, + ) -> None: + self.mockrid = mockrid + self.mockqueueid = mockqueueid + self.scan_number = scan_number + self.exp_time = exp_time + self.num_points = num_points + self.readout_time = readout_time + self.scan_type = scan_type + self.num_lines = num_lines + self.frames_per_trigger = frames_per_trigger + + def get_bec_info_msg(self) -> dict: + info_msg = { + "RID": self.mockrid, + "queueID": self.mockqueueid, + "scan_number": self.scan_number, + "exp_time": self.exp_time, + "num_points": self.num_points, + "readout_time": self.readout_time, + "scan_type": self.scan_type, + "num_lines": self.exp_time, + "frames_per_trigger": self.frames_per_trigger, + } + + return info_msg + + class BecScaninfoMixin: - def __init__(self, device_manager: DeviceManagerBase = None, sim_mode=False) -> None: + def __init__( + self, device_manager: DeviceManagerBase = None, sim_mode: bool = False, bec_info_msg=None + ) -> None: self.device_manager = device_manager self.sim_mode = sim_mode self.scan_msg = None self.scanID = None - self.bec_info_msg = { - "RID": "mockrid", - "queueID": "mockqueuid", - "scan_number": 1, - "exp_time": 12e-3, - "num_points": 500, - "readout_time": 3e-3, - "scan_type": "fly", - "num_lines": 1, - "frames_per_trigger": 1, - } + if bec_info_msg is None: + bec_info_msg_mock = Bec_Info_Msg_Mock() + self.bec_info_msg = bec_info_msg_mock.get_bec_info_msg() + else: + self.bec_info_msg = bec_info_msg + + # self.bec_info_msg = { + # "RID": "mockrid", + # "queueID": "mockqueuid", + # "scan_number": 1, + # "exp_time": 12e-3, + # "num_points": 500, + # "readout_time": 3e-3, + # "scan_type": "fly", + # "num_lines": 1, + # "frames_per_trigger": 1, + # } def get_bec_info_msg(self) -> None: return self.bec_info_msg diff --git a/ophyd_devices/epics/devices/eiger9m_csaxs.py b/ophyd_devices/epics/devices/eiger9m_csaxs.py index e1768ee..fa89c94 100644 --- a/ophyd_devices/epics/devices/eiger9m_csaxs.py +++ b/ophyd_devices/epics/devices/eiger9m_csaxs.py @@ -143,15 +143,11 @@ class Eiger9mCsaxs(DetectorBase): if not sim_mode: self._update_service_config() self.device_manager = device_manager - self._producer = self.device_manager.producer else: - self._producer = bec_utils.MockProducer() - self.device_manager = bec_utils.MockDeviceManager() - self.scaninfo = BecScaninfoMixin(device_manager, sim_mode) - self.scaninfo.load_scan_metadata() - base_path = f"/sls/X12SA/data/{self.scaninfo.username}/Data10/" - self.service_cfg = {"base_path": base_path} - + self.device_manager = bec_utils.DMMock() + base_path = f"~/Data10/" + self.service_cfg = {"base_path": os.path.expanduser(base_path)} + self._producer = self.device_manager.producer self.scaninfo = BecScaninfoMixin(device_manager, sim_mode) self.scaninfo.load_scan_metadata() self.filewriter = FileWriterMixin(self.service_cfg) diff --git a/ophyd_devices/utils/bec_utils.py b/ophyd_devices/utils/bec_utils.py index 304a2ed..1ac07a8 100644 --- a/ophyd_devices/utils/bec_utils.py +++ b/ophyd_devices/utils/bec_utils.py @@ -1,6 +1,8 @@ import time from bec_lib.core import bec_logger +from bec_lib.core.devicemanager import DeviceContainer +from bec_lib.core.tests.utils import ProducerMock from ophyd import Signal, Kind @@ -11,33 +13,88 @@ logger = bec_logger.logger DEFAULT_EPICSSIGNAL_VALUE = object() -class MockProducer: - def set_and_publish(self, endpoint: str, msgdump: str): - logger.info(f"BECMessage to {endpoint} with msg dump {msgdump}") - - -class MockDeviceManager: - def __init__(self) -> None: - self.devices = devices() - - -class OphydObject: - def __init__(self) -> None: - self.name = "mock_mokev" - self.obj = mokev() - - -class devices: - def __init__(self): - self.mokev = OphydObject() - - -class mokev: - def __init__(self): - self.name = "mock_mokev" +# TODO maybe specify here that this DeviceMock is for usage in the DeviceServer +class DeviceMock: + def __init__(self, name: str, value: float = 0.0): + self.name = name + self.read_buffer = value + self._config = {"deviceConfig": {"limits": [-50, 50]}, "userParameter": None} + self._enabled_set = True + self._enabled = True def read(self): - return {self.name: {"value": 16.0, "timestamp": time.time()}} + return {self.name: {"value": self.read_buffer}} + + def readback(self): + return self.read_buffer + + @property + def enabled_set(self) -> bool: + return self._enabled_set + + @enabled_set.setter + def enabled_set(self, val: bool): + self._enabled_set = val + + @property + def enabled(self) -> bool: + return self._enabled + + @enabled.setter + def enabled(self, val: bool): + self._enabled = val + + @property + def user_parameter(self): + return self._config["userParameter"] + + @property + def obj(self): + return self + + +class DMMock: + """Mock for DeviceManager + + The mocked DeviceManager creates a device containert and a producer. + + """ + + def __init__(self): + self.devices = DeviceContainer() + self.producer = ProducerMock() + + def add_device(self, name: str, value: float = 0.0): + self.devices[name] = DeviceMock(name, value) + + +# class MockProducer: +# def set_and_publish(self, endpoint: str, msgdump: str): +# logger.info(f"BECMessage to {endpoint} with msg dump {msgdump}") + + +# class MockDeviceManager: +# def __init__(self) -> None: +# self.devices = devices() + + +# class OphydObject: +# def __init__(self) -> None: +# self.name = "mock_mokev" +# self.obj = mokev() + + +# class devices: +# def __init__(self): +# self.mokev = OphydObject() + + +# class mokev: +# def __init__(self): +# self.name = "mock_mokev" + +# def read(self): +# return {self.name: {"value": 16.0, "timestamp": time.time()}} class ConfigSignal(Signal): diff --git a/tests/test_eiger9m_csaxs.py b/tests/test_eiger9m_csaxs.py index e69777d..de403f7 100644 --- a/tests/test_eiger9m_csaxs.py +++ b/tests/test_eiger9m_csaxs.py @@ -109,10 +109,52 @@ def mock_det(): @pytest.mark.parametrize( - "trigger_source, stopped, detector_state, expected_exception", + "trigger_source, stopped, detector_state, sim_mode, scan_status_msg, expected_exception", [ - (2, True, 1, False), - (2, False, 0, True), + ( + 2, + True, + 1, + True, + BECMessage.ScanStatusMessage( + scanID="1", + status={}, + info={ + "RID": "mockrid1111", + "queueID": "mockqueueID111", + "scan_number": 1, + "exp_time": 0.012, + "num_points": 500, + "readout_time": 0.003, + "scan_type": "fly", + "num_lines": 0.012, + "frames_per_trigger": 1, + }, + ), + False, + ), + ( + 2, + False, + 0, + False, + BECMessage.ScanStatusMessage( + scanID="1", + status={}, + info={ + "RID": "mockrid1111", + "queueID": "mockqueueID111", + "scan_number": 1, + "exp_time": 0.012, + "num_points": 500, + "readout_time": 0.003, + "scan_type": "fly", + "num_lines": 0.012, + "frames_per_trigger": 1, + }, + ), + True, + ), ], ) # TODO rewrite this one, write test for init_detector, init_filewriter is tested @@ -120,6 +162,8 @@ def test_init( trigger_source, stopped, detector_state, + sim_mode, + scan_status_msg, expected_exception, ): """Test the _init function: @@ -136,34 +180,54 @@ def test_init( """ name = "eiger" prefix = "X12SA-ES-EIGER9M:" - sim_mode = False + # sim_mode = sim_mode dm = DMMock() # dm.add_device("mokev", value=12.4) - with mock.patch.object(dm, "producer"): + with mock.patch.object(dm, "producer") as producer: with mock.patch( - "ophyd_devices.epics.devices.eiger9m_csaxs.BecScaninfoMixin" - ) as mixin, mock.patch( "ophyd_devices.epics.devices.eiger9m_csaxs.FileWriterMixin" ) as filemixin, mock.patch( "ophyd_devices.epics.devices.eiger9m_csaxs.Eiger9mCsaxs._update_service_config" ) as mock_service_config: with mock.patch.object(Eiger9mCsaxs, "_init_filewriter") as mock_init_fw: - mock_det = Eiger9mCsaxs( - name=name, prefix=prefix, device_manager=dm, sim_mode=sim_mode - ) - mock_det.cam.detector_state.put(detector_state) - mock_det._stopped = stopped - if expected_exception: - with pytest.raises(Exception): - mock_det._init() - mock_init_fw.assert_called_once() + producer.get.return_value = scan_status_msg.dumps() + if sim_mode: + mock_det = Eiger9mCsaxs( + name=name, prefix=prefix, device_manager=dm, sim_mode=sim_mode + ) + mock_det.cam.detector_state.put(detector_state) + mock_det._stopped = stopped + if expected_exception: + with pytest.raises(Exception): + mock_det._init() + mock_init_fw.assert_called_once() + else: + mock_det._init() # call the method you want to test + assert mock_det.cam.acquire.get() == 0 + assert mock_det.cam.detector_state.get() == detector_state + assert mock_det.cam.trigger_mode.get() == trigger_source + mock_init_fw.assert_called() + assert mock_init_fw.call_count == 2 else: - mock_det._init() # call the method you want to test - assert mock_det.cam.acquire.get() == 0 - assert mock_det.cam.detector_state.get() == detector_state - assert mock_det.cam.trigger_mode.get() == trigger_source - mock_init_fw.assert_called() - assert mock_init_fw.call_count == 2 + with mock.patch( + "ophyd_devices.epics.devices.eiger9m_csaxs.BecScaninfoMixin" + ) as mixin: + mock_det = Eiger9mCsaxs( + name=name, prefix=prefix, device_manager=dm, sim_mode=sim_mode + ) + mock_det.cam.detector_state.put(detector_state) + mock_det._stopped = stopped + if expected_exception: + with pytest.raises(Exception): + mock_det._init() + mock_init_fw.assert_called_once() + else: + mock_det._init() # call the method you want to test + assert mock_det.cam.acquire.get() == 0 + assert mock_det.cam.detector_state.get() == detector_state + assert mock_det.cam.trigger_mode.get() == trigger_source + mock_init_fw.assert_called() + assert mock_init_fw.call_count == 2 @pytest.mark.parametrize(