fix: bugfix for proxy devices

This commit is contained in:
appel_c 2024-10-01 13:57:09 +02:00
parent 449dadb593
commit b1639ea3ba
3 changed files with 85 additions and 42 deletions

View File

@ -9,6 +9,23 @@ eiger:
readOnly: false
softwareTrigger: true
waveform:
readoutPriority: async
deviceClass: ophyd_devices.SimWaveform
deviceConfig:
waveform_shape: 1000
sim_init:
model: GaussianModel
params:
amplitude: 100
center: 500
sigma: 50
deviceTags:
- detector
enabled: true
readOnly: false
softwareTrigger: true
dyn_signals:
readoutPriority: baseline
deviceClass: ophyd_devices.sim.sim.SynDynamicComponents
@ -20,7 +37,7 @@ pseudo_signal1:
deviceClass: ophyd_devices.ComputedSignal
deviceConfig:
compute_method: "def compute_signals(signal1, signal2):\n return signal1.get()*signal2.get()\n"
input_signals:
input_signals:
- "bpm4i_readback"
- "bpm5i_readback"
enabled: true
@ -40,7 +57,7 @@ eyefoc:
readoutPriority: baseline
deviceClass: ophyd_devices.SimPositioner
deviceConfig:
delay: 1
delay: 1
limits:
- -50
- 50
@ -55,7 +72,7 @@ flyer_sim:
readoutPriority: on_request
deviceClass: ophyd_devices.SynFlyer
deviceConfig:
delay: 1
delay: 1
device_access: true
update_frequency: 400
deviceTags:
@ -97,4 +114,37 @@ soft_pos:
deviceTags:
- user motors
enabled: true
readOnly: false
readOnly: false
# Proxy devices
# 1. Replay image data from an HDF5 file,
# this assumes that a SimCamera with name eiger exists
hdf5_proxy:
readoutPriority: on_request
deviceClass: ophyd_devices.sim.sim_frameworks.H5ImageReplayProxy
deviceConfig:
eiger:
signal_name: image
file_source: /path/to/file.h5
h5_entry: entry
onFailure: buffer
enabled: true
readOnly: false
softwareTrigger: false
# 2. Simulate scanning a center position of hor/ver slits with a camera
# this assumes that a SimCamera with name eiger exists
slit_proxy:
readoutPriority: on_request
deviceClass: ophyd_devices.sim.sim_frameworks.SlitProxy
deviceConfig:
eiger:
signal_name: image
center_offset: [0, 0] # [x,y]
covariance: [[1000, 500], [200, 1000]] # [[x,x],[y,y]]
pixel_size: 0.01
ref_motors: [samx, samy]
slit_width: [1, 1]
motor_dir: [0, 1] # 0:x, 1:y in image coordinates
enabled: true
readOnly: false

View File

@ -1,6 +1,6 @@
motor1:
readoutPriority: baseline
description: 'Simulated axis 1'
description: "Simulated axis 1"
deviceClass: ophyd.sim.SynAxis
deviceConfig:
name: motor1
@ -13,7 +13,7 @@ motor1:
motor2:
readoutPriority: baseline
description: 'Simulated axis 2'
description: "Simulated axis 2"
deviceClass: ophyd.sim.SynAxis
deviceConfig:
name: motor2
@ -26,7 +26,7 @@ motor2:
det1:
readoutPriority: baseline
description: 'Simulated signal 1'
description: "Simulated signal 1"
deviceClass: ophyd.sim.SynSignal
deviceConfig:
name: det1
@ -39,7 +39,7 @@ det1:
det2:
readoutPriority: baseline
description: 'Simulated signal 2'
description: "Simulated signal 2"
deviceClass: ophyd.sim.SynSignal
deviceConfig:
name: det2
@ -52,7 +52,7 @@ det2:
per1:
readoutPriority: baseline
description: 'Simulated periodic signal 1'
description: "Simulated periodic signal 1"
deviceClass: ophyd.sim.SynPeriodicSignal
deviceConfig:
name: per1
@ -62,4 +62,3 @@ per1:
enabled: true
readOnly: false
softwareTrigger: false

View File

@ -7,11 +7,11 @@ import h5py
# pylint: disable=unused-import
import hdf5plugin # noqa: F401
import numpy as np
from ophyd import Kind, Staged
from ophyd import Component, Kind, Staged
from scipy.ndimage import gaussian_filter
from ophyd_devices.sim.sim_data import NoiseType
from ophyd_devices.sim.sim_signals import CustomSetableSignal
from ophyd_devices.sim.sim_signals import SetableSignal
from ophyd_devices.utils.bec_device_base import BECDeviceBase
@ -202,18 +202,27 @@ class H5ImageReplayProxy(DeviceProxy):
self._staged = Staged.no
self._image = None
self._index = 0
self._file_source = ""
self._h5_entry = ""
super().__init__(name, *args, device_manager=device_manager, **kwargs)
self.file_source = CustomSetableSignal(
name="file_source", value="", parent=self, kind=Kind.normal
)
self.h5_entry = CustomSetableSignal(
name="h5_entry", value="", parent=self, kind=Kind.normal
)
@property
def component_names(self) -> list[str]:
"""Return the names of the components."""
return ["file_source", "h5_entry"]
def file_source(self) -> str:
"""File source property."""
return self._file_source
@file_source.setter
def file_source(self, file_source: str) -> None:
self._file_source = file_source
@property
def h5_entry(self) -> str:
"""H5 entry property."""
return self._h5_entry
@h5_entry.setter
def h5_entry(self, h5_entry: str) -> None:
self._h5_entry = h5_entry
def _update_device_config(self, config: dict) -> None:
super()._update_device_config(config)
@ -226,14 +235,14 @@ class H5ImageReplayProxy(DeviceProxy):
def _init_signals(self):
"""Initialize the signals for the device."""
if "file_source" in self.config[list(self.config.keys())[0]]:
self.file_source.set(self.config[list(self.config.keys())[0]]["file_source"])
self.file_source = self.config[list(self.config.keys())[0]]["file_source"]
if "h5_entry" in self.config[list(self.config.keys())[0]]:
self.h5_entry.set(self.config[list(self.config.keys())[0]]["h5_entry"])
self.h5_entry = self.config[list(self.config.keys())[0]]["h5_entry"]
def _open_h5_file(self) -> None:
"""Opens the HDF5 file found in the file_source signal and the HDF5 dataset specified by the h5_entry signal."""
self.h5_file = h5py.File(self.file_source.get(), mode="r")
self.h5_dataset = self.h5_file[self.h5_entry.get()]
self.h5_file = h5py.File(self.file_source, mode="r")
self.h5_dataset = self.h5_file[self.h5_entry]
self._number_of_images = self.h5_dataset.shape[0]
def _close_h5_file(self) -> None:
@ -262,7 +271,7 @@ class H5ImageReplayProxy(DeviceProxy):
if self.h5_file:
self.stop()
raise FileNotFoundError(
f"Could not open h5file {self.file_source.get()} or access data set {self.h5_dataset.get()} in file"
f"Could not open h5file {self.file_source} or access data set {self.h5_dataset} in file"
) from exc
self._staged = Staged.yes
@ -290,7 +299,7 @@ class H5ImageReplayProxy(DeviceProxy):
self.unstage()
except Exception as exc:
raise FileNotFoundError(
f"Could not open h5file {self.file_source.get()} or access data set {self.h5_dataset.get()} in file"
f"Could not open h5file {self.file_source} or access data set {self.h5_dataset} in file"
) from exc
def _compute(self, device_name: str, *args, **kwargs) -> np.ndarray:
@ -301,18 +310,3 @@ class H5ImageReplayProxy(DeviceProxy):
"""
self._load_image()
return self._image
if __name__ == "__main__":
# Example usage
tmp = H5ImageReplayProxy(name="tmp", device_manager=None)
config = {
"eiger": {
"signal_name": "image",
"file_source": "/Users/appel_c/switchdrive/Sharefolder/AgBH_2D_gridscan/projection_000006_data_000001.h5",
"h5_entry": "/entry/data/data",
}
}
tmp._update_device_config(config)
tmp.stage()
print(tmp)