diff --git a/ophyd_devices/interfaces/base_classes/psi_device_base.py b/ophyd_devices/interfaces/base_classes/psi_device_base.py index ddc528c..c34e67a 100644 --- a/ophyd_devices/interfaces/base_classes/psi_device_base.py +++ b/ophyd_devices/interfaces/base_classes/psi_device_base.py @@ -40,10 +40,10 @@ class PSIDeviceBase(Device): super().__init__(name=name, **kwargs) self._stopped = False self.task_handler = TaskHandler(parent=self) + self.file_utils = FileHandler() if scan_info is None: scan_info = get_mock_scan_info() self.scan_info = scan_info - self.file_utils = FileHandler() self.on_init() ######################################## diff --git a/ophyd_devices/sim/sim_camera.py b/ophyd_devices/sim/sim_camera.py index 90022be..00d821d 100644 --- a/ophyd_devices/sim/sim_camera.py +++ b/ophyd_devices/sim/sim_camera.py @@ -109,7 +109,7 @@ class SimCamera(PSIDeviceBase, SimCameraControl): FYI: No data is written to disk in the simulation, but upon each trigger it is published to the device_monitor endpoint in REDIS. """ - self.file_path = self.file_utils.get_file_path( + self.file_path = self.file_utils.get_full_path( scan_status_msg=self.scan_info.msg, name=self.name ) self.frames.set( diff --git a/ophyd_devices/utils/psi_device_base_utils.py b/ophyd_devices/utils/psi_device_base_utils.py index 437a7a5..5dc42d7 100644 --- a/ophyd_devices/utils/psi_device_base_utils.py +++ b/ophyd_devices/utils/psi_device_base_utils.py @@ -7,12 +7,12 @@ import uuid from enum import Enum from typing import TYPE_CHECKING -from bec_lib.file_utils import get_full_file_path +from bec_lib.file_utils import get_full_path from bec_lib.logger import bec_logger from bec_lib.utils.import_utils import lazy_import_from -from ophyd import Device, DeviceStatus, StatusBase +from ophyd import Device, DeviceStatus -if TYPE_CHECKING: +if TYPE_CHECKING: # pragma: no cover from bec_lib.messages import ScanStatusMessage else: # TODO: put back normal import when Pydantic gets faster @@ -180,7 +180,7 @@ class TaskHandler: class FileHandler: """Utility class for file operations.""" - def get_file_path( + def get_full_path( self, scan_status_msg: ScanStatusMessage, name: str, create_dir: bool = True ) -> str: """Get the file path. @@ -190,4 +190,4 @@ class FileHandler: name: The name of the file. create_dir: Whether to create the directory. """ - return get_full_file_path(scan_status_msg=scan_status_msg, name=name, create_dir=create_dir) + return get_full_path(scan_status_msg=scan_status_msg, name=name, create_dir=create_dir) diff --git a/tests/test_simulation.py b/tests/test_simulation.py index 589701f..cf61855 100644 --- a/tests/test_simulation.py +++ b/tests/test_simulation.py @@ -11,7 +11,9 @@ import h5py import numpy as np import pytest from bec_lib import messages +from bec_lib.devicemanager import ScanInfo from bec_lib.endpoints import MessageEndpoints +from bec_lib.file_utils import compile_file_components from bec_server.device_server.tests.utils import DMMock from ophyd import Device, Signal from ophyd.status import wait as status_wait @@ -24,7 +26,9 @@ from ophyd_devices.interfaces.protocols.bec_protocols import ( ) from ophyd_devices.sim.sim_camera import SimCamera from ophyd_devices.sim.sim_flyer import SimFlyer -from ophyd_devices.sim.sim_frameworks import H5ImageReplayProxy, SlitProxy, StageCameraProxy +from ophyd_devices.sim.sim_frameworks.h5_image_replay_proxy import H5ImageReplayProxy +from ophyd_devices.sim.sim_frameworks.slit_proxy import SlitProxy +from ophyd_devices.sim.sim_frameworks.stage_camera_proxy import StageCameraProxy from ophyd_devices.sim.sim_monitor import SimMonitor, SimMonitorAsync from ophyd_devices.sim.sim_positioner import SimLinearTrajectoryPositioner, SimPositioner from ophyd_devices.sim.sim_signals import ReadOnlySignal @@ -61,7 +65,7 @@ def monitor(name="monitor"): def camera(name="camera"): """Fixture for SimCamera.""" dm = DMMock() - cam = SimCamera(name=name, device_manager=dm) + cam = SimCamera(name=name, device_manager=dm, scan_info=ScanInfo) cam.filewriter = mock.MagicMock() cam.filewriter.compile_full_filename.return_value = "" yield cam @@ -408,6 +412,14 @@ def test_BECDeviceBase(): def test_h5proxy(h5proxy_fixture): """Test h5 camera proxy read from h5 file""" + msg = messages.ScanStatusMessage( + scan_id="test", + num_points=10, + scan_number=1, + status="open", + info={}, + scan_parameters={"frames_per_trigger": 1, "exp_time": 1}, + ) h5proxy, camera = h5proxy_fixture mock_proxy = mock.MagicMock() camera.device_manager.devices.update({h5proxy.name: mock_proxy}) @@ -425,10 +437,14 @@ def test_h5proxy(h5proxy_fixture): camera.sim.params = {"noise": "none", "noise_multiplier": 0} # pylint: disable=no-member camera.image_shape.set(data.shape[1:]) - camera.stage() - img = camera.image.get() - assert (img == data[0, ...]).all() - camera.unstage() + with ( + mock.patch.object(camera.file_utils, "get_full_path", return_value="/tmp/path"), + mock.patch.object(camera.scan_info, "msg", return_value=msg), + ): + camera.stage() + img = camera.image.get() + assert (img == data[0, ...]).all() + camera.unstage() def test_slitproxy(slitproxy_fixture): @@ -539,13 +555,34 @@ def test_stage_camera_proxy_image_shape( def test_cam_stage_h5writer(camera): """Test the H5Writer class""" + file_dir = None + suffix = None + msg = messages.ScanStatusMessage( + scan_id="test", + num_points=10, + scan_number=1, + status="open", + info={}, + scan_parameters={ + "frames_per_trigger": 1, + "exp_time": 1, + "system_config": {"file_directory": file_dir, "file_suffix": suffix}, + "file_components": compile_file_components( + base_path="./test", scan_nr=1, file_directory=file_dir, user_suffix=suffix + ), + }, + ) with ( mock.patch.object(camera, "h5_writer") as mock_h5_writer, mock.patch.object(camera, "_run_subs") as mock_run_subs, + mock.patch.object(camera.scan_info, "msg", return_value=msg), + mock.patch.object( + camera.file_utils, "get_full_path", return_value="./data/test_file_camera.h5" + ), ): - camera.scan_info.msg.num_points = 10 - camera.scan_info.msg.scan_parameters["frames_per_trigger"] = 1 - camera.scan_info.msg.scan_parameters["exp_time"] = 1 + # camera.scan_info.msg.num_points = 10 + # camera.scan_info.msg.scan_parameters["frames_per_trigger"] = 1 + # camera.scan_info.msg.scan_parameters["exp_time"] = 1 camera.stage() assert mock_h5_writer.on_stage.call_count == 0 camera.unstage()