feat: introduce sim proxies to patch readback from devices based on sim frameworks

This commit is contained in:
2024-02-09 22:28:30 +01:00
parent 8ee5022424
commit c1664d47e7

View File

@ -0,0 +1,153 @@
import numpy as np
from scipy.ndimage import gaussian_filter
from collections import defaultdict
from ophyd_devices.sim.sim_data import NoiseType
from ophyd_devices.utils.bec_device_base import BECDeviceBase
class DeviceProxy(BECDeviceBase):
"""Device proxy class for simulated devices."""
class PinholeProxy(DeviceProxy):
"""Pinhole proxy hosts a lookup table for simulated devices.
When activated, it will create a lookup table for a simulated camera based on the config.
The lookup table will be used to simulate the effect of a pinhole on the camera image.
An example config is shown below, for the dev.eiger, with dev.samx and dev.samy as reference motors.
eiger:
cen_off: [0, 0] # [x,y]
cov: [[1000, 500], [200, 1000]] # [[x,x],[y,y]]
pixel_size: 0.01
signal: image
ref_motors: [samx, samy]
slit_width: [1, 2]
motor_dir: [0, 1] # x:0 , y:1, z:2 coordinates
"""
def __init__(
self,
name: str,
*args,
device_manager=None,
config: dict = None,
**kwargs,
):
self.connected = True
self.name = name
self.device_manager = device_manager
self.config = config
self._enabled = True
self._lookup = defaultdict(dict)
self._gaussian_blur_sigma = 8
self._compile_lookup()
@property
def lookup(self):
"""Property to get lookup table.
If a device_manager is provided, the lookup table will be returned only if the device is enabled.
If not, it will always return the lookup table.
Returns:
dict: Lookup table for the simulated camera.
"""
if self.device_manager:
return (
self._lookup
if getattr(self.device_manager.devices, self.name).enabled is True
else None
)
return self._lookup
@lookup.setter
def lookup(self, update: dict) -> None:
"""lookup setter"""
self._lookup.update(update)
def _compile_lookup(self):
"""Compile the lookup table for the simulated camera."""
for device_name in self.config.keys():
self.lookup[device_name] = {
"obj": self,
"method": self._compute,
"args": (device_name,),
"kwargs": {},
}
def _compute(self, device_name: str, *args, **kwargs) -> np.ndarray:
"""
Compute the lookup table for the simulated camera.
It copies the sim_camera bevahiour and adds a mask to simulate the effect of a pinhole.
Args:
device_name (str): Name of the device.
Returns:
np.ndarray: Lookup table for the simulated camera.
"""
device_obj = self.device_manager.devices.get(device_name)
params = device_obj.sim._all_params.get("gauss")
shape = device_obj.image_shape.get()
params.update(
{
"noise": NoiseType.POISSON,
"cov": np.array(self.config[device_name]["cov"]),
"cen_off": np.array(self.config[device_name]["cen_off"]),
}
)
pos, offset, cov, amp = device_obj.sim._prepare_params_gauss(params, shape)
v = device_obj.sim._compute_multivariate_gaussian(pos=pos, cen_off=offset, cov=cov, amp=amp)
device_pos = self.config[device_name]["pixel_size"] * pos
valid_mask = self._create_mask(
device_pos=device_pos,
ref_motors=self.config[device_name]["ref_motors"],
width=self.config[device_name]["slit_width"],
dir=self.config[device_name]["motor_dir"],
)
valid_mask = self._blur_image(valid_mask, sigma=self._gaussian_blur_sigma)
v *= valid_mask
v = device_obj.sim._add_noise(v, params["noise"])
v = device_obj.sim._add_hot_pixel(v, params["hot_pixel"])
return v
def _blur_image(self, image: np.ndarray, sigma: float = 5) -> np.ndarray:
"""Blur the image with a gaussian filter.
Args:
image (np.ndarray): Image to be blurred.
sigma (float): Sigma for the gaussian filter.
Returns:
np.ndarray: Blurred image.
"""
return gaussian_filter(image, sigma=sigma)
def _create_mask(
self, device_pos: np.ndarray, ref_motors: list[str], width: list[float], dir: list[int]
):
"""Create the mask to simulate the effect of a pinhole.
Args:
device_pos (np.ndarray) : XY position array of the detector pixel
ref_motors (list[str]) : List of reference motors
width (list[float]) : width of the slit, in the same units as the motors
dir (list[int]) : direction of the motors, 0 for x and 1 for y
Returns:
np.ndarray: Mask array
"""
mask = np.ones_like(device_pos, dtype=bool)
for ii, motor_name in enumerate(ref_motors):
motor_pos = self.device_manager.devices.get(motor_name).read()[motor_name]["value"]
edges = [motor_pos + width[ii] / 2, motor_pos - width[ii] / 2]
mask[..., dir[ii]] = np.logical_and(
device_pos[..., dir[ii]] > np.min(edges), device_pos[..., dir[ii]] < np.max(edges)
)
return np.prod(mask, axis=2)