refactor: cleanup

This commit is contained in:
2025-02-24 14:16:31 +01:00
parent ac4f0c5af7
commit b75207b7c0
4 changed files with 53 additions and 16 deletions

View File

@ -40,10 +40,10 @@ class PSIDeviceBase(Device):
super().__init__(name=name, **kwargs) super().__init__(name=name, **kwargs)
self._stopped = False self._stopped = False
self.task_handler = TaskHandler(parent=self) self.task_handler = TaskHandler(parent=self)
self.file_utils = FileHandler()
if scan_info is None: if scan_info is None:
scan_info = get_mock_scan_info() scan_info = get_mock_scan_info()
self.scan_info = scan_info self.scan_info = scan_info
self.file_utils = FileHandler()
self.on_init() self.on_init()
######################################## ########################################

View File

@ -109,7 +109,7 @@ class SimCamera(PSIDeviceBase, SimCameraControl):
FYI: No data is written to disk in the simulation, but upon each trigger it FYI: No data is written to disk in the simulation, but upon each trigger it
is published to the device_monitor endpoint in REDIS. is published to the device_monitor endpoint in REDIS.
""" """
self.file_path = self.file_utils.get_file_path( self.file_path = self.file_utils.get_full_path(
scan_status_msg=self.scan_info.msg, name=self.name scan_status_msg=self.scan_info.msg, name=self.name
) )
self.frames.set( self.frames.set(

View File

@ -7,12 +7,12 @@ import uuid
from enum import Enum from enum import Enum
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from bec_lib.file_utils import get_full_file_path from bec_lib.file_utils import get_full_path
from bec_lib.logger import bec_logger from bec_lib.logger import bec_logger
from bec_lib.utils.import_utils import lazy_import_from from bec_lib.utils.import_utils import lazy_import_from
from ophyd import Device, DeviceStatus, StatusBase from ophyd import Device, DeviceStatus
if TYPE_CHECKING: if TYPE_CHECKING: # pragma: no cover
from bec_lib.messages import ScanStatusMessage from bec_lib.messages import ScanStatusMessage
else: else:
# TODO: put back normal import when Pydantic gets faster # TODO: put back normal import when Pydantic gets faster
@ -180,7 +180,7 @@ class TaskHandler:
class FileHandler: class FileHandler:
"""Utility class for file operations.""" """Utility class for file operations."""
def get_file_path( def get_full_path(
self, scan_status_msg: ScanStatusMessage, name: str, create_dir: bool = True self, scan_status_msg: ScanStatusMessage, name: str, create_dir: bool = True
) -> str: ) -> str:
"""Get the file path. """Get the file path.
@ -190,4 +190,4 @@ class FileHandler:
name: The name of the file. name: The name of the file.
create_dir: Whether to create the directory. create_dir: Whether to create the directory.
""" """
return get_full_file_path(scan_status_msg=scan_status_msg, name=name, create_dir=create_dir) return get_full_path(scan_status_msg=scan_status_msg, name=name, create_dir=create_dir)

View File

@ -11,7 +11,9 @@ import h5py
import numpy as np import numpy as np
import pytest import pytest
from bec_lib import messages from bec_lib import messages
from bec_lib.devicemanager import ScanInfo
from bec_lib.endpoints import MessageEndpoints from bec_lib.endpoints import MessageEndpoints
from bec_lib.file_utils import compile_file_components
from bec_server.device_server.tests.utils import DMMock from bec_server.device_server.tests.utils import DMMock
from ophyd import Device, Signal from ophyd import Device, Signal
from ophyd.status import wait as status_wait from ophyd.status import wait as status_wait
@ -24,7 +26,9 @@ from ophyd_devices.interfaces.protocols.bec_protocols import (
) )
from ophyd_devices.sim.sim_camera import SimCamera from ophyd_devices.sim.sim_camera import SimCamera
from ophyd_devices.sim.sim_flyer import SimFlyer from ophyd_devices.sim.sim_flyer import SimFlyer
from ophyd_devices.sim.sim_frameworks import H5ImageReplayProxy, SlitProxy, StageCameraProxy from ophyd_devices.sim.sim_frameworks.h5_image_replay_proxy import H5ImageReplayProxy
from ophyd_devices.sim.sim_frameworks.slit_proxy import SlitProxy
from ophyd_devices.sim.sim_frameworks.stage_camera_proxy import StageCameraProxy
from ophyd_devices.sim.sim_monitor import SimMonitor, SimMonitorAsync from ophyd_devices.sim.sim_monitor import SimMonitor, SimMonitorAsync
from ophyd_devices.sim.sim_positioner import SimLinearTrajectoryPositioner, SimPositioner from ophyd_devices.sim.sim_positioner import SimLinearTrajectoryPositioner, SimPositioner
from ophyd_devices.sim.sim_signals import ReadOnlySignal from ophyd_devices.sim.sim_signals import ReadOnlySignal
@ -61,7 +65,7 @@ def monitor(name="monitor"):
def camera(name="camera"): def camera(name="camera"):
"""Fixture for SimCamera.""" """Fixture for SimCamera."""
dm = DMMock() dm = DMMock()
cam = SimCamera(name=name, device_manager=dm) cam = SimCamera(name=name, device_manager=dm, scan_info=ScanInfo)
cam.filewriter = mock.MagicMock() cam.filewriter = mock.MagicMock()
cam.filewriter.compile_full_filename.return_value = "" cam.filewriter.compile_full_filename.return_value = ""
yield cam yield cam
@ -408,6 +412,14 @@ def test_BECDeviceBase():
def test_h5proxy(h5proxy_fixture): def test_h5proxy(h5proxy_fixture):
"""Test h5 camera proxy read from h5 file""" """Test h5 camera proxy read from h5 file"""
msg = messages.ScanStatusMessage(
scan_id="test",
num_points=10,
scan_number=1,
status="open",
info={},
scan_parameters={"frames_per_trigger": 1, "exp_time": 1},
)
h5proxy, camera = h5proxy_fixture h5proxy, camera = h5proxy_fixture
mock_proxy = mock.MagicMock() mock_proxy = mock.MagicMock()
camera.device_manager.devices.update({h5proxy.name: mock_proxy}) camera.device_manager.devices.update({h5proxy.name: mock_proxy})
@ -425,10 +437,14 @@ def test_h5proxy(h5proxy_fixture):
camera.sim.params = {"noise": "none", "noise_multiplier": 0} camera.sim.params = {"noise": "none", "noise_multiplier": 0}
# pylint: disable=no-member # pylint: disable=no-member
camera.image_shape.set(data.shape[1:]) camera.image_shape.set(data.shape[1:])
camera.stage() with (
img = camera.image.get() mock.patch.object(camera.file_utils, "get_full_path", return_value="/tmp/path"),
assert (img == data[0, ...]).all() mock.patch.object(camera.scan_info, "msg", return_value=msg),
camera.unstage() ):
camera.stage()
img = camera.image.get()
assert (img == data[0, ...]).all()
camera.unstage()
def test_slitproxy(slitproxy_fixture): def test_slitproxy(slitproxy_fixture):
@ -539,13 +555,34 @@ def test_stage_camera_proxy_image_shape(
def test_cam_stage_h5writer(camera): def test_cam_stage_h5writer(camera):
"""Test the H5Writer class""" """Test the H5Writer class"""
file_dir = None
suffix = None
msg = messages.ScanStatusMessage(
scan_id="test",
num_points=10,
scan_number=1,
status="open",
info={},
scan_parameters={
"frames_per_trigger": 1,
"exp_time": 1,
"system_config": {"file_directory": file_dir, "file_suffix": suffix},
"file_components": compile_file_components(
base_path="./test", scan_nr=1, file_directory=file_dir, user_suffix=suffix
),
},
)
with ( with (
mock.patch.object(camera, "h5_writer") as mock_h5_writer, mock.patch.object(camera, "h5_writer") as mock_h5_writer,
mock.patch.object(camera, "_run_subs") as mock_run_subs, mock.patch.object(camera, "_run_subs") as mock_run_subs,
mock.patch.object(camera.scan_info, "msg", return_value=msg),
mock.patch.object(
camera.file_utils, "get_full_path", return_value="./data/test_file_camera.h5"
),
): ):
camera.scan_info.msg.num_points = 10 # camera.scan_info.msg.num_points = 10
camera.scan_info.msg.scan_parameters["frames_per_trigger"] = 1 # camera.scan_info.msg.scan_parameters["frames_per_trigger"] = 1
camera.scan_info.msg.scan_parameters["exp_time"] = 1 # camera.scan_info.msg.scan_parameters["exp_time"] = 1
camera.stage() camera.stage()
assert mock_h5_writer.on_stage.call_count == 0 assert mock_h5_writer.on_stage.call_count == 0
camera.unstage() camera.unstage()