feat(simulation): add stage camera proxy

This commit is contained in:
2025-01-23 14:39:00 +01:00
parent 8fd17c53d7
commit 1c6cacd550
9 changed files with 234 additions and 4 deletions

View File

@ -148,3 +148,18 @@ slit_proxy:
motor_dir: [0, 1] # 0:x, 1:y in image coordinates
enabled: true
readOnly: false
# 2. Simulate scanning a camera with two positioners.
# this config assumes that a SimCamera with name oav exists
# and positioners with names samx and samy
camera_stages_proxy:
readoutPriority: on_request
deviceClass: ophyd_devices.sim.sim_frameworks.StageCameraProxy
deviceConfig:
oav:
signal_name: image
ref_motors: [samx, samy]
file_source: "" # optional: there is a default image which will be used
roi_fraction: 0.25 # optional: there is a default value which will be used
enabled: true
readOnly: false

View File

@ -1,5 +1,6 @@
from ophyd_devices.sim.sim_frameworks.device_proxy import DeviceProxy
from ophyd_devices.sim.sim_frameworks.h5_image_replay_proxy import H5ImageReplayProxy
from ophyd_devices.sim.sim_frameworks.slit_proxy import SlitProxy
from ophyd_devices.sim.sim_frameworks.stage_camera_proxy import StageCameraProxy
__all__ = ["DeviceProxy", "H5ImageReplayProxy", "SlitProxy"]
__all__ = ["DeviceProxy", "H5ImageReplayProxy", "SlitProxy", "StageCameraProxy"]

View File

@ -0,0 +1,7 @@
#!/bin/python
import base64
import sys
with open(sys.argv[1], mode="rb") as f:
with open(sys.argv[2], "w") as o:
o.write(str(base64.b64encode(f.read())))

File diff suppressed because one or more lines are too long

Binary file not shown.

After

Width:  |  Height:  |  Size: 166 KiB

View File

@ -0,0 +1,155 @@
import base64
import io
import os
from typing import TYPE_CHECKING, SupportsFloat
import numpy as np
from bec_lib import bec_logger
from ophyd import PositionerBase, Staged
from ophyd_devices.sim.sim_camera import SimCamera
from ophyd_devices.sim.sim_frameworks.assets.default_image import DEFAULT_IMAGE
from ophyd_devices.sim.sim_frameworks.device_proxy import DeviceProxy
try:
from PIL import Image
except ImportError as e:
raise Exception(
"PIL/pillow is not available - please install ophyd_devices with dev dependencies to enable this simulation"
) from e
logger = bec_logger.logger
class StageCameraProxy(DeviceProxy):
"""This Proxy class scans an ROI over an image based on some positioners, as if
a stage was being moved in front of some camera. The sample config expects positioners
samx and samy to exist in the device manager."""
def __init__(self, name, *args, device_manager=None, **kwargs):
self._file_source = (
"" # absolute path to an image file to use, or by default use the file from ./assets
)
self._staged = Staged.no
self._roi_fraction: float = 0.15
self._image: Image.Image | None = None
self._shape: tuple[int, int] | None = None
self._name = name
self._x_roi_fraction: float
self._y_roi_fraction: float
self._image_size: tuple[int, int]
self._motors: tuple[PositionerBase, PositionerBase]
super().__init__(name, *args, device_manager=device_manager, **kwargs)
def _validate_motors_from_config(self):
ref_motors: tuple[list[str], ...] = self.config[self._device_name]["ref_motors"]
logger.debug(f"using reference_motors {ref_motors} for camera view simulation")
ref_motor_0 = self.device_manager.devices.get(ref_motors[0]).obj
ref_motor_1 = self.device_manager.devices.get(ref_motors[1]).obj
if ref_motor_0 is None:
raise ValueError(f"{self._name}: device {ref_motor_0} doesn't exist in device manager")
elif ref_motor_1 is None:
raise ValueError(f"{self._name}: device {ref_motor_1} doesn't exist in device manager")
else:
self._motors: tuple[PositionerBase, PositionerBase] = (ref_motor_0, ref_motor_1)
def _update_device_config(self, config: dict) -> None:
super()._update_device_config(config)
if len(self.config.keys()) > 1:
raise RuntimeError(
f"The current implementation of device {self.name} can only replay data for a single device. The config has information about multiple devices {config.keys()}"
)
logger.debug(f"{self._name} received config: {self.config}")
self._device_name = list(self.config.keys())[0]
roi_fraction = self.config[self._device_name].get("roi_fraction")
if roi_fraction is not None:
logger.debug(f"Updating roi_fraction on {self._name} to {roi_fraction}")
if not isinstance(roi_fraction, SupportsFloat):
raise ValueError('"roi_fraction" must be a number!')
self._roi_fraction = roi_fraction
self._validate_motors_from_config()
def stop(self) -> None:
"""Stop the device."""
...
def stage(self) -> list[object]:
"""Stage the device; loads the image from file."""
self._device: SimCamera = self.device_manager.devices.get(self._device_name).obj
self._shape = self._device.image_shape.get()
shape_aspect_ratio = self._shape[0] / self._shape[1]
if self._staged != Staged.no:
return [self]
try:
self._load_image()
except Exception as exc:
raise type(e)(
f"{self._name}: Could not open image file {self._file_source}, relative to {os.getcwd()}"
) from exc
w, h = self._image.size
self._x_roi_fraction = self._roi_fraction
self._y_roi_fraction = h / w * self._roi_fraction / shape_aspect_ratio
self._staged = Staged.yes
return [self]
def unstage(self) -> list[object]:
"""Unstage the device"""
self._image = None
self._staged = Staged.no
return [self]
def _load_image(self):
"""Try loading the image from the filesystem"""
try:
if self._file_source == "":
logger.debug(f"{self._name} is using the default image")
self._image = Image.open(io.BytesIO((base64.b64decode(DEFAULT_IMAGE))))
else:
self._image = Image.open(self.file_source)
self._image.load()
except Exception as e:
raise type(e)(
f"Make sure you have set the image path in the device config for {self._name}: - currently it is '{self._file_source}'"
) from e
def _compute(self, *args, **kwargs):
"""Compute the image.
Returns:
np.ndarray: Image.
"""
logger.debug("{self._name}: compute called.")
if self._staged == Staged.no:
logger.debug("")
return np.zeros((*self._shape, 3))
elif self._image is None:
raise ValueError(
f"{self._name}: Something went wrong - expected an image to have been loaded"
)
def get_positioner_fraction_along_limits(positioner: PositionerBase):
if (limits := positioner.limits) == [0, 0] or limits[0] == limits[1]:
raise ValueError(
f"Device {positioner} must have limits set to be used as an axis for the camera view simulation"
)
return (positioner.position - limits[0]) / (limits[1] - limits[0])
x, y = (get_positioner_fraction_along_limits(m) for m in self._motors)
w, h = self._image.size
# x increases rightwards from the image origin
cropped_x_min_px = x * (1 - self._x_roi_fraction) * w
cropped_x_max_px = (x * (1 - self._x_roi_fraction) + self._x_roi_fraction) * w
# y increases downard from the image origin
cropped_y_min_px = h - (y * (1 - self._y_roi_fraction) * h)
cropped_y_max_px = h - ((y * (1 - self._y_roi_fraction) + self._y_roi_fraction) * h)
cropped_image = self._image.crop(
(cropped_x_min_px, cropped_y_max_px, cropped_x_max_px, cropped_y_min_px)
)
scaled_image = cropped_image.resize(self._shape)
return np.array(scaled_image)

View File

@ -30,6 +30,7 @@ dependencies = [
[project.optional-dependencies]
dev = [
"bec-server~=3.0",
"pillow ~= 11.1",
"black~=24.0",
"isort~=5.13, >=5.13.2",
"coverage~=7.0",

View File

@ -25,7 +25,7 @@ from ophyd_devices.interfaces.protocols.bec_protocols import (
)
from ophyd_devices.sim.sim_camera import SimCamera
from ophyd_devices.sim.sim_flyer import SimFlyer
from ophyd_devices.sim.sim_frameworks import H5ImageReplayProxy, SlitProxy
from ophyd_devices.sim.sim_frameworks import H5ImageReplayProxy, SlitProxy, StageCameraProxy
from ophyd_devices.sim.sim_monitor import SimMonitor, SimMonitorAsync
from ophyd_devices.sim.sim_positioner import SimLinearTrajectoryPositioner, SimPositioner
from ophyd_devices.sim.sim_signals import ReadOnlySignal
@ -92,7 +92,7 @@ def async_monitor(name="async_monitor"):
@pytest.fixture(scope="function")
def h5proxy_fixture(camera, name="h5proxy"):
def h5proxy_fixture(camera: SimCamera, name="h5proxy"):
"""Fixture for SimCamera."""
dm = camera.device_manager
proxy = H5ImageReplayProxy(name=name, device_manager=dm)
@ -108,6 +108,26 @@ def slitproxy_fixture(camera, name="slit_proxy"):
yield proxy, camera, samx
@pytest.fixture(scope="function")
def stage_camera_proxy_fixture(camera, name="stage_camera_proxy"):
"""Fixture for SimCamera."""
dm = camera.device_manager
proxy = StageCameraProxy(name=name, device_manager=dm)
samx = SimPositioner(name="samx", limits=[-50, 50], device_manager=dm)
samy = SimPositioner(name="samy", limits=[-50, 50], device_manager=dm)
for device in (camera, proxy, samx, samy):
device_mock = mock.MagicMock()
device_mock.obj = device
device_mock.enabled = True
dm.devices[device.name] = device_mock
proxy._update_device_config(
{camera.name: {"signal_name": "image", "ref_motors": [samx.name, samy.name]}}
)
camera._registered_proxies.update({proxy.name: camera.image.name})
proxy.enabled = True
yield proxy, camera, samx, samy
@pytest.fixture(scope="function")
def flyer(name="flyer"):
"""Fixture for SimFlyer."""
@ -368,7 +388,7 @@ def test_BECDeviceBase():
assert isinstance(device, BECDevice)
def test_h5proxy(h5proxy_fixture, camera):
def test_h5proxy(h5proxy_fixture):
"""Test h5 camera proxy read from h5 file"""
h5proxy, camera = h5proxy_fixture
mock_proxy = mock.MagicMock()
@ -470,6 +490,36 @@ def test_proxy_config_and_props_stay_in_sync(h5proxy_fixture: tuple[H5ImageRepla
assert h5proxy.config[cam.name]["h5_entry"] == h5proxy.h5_entry
def test_stage_camera_proxy_image_moves_with_samx_and_samy(
stage_camera_proxy_fixture: tuple[StageCameraProxy, SimCamera, SimPositioner, SimPositioner]
):
"""Test camera stage proxy to compute readback from readback of positioner samx and samy"""
proxy, camera, samx, samy = stage_camera_proxy_fixture
proxy.stage()
image_at_0: np.ndarray = camera.image.get()
image_at_0_again: np.ndarray = camera.image.get()
assert np.array_equal(image_at_0, image_at_0_again)
samx.move(-10).wait()
image_at_x_10 = camera.image.get()
assert not np.array_equal(image_at_0, image_at_x_10)
samy.move(-10).wait()
image_at_x_10_y_10 = camera.image.get()
assert not np.array_equal(image_at_x_10, image_at_x_10_y_10)
def test_stage_camera_proxy_image_shape(
stage_camera_proxy_fixture: tuple[StageCameraProxy, SimCamera, SimPositioner, SimPositioner]
):
"""Make sure that the produced image has the same shape as the detector being proxied"""
proxy, camera, samx, samy = stage_camera_proxy_fixture
test_shape = (102, 77)
camera.image_shape.set(test_shape).wait()
proxy.stage()
image = camera.image.get()
assert image.shape == (*reversed(test_shape), 3)
def test_cam_stage_h5writer(camera):
"""Test the H5Writer class"""
with (