feat: add option to save Camera data to disk, closes #66

This commit is contained in:
appel_c 2024-05-27 22:17:07 +02:00
parent 9c93916108
commit 60b2e75655
5 changed files with 140 additions and 96 deletions

View File

@ -77,7 +77,8 @@ class CustomDetectorMixin:
def on_trigger(self) -> None:
"""
Specify actions to be executed upon receiving trigger signal
Specify actions to be executed upon receiving trigger signal.
Return a DeviceStatus object or None
"""
def on_pre_scan(self) -> None:
@ -87,6 +88,7 @@ class CustomDetectorMixin:
Only use if needed, and it is recommended to keep this function as short/fast as possible.
"""
# TODO add configurable file_path instead of hardcoding self.parent.filepath
def publish_file_location(
self, done: bool = False, successful: bool = None, metadata: dict = {}
) -> None:

View File

@ -15,6 +15,10 @@ from ophyd.sim import SynSignal
from ophyd.status import StatusBase
from ophyd.utils import LimitError
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin,
PSIDetectorBase,
)
from ophyd_devices.sim.sim_data import (
SimulatedDataCamera,
SimulatedDataMonitor,
@ -23,6 +27,7 @@ from ophyd_devices.sim.sim_data import (
)
from ophyd_devices.sim.sim_signals import ReadOnlySignal, SetableSignal
from ophyd_devices.sim.sim_test_devices import DummyController
from ophyd_devices.sim.sim_utils import H5Writer
from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin
logger = bec_logger.logger
@ -75,7 +80,8 @@ class SimMonitor(Device):
):
self.precision = precision
self.init_sim_params = sim_init
self.sim = self.sim_cls(parent=self, device_manager=device_manager, **kwargs)
self.device_manager = device_manager
self.sim = self.sim_cls(parent=self, **kwargs)
self._registered_proxies = {}
super().__init__(name=name, parent=parent, kind=kind, **kwargs)
@ -88,7 +94,65 @@ class SimMonitor(Device):
return self._registered_proxies
class SimCamera(Device):
class SimCameraSetup(CustomDetectorMixin):
"""Mixin class for the SimCamera device."""
def on_trigger(self) -> None:
"""Trigger the camera to acquire images.
This method can be called from BEC during a scan. It will acquire images and send them to BEC.
Whether the trigger is send from BEC is determined by the softwareTrigger argument in the device config.
Here, we also run a callback on SUB_MONITOR to send the image data the device_monitor endpoint in BEC.
"""
try:
for _ in range(self.parent.burst.get()):
data = self.parent.image.get()
self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=data)
if self.parent.stopped:
raise DeviceStop
if self.parent.write_to_disk.get():
self.parent.h5_writer.receive_data(data)
except DeviceStop:
pass
finally:
self.parent.stopped = False
def on_stage(self) -> None:
"""Stage the camera for upcoming scan
This method is called from BEC in preparation of a scan.
It receives metadata about the scan from BEC,
compiles it and prepares the camera for the scan.
FYI: No data is written to disk in the simulation, but upon each trigger it
is published to the device_monitor endpoint in REDIS.
"""
self.parent.filepath = self.parent.filewriter.compile_full_filename(f"{self.parent.name}")
self.parent.file_path.set(self.parent.filepath)
self.parent.frames.set(
self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger
)
self.parent.exp_time.set(self.parent.scaninfo.exp_time)
self.parent.burst.set(self.parent.scaninfo.frames_per_trigger)
if self.parent.write_to_disk.get():
self.parent.h5_writer.prepare(
file_path=self.parent.filepath, h5_entry="/entry/data/data"
)
self.publish_file_location(done=False)
self.parent.stopped = False
def on_unstage(self) -> None:
"""Unstage the device
Send reads from all config signals to redis
"""
if self.parent.write_to_disk.get():
self.publish_file_location(done=True, successful=True)
class SimCamera(PSIDetectorBase):
"""A simulated device mimic any 2D camera.
It's image is a computed signal, which is configurable by the user and from the command line.
@ -109,6 +173,7 @@ class SimCamera(Device):
USER_ACCESS = ["sim", "registered_proxies"]
custom_prepare_cls = SimCameraSetup
sim_cls = SimulatedDataCamera
SHAPE = (100, 100)
BIT_DEPTH = np.uint16
@ -130,98 +195,33 @@ class SimCamera(Device):
compute_readback=True,
kind=Kind.omitted,
)
write_to_disk = Cpt(SetableSignal, name="write_to_disk", value=False, kind=Kind.config)
def __init__(
self, name, *, kind=None, parent=None, sim_init: dict = None, device_manager=None, **kwargs
):
self.device_manager = device_manager
self.init_sim_params = sim_init
self._registered_proxies = {}
self.sim = self.sim_cls(parent=self, device_manager=device_manager, **kwargs)
super().__init__(name=name, parent=parent, kind=kind, **kwargs)
self._stopped = False
self._staged = False
self.scaninfo = None
self._update_scaninfo()
self.sim = self.sim_cls(parent=self, **kwargs)
self.h5_writer = H5Writer()
self.filepath = None
super().__init__(
name=name, parent=parent, kind=kind, device_manager=device_manager, **kwargs
)
@property
def registered_proxies(self) -> None:
"""Dictionary of registered signal_names and proxies."""
return self._registered_proxies
def trigger(self) -> DeviceStatus:
"""Trigger the camera to acquire images.
This method can be called from BEC during a scan. It will acquire images and send them to BEC.
Whether the trigger is send from BEC is determined by the softwareTrigger argument in the device config.
Here, we also run a callback on SUB_MONITOR to send the image data the device_monitor endpoint in BEC.
"""
def complete(self) -> StatusBase:
"""Complete the motion of the simulated device."""
status = DeviceStatus(self)
self.subscribe(status._finished, event_type=self.SUB_ACQ_DONE, run=False)
def acquire():
try:
for _ in range(self.burst.get()):
self._run_subs(sub_type=self.SUB_MONITOR, value=self.image.get())
if self._stopped:
raise DeviceStop
except DeviceStop:
pass
finally:
self._stopped = False
self._done_acquiring()
threading.Thread(target=acquire, daemon=True).start()
if self.write_to_disk.get():
self.h5_writer.write_data()
status.set_finished()
return status
def _update_scaninfo(self) -> None:
"""Update scaninfo from BecScaninfoMixing
This depends on device manager and operation/sim_mode
"""
self.scaninfo = BecScaninfoMixin(self.device_manager)
def stage(self) -> list[object]:
"""Stage the camera for upcoming scan
This method is called from BEC in preparation of a scan.
It receives metadata about the scan from BEC,
compiles it and prepares the camera for the scan.
FYI: No data is written to disk in the simulation, but upon each trigger it
is published to the device_monitor endpoint in REDIS.
"""
if self._staged:
return super().stage()
self.scaninfo.load_scan_metadata()
self.file_path.set(
os.path.join(
self.file_path.get(), self.file_pattern.get().format(self.scaninfo.scan_number)
)
)
self.frames.set(self.scaninfo.num_points * self.scaninfo.frames_per_trigger)
self.exp_time.set(self.scaninfo.exp_time)
self.burst.set(self.scaninfo.frames_per_trigger)
self._stopped = False
return super().stage()
def unstage(self) -> list[object]:
"""Unstage the device
Send reads from all config signals to redis
"""
if self._stopped is True or not self._staged:
return super().unstage()
return super().unstage()
def stop(self, *, success=False):
"""Stop the device"""
self._stopped = True
super().stop(success=success)
class SimWaveform(Device):
"""A simulated device mimic any 1D Waveform detector.
@ -272,7 +272,7 @@ class SimWaveform(Device):
self.device_manager = device_manager
self.init_sim_params = sim_init
self._registered_proxies = {}
self.sim = self.sim_cls(parent=self, device_manager=device_manager, **kwargs)
self.sim = self.sim_cls(parent=self, **kwargs)
super().__init__(name=name, parent=parent, kind=kind, **kwargs)
self._stopped = False
@ -382,7 +382,7 @@ class SimPositioner(Device, PositionerBase):
"""
# Specify which attributes are accessible via BEC client
USER_ACCESS = ["sim", "readback", "speed", "dummy_controller", "registered_proxies"]
USER_ACCESS = ["sim", "readback", "dummy_controller", "registered_proxies"]
sim_cls = SimulatedPositioner
@ -417,8 +417,6 @@ class SimPositioner(Device, PositionerBase):
kind=None,
device_manager=None,
sim_init: dict = None,
# TODO remove after refactoring config
speed: float = 100,
**kwargs,
):
self.delay = delay
@ -599,13 +597,12 @@ class SimFlyer(Device, PositionerBase, FlyerInterface):
kind=None,
device_manager=None,
# TODO remove after refactoring config
speed: float = 100,
delay: int = 1,
update_frequency: int = 100,
**kwargs,
):
self.sim = self.sim_cls(parent=self, device_manager=device_manager, **kwargs)
self.sim = self.sim_cls(parent=self, **kwargs)
self.precision = precision
self.device_manager = device_manager
self._registered_proxies = {}

View File

@ -90,14 +90,13 @@ class SimulatedDataBase(ABC):
USER_ACCESS = ["sim_params", "sim_select_model", "sim_get_models", "sim_show_all"]
def __init__(self, *args, parent=None, device_manager=None, **kwargs) -> None:
def __init__(self, *args, parent=None, **kwargs) -> None:
"""
Note:
self._model_params duplicates parameters from _params that are solely relevant for the model used.
This facilitates easier and faster access for computing the simulated state using the lmfit package.
"""
self.parent = parent
self.device_manager = device_manager
self.sim_state = defaultdict(dict)
self.registered_proxies = getattr(self.parent, "registered_proxies", {})
self._model = {}
@ -109,10 +108,10 @@ class SimulatedDataBase(ABC):
Execute either the provided method or reroutes the method execution
to a device proxy in case it is registered in self.parentregistered_proxies.
"""
if self.registered_proxies and self.device_manager:
if self.registered_proxies and self.parent.device_manager:
for proxy_name, signal in self.registered_proxies.items():
if signal == signal_name or f"{self.parent.name}_{signal}" == signal_name:
sim_proxy = self.device_manager.devices.get(proxy_name, None)
sim_proxy = self.parent.device_manager.devices.get(proxy_name, None)
if sim_proxy and sim_proxy.enabled is True:
method = sim_proxy.obj.lookup[self.parent.name]["method"]
args = sim_proxy.obj.lookup[self.parent.name]["args"]
@ -304,9 +303,9 @@ class SimulatedPositioner(SimulatedDataBase):
class SimulatedDataMonitor(SimulatedDataBase):
"""Simulated data class for a monitor."""
def __init__(self, *args, parent=None, device_manager=None, **kwargs) -> None:
def __init__(self, *args, parent=None, **kwargs) -> None:
self._model_lookup = self.init_lmfit_models()
super().__init__(*args, parent=parent, device_manager=device_manager, **kwargs)
super().__init__(*args, parent=parent, **kwargs)
self.bit_depth = self.parent.BIT_DEPTH
self._init_default()
@ -412,8 +411,8 @@ class SimulatedDataMonitor(SimulatedDataBase):
float: Value computed by the active model.
"""
mot_name = self.sim_params["ref_motor"]
if self.device_manager and mot_name in self.device_manager.devices:
motor_pos = self.device_manager.devices[mot_name].obj.read()[mot_name]["value"]
if self.parent.device_manager and mot_name in self.parent.device_manager.devices:
motor_pos = self.parent.device_manager.devices[mot_name].obj.read()[mot_name]["value"]
else:
motor_pos = 0
method = self._model
@ -502,11 +501,11 @@ class SimulatedDataWaveform(SimulatedDataMonitor):
class SimulatedDataCamera(SimulatedDataBase):
"""Simulated class to compute data for a 2D camera."""
def __init__(self, *args, parent=None, device_manager=None, **kwargs) -> None:
def __init__(self, *args, parent=None, **kwargs) -> None:
self._model_lookup = self.init_2D_models()
self._all_default_model_params = defaultdict(dict)
self._init_default_camera_params()
super().__init__(*args, parent=parent, device_manager=device_manager, **kwargs)
super().__init__(*args, parent=parent, **kwargs)
self.bit_depth = self.parent.BIT_DEPTH
self._init_default()

View File

@ -0,0 +1,38 @@
import os
from pathlib import Path
import h5py
import hdf5plugin
class H5Writer:
"""Utility class to write data from device to disk"""
def __init__(self, file_path: str = None, h5_entry: str = None):
self.file_path = file_path
self.h5_entry = h5_entry
self.h5_file = None
self.data_container = []
def create_dir(self):
"""Create directory if it does not exist"""
file_path = str(Path(self.file_path).resolve())
base_dir = os.path.dirname(file_path)
if not os.path.exists(base_dir):
os.makedirs(base_dir)
def receive_data(self, data: any):
"""Store data to be written to h5 file"""
self.data_container.append(data)
def prepare(self, file_path: str, h5_entry: str):
"""Prepare to write data to h5 file"""
self.data_container = []
self.file_path = file_path
self.h5_entry = h5_entry
self.create_dir()
def write_data(self):
"""Write data to h5 file"""
with h5py.File(self.file_path, "w") as h5_file:
h5_file.create_dataset(self.h5_entry, data=self.data_container, **hdf5plugin.LZ4())

View File

@ -76,6 +76,14 @@ class BecScaninfoMixin:
else:
self.bec_info_msg = bec_info_msg
self.metadata = None
self.scan_id = None
self.scan_number = None
self.exp_time = None
self.frames_per_trigger = None
self.num_points = None
self.scan_type = None
def get_bec_info_msg(self) -> None:
"""Get BECInfoMsg object"""
return self.bec_info_msg