mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2025-06-26 20:51:09 +02:00
refactor: generalize sim_mode
This commit is contained in:
@ -6,23 +6,70 @@ from bec_lib.core import bec_logger
|
|||||||
logger = bec_logger.logger
|
logger = bec_logger.logger
|
||||||
|
|
||||||
|
|
||||||
|
class Bec_Info_Msg_Mock:
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
mockrid: str = "mockrid1111",
|
||||||
|
mockqueueid: str = "mockqueueID111",
|
||||||
|
scan_number: int = 1,
|
||||||
|
exp_time: float = 12e-3,
|
||||||
|
num_points: int = 500,
|
||||||
|
readout_time: float = 3e-3,
|
||||||
|
scan_type: str = "fly",
|
||||||
|
num_lines: int = 1,
|
||||||
|
frames_per_trigger: int = 1,
|
||||||
|
) -> None:
|
||||||
|
self.mockrid = mockrid
|
||||||
|
self.mockqueueid = mockqueueid
|
||||||
|
self.scan_number = scan_number
|
||||||
|
self.exp_time = exp_time
|
||||||
|
self.num_points = num_points
|
||||||
|
self.readout_time = readout_time
|
||||||
|
self.scan_type = scan_type
|
||||||
|
self.num_lines = num_lines
|
||||||
|
self.frames_per_trigger = frames_per_trigger
|
||||||
|
|
||||||
|
def get_bec_info_msg(self) -> dict:
|
||||||
|
info_msg = {
|
||||||
|
"RID": self.mockrid,
|
||||||
|
"queueID": self.mockqueueid,
|
||||||
|
"scan_number": self.scan_number,
|
||||||
|
"exp_time": self.exp_time,
|
||||||
|
"num_points": self.num_points,
|
||||||
|
"readout_time": self.readout_time,
|
||||||
|
"scan_type": self.scan_type,
|
||||||
|
"num_lines": self.exp_time,
|
||||||
|
"frames_per_trigger": self.frames_per_trigger,
|
||||||
|
}
|
||||||
|
|
||||||
|
return info_msg
|
||||||
|
|
||||||
|
|
||||||
class BecScaninfoMixin:
|
class BecScaninfoMixin:
|
||||||
def __init__(self, device_manager: DeviceManagerBase = None, sim_mode=False) -> None:
|
def __init__(
|
||||||
|
self, device_manager: DeviceManagerBase = None, sim_mode: bool = False, bec_info_msg=None
|
||||||
|
) -> None:
|
||||||
self.device_manager = device_manager
|
self.device_manager = device_manager
|
||||||
self.sim_mode = sim_mode
|
self.sim_mode = sim_mode
|
||||||
self.scan_msg = None
|
self.scan_msg = None
|
||||||
self.scanID = None
|
self.scanID = None
|
||||||
self.bec_info_msg = {
|
if bec_info_msg is None:
|
||||||
"RID": "mockrid",
|
bec_info_msg_mock = Bec_Info_Msg_Mock()
|
||||||
"queueID": "mockqueuid",
|
self.bec_info_msg = bec_info_msg_mock.get_bec_info_msg()
|
||||||
"scan_number": 1,
|
else:
|
||||||
"exp_time": 12e-3,
|
self.bec_info_msg = bec_info_msg
|
||||||
"num_points": 500,
|
|
||||||
"readout_time": 3e-3,
|
# self.bec_info_msg = {
|
||||||
"scan_type": "fly",
|
# "RID": "mockrid",
|
||||||
"num_lines": 1,
|
# "queueID": "mockqueuid",
|
||||||
"frames_per_trigger": 1,
|
# "scan_number": 1,
|
||||||
}
|
# "exp_time": 12e-3,
|
||||||
|
# "num_points": 500,
|
||||||
|
# "readout_time": 3e-3,
|
||||||
|
# "scan_type": "fly",
|
||||||
|
# "num_lines": 1,
|
||||||
|
# "frames_per_trigger": 1,
|
||||||
|
# }
|
||||||
|
|
||||||
def get_bec_info_msg(self) -> None:
|
def get_bec_info_msg(self) -> None:
|
||||||
return self.bec_info_msg
|
return self.bec_info_msg
|
||||||
|
@ -143,15 +143,11 @@ class Eiger9mCsaxs(DetectorBase):
|
|||||||
if not sim_mode:
|
if not sim_mode:
|
||||||
self._update_service_config()
|
self._update_service_config()
|
||||||
self.device_manager = device_manager
|
self.device_manager = device_manager
|
||||||
self._producer = self.device_manager.producer
|
|
||||||
else:
|
else:
|
||||||
self._producer = bec_utils.MockProducer()
|
self.device_manager = bec_utils.DMMock()
|
||||||
self.device_manager = bec_utils.MockDeviceManager()
|
base_path = f"~/Data10/"
|
||||||
self.scaninfo = BecScaninfoMixin(device_manager, sim_mode)
|
self.service_cfg = {"base_path": os.path.expanduser(base_path)}
|
||||||
self.scaninfo.load_scan_metadata()
|
self._producer = self.device_manager.producer
|
||||||
base_path = f"/sls/X12SA/data/{self.scaninfo.username}/Data10/"
|
|
||||||
self.service_cfg = {"base_path": base_path}
|
|
||||||
|
|
||||||
self.scaninfo = BecScaninfoMixin(device_manager, sim_mode)
|
self.scaninfo = BecScaninfoMixin(device_manager, sim_mode)
|
||||||
self.scaninfo.load_scan_metadata()
|
self.scaninfo.load_scan_metadata()
|
||||||
self.filewriter = FileWriterMixin(self.service_cfg)
|
self.filewriter = FileWriterMixin(self.service_cfg)
|
||||||
|
@ -1,6 +1,8 @@
|
|||||||
import time
|
import time
|
||||||
|
|
||||||
from bec_lib.core import bec_logger
|
from bec_lib.core import bec_logger
|
||||||
|
from bec_lib.core.devicemanager import DeviceContainer
|
||||||
|
from bec_lib.core.tests.utils import ProducerMock
|
||||||
|
|
||||||
from ophyd import Signal, Kind
|
from ophyd import Signal, Kind
|
||||||
|
|
||||||
@ -11,33 +13,88 @@ logger = bec_logger.logger
|
|||||||
DEFAULT_EPICSSIGNAL_VALUE = object()
|
DEFAULT_EPICSSIGNAL_VALUE = object()
|
||||||
|
|
||||||
|
|
||||||
class MockProducer:
|
# TODO maybe specify here that this DeviceMock is for usage in the DeviceServer
|
||||||
def set_and_publish(self, endpoint: str, msgdump: str):
|
class DeviceMock:
|
||||||
logger.info(f"BECMessage to {endpoint} with msg dump {msgdump}")
|
def __init__(self, name: str, value: float = 0.0):
|
||||||
|
self.name = name
|
||||||
|
self.read_buffer = value
|
||||||
class MockDeviceManager:
|
self._config = {"deviceConfig": {"limits": [-50, 50]}, "userParameter": None}
|
||||||
def __init__(self) -> None:
|
self._enabled_set = True
|
||||||
self.devices = devices()
|
self._enabled = True
|
||||||
|
|
||||||
|
|
||||||
class OphydObject:
|
|
||||||
def __init__(self) -> None:
|
|
||||||
self.name = "mock_mokev"
|
|
||||||
self.obj = mokev()
|
|
||||||
|
|
||||||
|
|
||||||
class devices:
|
|
||||||
def __init__(self):
|
|
||||||
self.mokev = OphydObject()
|
|
||||||
|
|
||||||
|
|
||||||
class mokev:
|
|
||||||
def __init__(self):
|
|
||||||
self.name = "mock_mokev"
|
|
||||||
|
|
||||||
def read(self):
|
def read(self):
|
||||||
return {self.name: {"value": 16.0, "timestamp": time.time()}}
|
return {self.name: {"value": self.read_buffer}}
|
||||||
|
|
||||||
|
def readback(self):
|
||||||
|
return self.read_buffer
|
||||||
|
|
||||||
|
@property
|
||||||
|
def enabled_set(self) -> bool:
|
||||||
|
return self._enabled_set
|
||||||
|
|
||||||
|
@enabled_set.setter
|
||||||
|
def enabled_set(self, val: bool):
|
||||||
|
self._enabled_set = val
|
||||||
|
|
||||||
|
@property
|
||||||
|
def enabled(self) -> bool:
|
||||||
|
return self._enabled
|
||||||
|
|
||||||
|
@enabled.setter
|
||||||
|
def enabled(self, val: bool):
|
||||||
|
self._enabled = val
|
||||||
|
|
||||||
|
@property
|
||||||
|
def user_parameter(self):
|
||||||
|
return self._config["userParameter"]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def obj(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
|
||||||
|
class DMMock:
|
||||||
|
"""Mock for DeviceManager
|
||||||
|
|
||||||
|
The mocked DeviceManager creates a device containert and a producer.
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.devices = DeviceContainer()
|
||||||
|
self.producer = ProducerMock()
|
||||||
|
|
||||||
|
def add_device(self, name: str, value: float = 0.0):
|
||||||
|
self.devices[name] = DeviceMock(name, value)
|
||||||
|
|
||||||
|
|
||||||
|
# class MockProducer:
|
||||||
|
# def set_and_publish(self, endpoint: str, msgdump: str):
|
||||||
|
# logger.info(f"BECMessage to {endpoint} with msg dump {msgdump}")
|
||||||
|
|
||||||
|
|
||||||
|
# class MockDeviceManager:
|
||||||
|
# def __init__(self) -> None:
|
||||||
|
# self.devices = devices()
|
||||||
|
|
||||||
|
|
||||||
|
# class OphydObject:
|
||||||
|
# def __init__(self) -> None:
|
||||||
|
# self.name = "mock_mokev"
|
||||||
|
# self.obj = mokev()
|
||||||
|
|
||||||
|
|
||||||
|
# class devices:
|
||||||
|
# def __init__(self):
|
||||||
|
# self.mokev = OphydObject()
|
||||||
|
|
||||||
|
|
||||||
|
# class mokev:
|
||||||
|
# def __init__(self):
|
||||||
|
# self.name = "mock_mokev"
|
||||||
|
|
||||||
|
# def read(self):
|
||||||
|
# return {self.name: {"value": 16.0, "timestamp": time.time()}}
|
||||||
|
|
||||||
|
|
||||||
class ConfigSignal(Signal):
|
class ConfigSignal(Signal):
|
||||||
|
@ -109,10 +109,52 @@ def mock_det():
|
|||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
"trigger_source, stopped, detector_state, expected_exception",
|
"trigger_source, stopped, detector_state, sim_mode, scan_status_msg, expected_exception",
|
||||||
[
|
[
|
||||||
(2, True, 1, False),
|
(
|
||||||
(2, False, 0, True),
|
2,
|
||||||
|
True,
|
||||||
|
1,
|
||||||
|
True,
|
||||||
|
BECMessage.ScanStatusMessage(
|
||||||
|
scanID="1",
|
||||||
|
status={},
|
||||||
|
info={
|
||||||
|
"RID": "mockrid1111",
|
||||||
|
"queueID": "mockqueueID111",
|
||||||
|
"scan_number": 1,
|
||||||
|
"exp_time": 0.012,
|
||||||
|
"num_points": 500,
|
||||||
|
"readout_time": 0.003,
|
||||||
|
"scan_type": "fly",
|
||||||
|
"num_lines": 0.012,
|
||||||
|
"frames_per_trigger": 1,
|
||||||
|
},
|
||||||
|
),
|
||||||
|
False,
|
||||||
|
),
|
||||||
|
(
|
||||||
|
2,
|
||||||
|
False,
|
||||||
|
0,
|
||||||
|
False,
|
||||||
|
BECMessage.ScanStatusMessage(
|
||||||
|
scanID="1",
|
||||||
|
status={},
|
||||||
|
info={
|
||||||
|
"RID": "mockrid1111",
|
||||||
|
"queueID": "mockqueueID111",
|
||||||
|
"scan_number": 1,
|
||||||
|
"exp_time": 0.012,
|
||||||
|
"num_points": 500,
|
||||||
|
"readout_time": 0.003,
|
||||||
|
"scan_type": "fly",
|
||||||
|
"num_lines": 0.012,
|
||||||
|
"frames_per_trigger": 1,
|
||||||
|
},
|
||||||
|
),
|
||||||
|
True,
|
||||||
|
),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
# TODO rewrite this one, write test for init_detector, init_filewriter is tested
|
# TODO rewrite this one, write test for init_detector, init_filewriter is tested
|
||||||
@ -120,6 +162,8 @@ def test_init(
|
|||||||
trigger_source,
|
trigger_source,
|
||||||
stopped,
|
stopped,
|
||||||
detector_state,
|
detector_state,
|
||||||
|
sim_mode,
|
||||||
|
scan_status_msg,
|
||||||
expected_exception,
|
expected_exception,
|
||||||
):
|
):
|
||||||
"""Test the _init function:
|
"""Test the _init function:
|
||||||
@ -136,34 +180,54 @@ def test_init(
|
|||||||
"""
|
"""
|
||||||
name = "eiger"
|
name = "eiger"
|
||||||
prefix = "X12SA-ES-EIGER9M:"
|
prefix = "X12SA-ES-EIGER9M:"
|
||||||
sim_mode = False
|
# sim_mode = sim_mode
|
||||||
dm = DMMock()
|
dm = DMMock()
|
||||||
# dm.add_device("mokev", value=12.4)
|
# dm.add_device("mokev", value=12.4)
|
||||||
with mock.patch.object(dm, "producer"):
|
with mock.patch.object(dm, "producer") as producer:
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
"ophyd_devices.epics.devices.eiger9m_csaxs.BecScaninfoMixin"
|
|
||||||
) as mixin, mock.patch(
|
|
||||||
"ophyd_devices.epics.devices.eiger9m_csaxs.FileWriterMixin"
|
"ophyd_devices.epics.devices.eiger9m_csaxs.FileWriterMixin"
|
||||||
) as filemixin, mock.patch(
|
) as filemixin, mock.patch(
|
||||||
"ophyd_devices.epics.devices.eiger9m_csaxs.Eiger9mCsaxs._update_service_config"
|
"ophyd_devices.epics.devices.eiger9m_csaxs.Eiger9mCsaxs._update_service_config"
|
||||||
) as mock_service_config:
|
) as mock_service_config:
|
||||||
with mock.patch.object(Eiger9mCsaxs, "_init_filewriter") as mock_init_fw:
|
with mock.patch.object(Eiger9mCsaxs, "_init_filewriter") as mock_init_fw:
|
||||||
mock_det = Eiger9mCsaxs(
|
producer.get.return_value = scan_status_msg.dumps()
|
||||||
name=name, prefix=prefix, device_manager=dm, sim_mode=sim_mode
|
if sim_mode:
|
||||||
)
|
mock_det = Eiger9mCsaxs(
|
||||||
mock_det.cam.detector_state.put(detector_state)
|
name=name, prefix=prefix, device_manager=dm, sim_mode=sim_mode
|
||||||
mock_det._stopped = stopped
|
)
|
||||||
if expected_exception:
|
mock_det.cam.detector_state.put(detector_state)
|
||||||
with pytest.raises(Exception):
|
mock_det._stopped = stopped
|
||||||
mock_det._init()
|
if expected_exception:
|
||||||
mock_init_fw.assert_called_once()
|
with pytest.raises(Exception):
|
||||||
|
mock_det._init()
|
||||||
|
mock_init_fw.assert_called_once()
|
||||||
|
else:
|
||||||
|
mock_det._init() # call the method you want to test
|
||||||
|
assert mock_det.cam.acquire.get() == 0
|
||||||
|
assert mock_det.cam.detector_state.get() == detector_state
|
||||||
|
assert mock_det.cam.trigger_mode.get() == trigger_source
|
||||||
|
mock_init_fw.assert_called()
|
||||||
|
assert mock_init_fw.call_count == 2
|
||||||
else:
|
else:
|
||||||
mock_det._init() # call the method you want to test
|
with mock.patch(
|
||||||
assert mock_det.cam.acquire.get() == 0
|
"ophyd_devices.epics.devices.eiger9m_csaxs.BecScaninfoMixin"
|
||||||
assert mock_det.cam.detector_state.get() == detector_state
|
) as mixin:
|
||||||
assert mock_det.cam.trigger_mode.get() == trigger_source
|
mock_det = Eiger9mCsaxs(
|
||||||
mock_init_fw.assert_called()
|
name=name, prefix=prefix, device_manager=dm, sim_mode=sim_mode
|
||||||
assert mock_init_fw.call_count == 2
|
)
|
||||||
|
mock_det.cam.detector_state.put(detector_state)
|
||||||
|
mock_det._stopped = stopped
|
||||||
|
if expected_exception:
|
||||||
|
with pytest.raises(Exception):
|
||||||
|
mock_det._init()
|
||||||
|
mock_init_fw.assert_called_once()
|
||||||
|
else:
|
||||||
|
mock_det._init() # call the method you want to test
|
||||||
|
assert mock_det.cam.acquire.get() == 0
|
||||||
|
assert mock_det.cam.detector_state.get() == detector_state
|
||||||
|
assert mock_det.cam.trigger_mode.get() == trigger_source
|
||||||
|
mock_init_fw.assert_called()
|
||||||
|
assert mock_init_fw.call_count == 2
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
|
Reference in New Issue
Block a user