From 60b2e756550196fb5c07bb91abb4c1ae5b815c6c Mon Sep 17 00:00:00 2001 From: appel_c Date: Mon, 27 May 2024 22:17:07 +0200 Subject: [PATCH] feat: add option to save Camera data to disk, closes #66 --- .../base_classes/psi_detector_base.py | 4 +- ophyd_devices/sim/sim.py | 167 +++++++++--------- ophyd_devices/sim/sim_data.py | 19 +- ophyd_devices/sim/sim_utils.py | 38 ++++ ophyd_devices/utils/bec_scaninfo_mixin.py | 8 + 5 files changed, 140 insertions(+), 96 deletions(-) create mode 100644 ophyd_devices/sim/sim_utils.py diff --git a/ophyd_devices/interfaces/base_classes/psi_detector_base.py b/ophyd_devices/interfaces/base_classes/psi_detector_base.py index 42048f6..816be92 100644 --- a/ophyd_devices/interfaces/base_classes/psi_detector_base.py +++ b/ophyd_devices/interfaces/base_classes/psi_detector_base.py @@ -77,7 +77,8 @@ class CustomDetectorMixin: def on_trigger(self) -> None: """ - Specify actions to be executed upon receiving trigger signal + Specify actions to be executed upon receiving trigger signal. + Return a DeviceStatus object or None """ def on_pre_scan(self) -> None: @@ -87,6 +88,7 @@ class CustomDetectorMixin: Only use if needed, and it is recommended to keep this function as short/fast as possible. """ + # TODO add configurable file_path instead of hardcoding self.parent.filepath def publish_file_location( self, done: bool = False, successful: bool = None, metadata: dict = {} ) -> None: diff --git a/ophyd_devices/sim/sim.py b/ophyd_devices/sim/sim.py index f399461..38c7c66 100644 --- a/ophyd_devices/sim/sim.py +++ b/ophyd_devices/sim/sim.py @@ -15,6 +15,10 @@ from ophyd.sim import SynSignal from ophyd.status import StatusBase from ophyd.utils import LimitError +from ophyd_devices.interfaces.base_classes.psi_detector_base import ( + CustomDetectorMixin, + PSIDetectorBase, +) from ophyd_devices.sim.sim_data import ( SimulatedDataCamera, SimulatedDataMonitor, @@ -23,6 +27,7 @@ from ophyd_devices.sim.sim_data import ( ) from ophyd_devices.sim.sim_signals import ReadOnlySignal, SetableSignal from ophyd_devices.sim.sim_test_devices import DummyController +from ophyd_devices.sim.sim_utils import H5Writer from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin logger = bec_logger.logger @@ -75,7 +80,8 @@ class SimMonitor(Device): ): self.precision = precision self.init_sim_params = sim_init - self.sim = self.sim_cls(parent=self, device_manager=device_manager, **kwargs) + self.device_manager = device_manager + self.sim = self.sim_cls(parent=self, **kwargs) self._registered_proxies = {} super().__init__(name=name, parent=parent, kind=kind, **kwargs) @@ -88,7 +94,65 @@ class SimMonitor(Device): return self._registered_proxies -class SimCamera(Device): +class SimCameraSetup(CustomDetectorMixin): + """Mixin class for the SimCamera device.""" + + def on_trigger(self) -> None: + """Trigger the camera to acquire images. + + This method can be called from BEC during a scan. It will acquire images and send them to BEC. + Whether the trigger is send from BEC is determined by the softwareTrigger argument in the device config. + + Here, we also run a callback on SUB_MONITOR to send the image data the device_monitor endpoint in BEC. + """ + try: + for _ in range(self.parent.burst.get()): + data = self.parent.image.get() + self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=data) + if self.parent.stopped: + raise DeviceStop + if self.parent.write_to_disk.get(): + self.parent.h5_writer.receive_data(data) + except DeviceStop: + pass + finally: + self.parent.stopped = False + + def on_stage(self) -> None: + """Stage the camera for upcoming scan + + This method is called from BEC in preparation of a scan. + It receives metadata about the scan from BEC, + compiles it and prepares the camera for the scan. + + FYI: No data is written to disk in the simulation, but upon each trigger it + is published to the device_monitor endpoint in REDIS. + """ + self.parent.filepath = self.parent.filewriter.compile_full_filename(f"{self.parent.name}") + + self.parent.file_path.set(self.parent.filepath) + self.parent.frames.set( + self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger + ) + self.parent.exp_time.set(self.parent.scaninfo.exp_time) + self.parent.burst.set(self.parent.scaninfo.frames_per_trigger) + if self.parent.write_to_disk.get(): + self.parent.h5_writer.prepare( + file_path=self.parent.filepath, h5_entry="/entry/data/data" + ) + self.publish_file_location(done=False) + self.parent.stopped = False + + def on_unstage(self) -> None: + """Unstage the device + + Send reads from all config signals to redis + """ + if self.parent.write_to_disk.get(): + self.publish_file_location(done=True, successful=True) + + +class SimCamera(PSIDetectorBase): """A simulated device mimic any 2D camera. It's image is a computed signal, which is configurable by the user and from the command line. @@ -109,6 +173,7 @@ class SimCamera(Device): USER_ACCESS = ["sim", "registered_proxies"] + custom_prepare_cls = SimCameraSetup sim_cls = SimulatedDataCamera SHAPE = (100, 100) BIT_DEPTH = np.uint16 @@ -130,98 +195,33 @@ class SimCamera(Device): compute_readback=True, kind=Kind.omitted, ) + write_to_disk = Cpt(SetableSignal, name="write_to_disk", value=False, kind=Kind.config) def __init__( self, name, *, kind=None, parent=None, sim_init: dict = None, device_manager=None, **kwargs ): - self.device_manager = device_manager self.init_sim_params = sim_init self._registered_proxies = {} - self.sim = self.sim_cls(parent=self, device_manager=device_manager, **kwargs) - - super().__init__(name=name, parent=parent, kind=kind, **kwargs) - self._stopped = False - self._staged = False - self.scaninfo = None - self._update_scaninfo() + self.sim = self.sim_cls(parent=self, **kwargs) + self.h5_writer = H5Writer() + self.filepath = None + super().__init__( + name=name, parent=parent, kind=kind, device_manager=device_manager, **kwargs + ) @property def registered_proxies(self) -> None: """Dictionary of registered signal_names and proxies.""" return self._registered_proxies - def trigger(self) -> DeviceStatus: - """Trigger the camera to acquire images. - - This method can be called from BEC during a scan. It will acquire images and send them to BEC. - Whether the trigger is send from BEC is determined by the softwareTrigger argument in the device config. - - Here, we also run a callback on SUB_MONITOR to send the image data the device_monitor endpoint in BEC. - """ + def complete(self) -> StatusBase: + """Complete the motion of the simulated device.""" status = DeviceStatus(self) - - self.subscribe(status._finished, event_type=self.SUB_ACQ_DONE, run=False) - - def acquire(): - try: - for _ in range(self.burst.get()): - self._run_subs(sub_type=self.SUB_MONITOR, value=self.image.get()) - if self._stopped: - raise DeviceStop - except DeviceStop: - pass - finally: - self._stopped = False - self._done_acquiring() - - threading.Thread(target=acquire, daemon=True).start() + if self.write_to_disk.get(): + self.h5_writer.write_data() + status.set_finished() return status - def _update_scaninfo(self) -> None: - """Update scaninfo from BecScaninfoMixing - This depends on device manager and operation/sim_mode - """ - self.scaninfo = BecScaninfoMixin(self.device_manager) - - def stage(self) -> list[object]: - """Stage the camera for upcoming scan - - This method is called from BEC in preparation of a scan. - It receives metadata about the scan from BEC, - compiles it and prepares the camera for the scan. - - FYI: No data is written to disk in the simulation, but upon each trigger it - is published to the device_monitor endpoint in REDIS. - """ - if self._staged: - return super().stage() - self.scaninfo.load_scan_metadata() - self.file_path.set( - os.path.join( - self.file_path.get(), self.file_pattern.get().format(self.scaninfo.scan_number) - ) - ) - self.frames.set(self.scaninfo.num_points * self.scaninfo.frames_per_trigger) - self.exp_time.set(self.scaninfo.exp_time) - self.burst.set(self.scaninfo.frames_per_trigger) - self._stopped = False - return super().stage() - - def unstage(self) -> list[object]: - """Unstage the device - - Send reads from all config signals to redis - """ - if self._stopped is True or not self._staged: - return super().unstage() - - return super().unstage() - - def stop(self, *, success=False): - """Stop the device""" - self._stopped = True - super().stop(success=success) - class SimWaveform(Device): """A simulated device mimic any 1D Waveform detector. @@ -272,7 +272,7 @@ class SimWaveform(Device): self.device_manager = device_manager self.init_sim_params = sim_init self._registered_proxies = {} - self.sim = self.sim_cls(parent=self, device_manager=device_manager, **kwargs) + self.sim = self.sim_cls(parent=self, **kwargs) super().__init__(name=name, parent=parent, kind=kind, **kwargs) self._stopped = False @@ -382,7 +382,7 @@ class SimPositioner(Device, PositionerBase): """ # Specify which attributes are accessible via BEC client - USER_ACCESS = ["sim", "readback", "speed", "dummy_controller", "registered_proxies"] + USER_ACCESS = ["sim", "readback", "dummy_controller", "registered_proxies"] sim_cls = SimulatedPositioner @@ -417,8 +417,6 @@ class SimPositioner(Device, PositionerBase): kind=None, device_manager=None, sim_init: dict = None, - # TODO remove after refactoring config - speed: float = 100, **kwargs, ): self.delay = delay @@ -599,13 +597,12 @@ class SimFlyer(Device, PositionerBase, FlyerInterface): kind=None, device_manager=None, # TODO remove after refactoring config - speed: float = 100, delay: int = 1, update_frequency: int = 100, **kwargs, ): - self.sim = self.sim_cls(parent=self, device_manager=device_manager, **kwargs) + self.sim = self.sim_cls(parent=self, **kwargs) self.precision = precision self.device_manager = device_manager self._registered_proxies = {} diff --git a/ophyd_devices/sim/sim_data.py b/ophyd_devices/sim/sim_data.py index 173b6ea..5354510 100644 --- a/ophyd_devices/sim/sim_data.py +++ b/ophyd_devices/sim/sim_data.py @@ -90,14 +90,13 @@ class SimulatedDataBase(ABC): USER_ACCESS = ["sim_params", "sim_select_model", "sim_get_models", "sim_show_all"] - def __init__(self, *args, parent=None, device_manager=None, **kwargs) -> None: + def __init__(self, *args, parent=None, **kwargs) -> None: """ Note: self._model_params duplicates parameters from _params that are solely relevant for the model used. This facilitates easier and faster access for computing the simulated state using the lmfit package. """ self.parent = parent - self.device_manager = device_manager self.sim_state = defaultdict(dict) self.registered_proxies = getattr(self.parent, "registered_proxies", {}) self._model = {} @@ -109,10 +108,10 @@ class SimulatedDataBase(ABC): Execute either the provided method or reroutes the method execution to a device proxy in case it is registered in self.parentregistered_proxies. """ - if self.registered_proxies and self.device_manager: + if self.registered_proxies and self.parent.device_manager: for proxy_name, signal in self.registered_proxies.items(): if signal == signal_name or f"{self.parent.name}_{signal}" == signal_name: - sim_proxy = self.device_manager.devices.get(proxy_name, None) + sim_proxy = self.parent.device_manager.devices.get(proxy_name, None) if sim_proxy and sim_proxy.enabled is True: method = sim_proxy.obj.lookup[self.parent.name]["method"] args = sim_proxy.obj.lookup[self.parent.name]["args"] @@ -304,9 +303,9 @@ class SimulatedPositioner(SimulatedDataBase): class SimulatedDataMonitor(SimulatedDataBase): """Simulated data class for a monitor.""" - def __init__(self, *args, parent=None, device_manager=None, **kwargs) -> None: + def __init__(self, *args, parent=None, **kwargs) -> None: self._model_lookup = self.init_lmfit_models() - super().__init__(*args, parent=parent, device_manager=device_manager, **kwargs) + super().__init__(*args, parent=parent, **kwargs) self.bit_depth = self.parent.BIT_DEPTH self._init_default() @@ -412,8 +411,8 @@ class SimulatedDataMonitor(SimulatedDataBase): float: Value computed by the active model. """ mot_name = self.sim_params["ref_motor"] - if self.device_manager and mot_name in self.device_manager.devices: - motor_pos = self.device_manager.devices[mot_name].obj.read()[mot_name]["value"] + if self.parent.device_manager and mot_name in self.parent.device_manager.devices: + motor_pos = self.parent.device_manager.devices[mot_name].obj.read()[mot_name]["value"] else: motor_pos = 0 method = self._model @@ -502,11 +501,11 @@ class SimulatedDataWaveform(SimulatedDataMonitor): class SimulatedDataCamera(SimulatedDataBase): """Simulated class to compute data for a 2D camera.""" - def __init__(self, *args, parent=None, device_manager=None, **kwargs) -> None: + def __init__(self, *args, parent=None, **kwargs) -> None: self._model_lookup = self.init_2D_models() self._all_default_model_params = defaultdict(dict) self._init_default_camera_params() - super().__init__(*args, parent=parent, device_manager=device_manager, **kwargs) + super().__init__(*args, parent=parent, **kwargs) self.bit_depth = self.parent.BIT_DEPTH self._init_default() diff --git a/ophyd_devices/sim/sim_utils.py b/ophyd_devices/sim/sim_utils.py new file mode 100644 index 0000000..6420ccf --- /dev/null +++ b/ophyd_devices/sim/sim_utils.py @@ -0,0 +1,38 @@ +import os +from pathlib import Path + +import h5py +import hdf5plugin + + +class H5Writer: + """Utility class to write data from device to disk""" + + def __init__(self, file_path: str = None, h5_entry: str = None): + self.file_path = file_path + self.h5_entry = h5_entry + self.h5_file = None + self.data_container = [] + + def create_dir(self): + """Create directory if it does not exist""" + file_path = str(Path(self.file_path).resolve()) + base_dir = os.path.dirname(file_path) + if not os.path.exists(base_dir): + os.makedirs(base_dir) + + def receive_data(self, data: any): + """Store data to be written to h5 file""" + self.data_container.append(data) + + def prepare(self, file_path: str, h5_entry: str): + """Prepare to write data to h5 file""" + self.data_container = [] + self.file_path = file_path + self.h5_entry = h5_entry + self.create_dir() + + def write_data(self): + """Write data to h5 file""" + with h5py.File(self.file_path, "w") as h5_file: + h5_file.create_dataset(self.h5_entry, data=self.data_container, **hdf5plugin.LZ4()) diff --git a/ophyd_devices/utils/bec_scaninfo_mixin.py b/ophyd_devices/utils/bec_scaninfo_mixin.py index 4480458..6f18ded 100644 --- a/ophyd_devices/utils/bec_scaninfo_mixin.py +++ b/ophyd_devices/utils/bec_scaninfo_mixin.py @@ -76,6 +76,14 @@ class BecScaninfoMixin: else: self.bec_info_msg = bec_info_msg + self.metadata = None + self.scan_id = None + self.scan_number = None + self.exp_time = None + self.frames_per_trigger = None + self.num_points = None + self.scan_type = None + def get_bec_info_msg(self) -> None: """Get BECInfoMsg object""" return self.bec_info_msg