refactor: Refactored SimCamera write_to_disk option to continously write to h5 file.

This commit is contained in:
appel_c 2024-10-24 09:27:06 +02:00
parent b4dc99f9c7
commit 41c54aa851
4 changed files with 78 additions and 25 deletions

View File

@ -101,7 +101,15 @@ class CustomDetectorMixin:
This can for instance be to check with the detector and backend if all data is written succsessfully.
"""
def publish_file_location(self, done: bool, successful: bool, metadata: dict = None) -> None:
# TODO make this a SUB event in the device manager
def publish_file_location(
self,
done: bool,
successful: bool,
filepath: str = None,
hinted_locations: dict = None,
metadata: dict = None,
) -> None:
"""
Publish the filepath to REDIS.
@ -112,13 +120,19 @@ class CustomDetectorMixin:
Args:
done (bool): True if scan is finished
successful (bool): True if scan was successful
filepath (str): Optional, filepath to publish. If None, it will be taken from self.parent.filepath.get()
hinted_locations (dict): Optional, dictionary with hinted locations; {dev_name : h5_entry}
metadata (dict): additional metadata to publish
"""
if metadata is None:
metadata = {}
if filepath is None:
file_path = self.parent.filepath.get()
msg = messages.FileMessage(
file_path=self.parent.filepath.get(),
hinted_locations=hinted_locations,
done=done,
successful=successful,
metadata=metadata,

View File

@ -79,7 +79,7 @@ class SimCameraSetup(CustomDetectorMixin):
self.parent.exp_time.set(self.parent.scaninfo.exp_time)
self.parent.burst.set(self.parent.scaninfo.frames_per_trigger)
if self.parent.write_to_disk.get():
self.parent.h5_writer.prepare(
self.parent.h5_writer.on_stage(
file_path=self.parent.filepath.get(), h5_entry="/entry/data/data"
)
self.publish_file_location(done=False, successful=False)
@ -92,7 +92,7 @@ class SimCameraSetup(CustomDetectorMixin):
def on_complete_call(status: DeviceStatus) -> None:
try:
if self.parent.write_to_disk.get():
self.parent.h5_writer.write_data()
self.parent.h5_writer.on_complete()
self.publish_file_location(done=True, successful=True)
if self.parent.stopped:
raise DeviceStopError(f"{self.parent.name} was stopped")
@ -109,12 +109,18 @@ class SimCameraSetup(CustomDetectorMixin):
self._thread_complete.start()
return status
def on_unstage(self):
"""Unstage the camera device."""
if self.parent.write_to_disk.get():
self.parent.h5_writer.on_unstage()
def on_stop(self) -> None:
"""Stop the camera acquisition."""
if self._thread_trigger:
self._thread_trigger.join()
if self._thread_complete:
self._thread_complete.join()
self.on_unstage()
self._thread_trigger = None
self._thread_complete = None

View File

@ -15,6 +15,7 @@ class H5Writer:
self.file_path = file_path
self.h5_entry = h5_entry
self.h5_file = None
self.file_handle = None
self.data_container = []
def create_dir(self):
@ -27,18 +28,45 @@ class H5Writer:
def receive_data(self, data: any):
"""Store data to be written to h5 file"""
self.data_container.append(data)
if len(self.data_container) > 2:
self.write_data()
def prepare(self, file_path: str, h5_entry: str):
def on_stage(self, file_path: str, h5_entry: str):
"""Prepare to write data to h5 file"""
self.data_container = []
self.data_container.clear()
self.file_path = file_path
self.h5_entry = h5_entry
self.create_dir()
# Create file and truncate if it exists
with h5py.File(self.file_path, "w") as f:
pass
def on_complete(self):
"""Write data to h5 file"""
if len(self.data_container) > 0:
self.write_data()
def on_unstage(self):
"""Close file handle"""
def write_data(self):
"""Write data to h5 file"""
with h5py.File(self.file_path, "w") as h5_file:
h5_file.create_dataset(self.h5_entry, data=self.data_container, **hdf5plugin.LZ4())
"""Write data to h5 file. If the scan is started, the file will be truncated first"""
with h5py.File(self.file_path, "a") as f:
dataset = self.h5_entry
value = self.data_container
if isinstance(value, list):
shape = (
value[0].shape if hasattr(value[0], "shape") else (len(value), len(value[0]))
)
shape = (None, *shape)
if dataset not in f:
f.create_dataset(
dataset, data=np.array(value), maxshape=shape, chunks=True, **hdf5plugin.LZ4()
)
else:
f[dataset].resize((f[dataset].shape[0] + len(value)), axis=0)
f[dataset][-len(value) :] = np.array(value)
self.data_container.clear()
class LinearTrajectory:

View File

@ -464,12 +464,12 @@ def test_cam_stage_h5writer(camera):
camera.scaninfo.frames_per_trigger = 1
camera.scaninfo.exp_time = 1
camera.stage()
assert mock_h5_writer.prepare.call_count == 0
assert mock_h5_writer.on_stage.call_count == 0
camera.unstage()
camera.write_to_disk.put(True)
camera.stage()
calls = [mock.call(file_path="", h5_entry="/entry/data/data")]
assert mock_h5_writer.prepare.mock_calls == calls
assert mock_h5_writer.on_stage.mock_calls == calls
# mock_h5_writer.prepare
@ -480,11 +480,11 @@ def test_cam_complete(camera):
status_wait(status)
assert status.done is True
assert status.success is True
assert mock_h5_writer.write_data.call_count == 0
assert mock_h5_writer.on_complete.call_count == 0
camera.write_to_disk.put(True)
status = camera.complete()
status_wait(status)
assert mock_h5_writer.write_data.call_count == 1
assert mock_h5_writer.on_complete.call_count == 1
def test_cam_trigger(camera):
@ -505,23 +505,28 @@ def test_cam_trigger(camera):
assert mock_h5_writer.receive_data.call_count == 2
def test_h5writer():
def test_h5writer(tmp_path):
"""Test the H5Writer class"""
h5_writer = H5Writer()
with mock.patch.object(h5_writer, "create_dir") as mock_create_dir:
h5_writer.data_container = [0, 1, 2]
h5_writer.prepare(file_path="test.h5", h5_entry="entry/data/data")
assert mock_create_dir.call_count == 1
assert h5_writer.data_container == []
assert h5_writer.file_path == "test.h5"
assert h5_writer.h5_entry == "entry/data/data"
h5_writer.data_container = [np.array([0, 1, 2, 3, 4])]
fp = tmp_path / "test.h5"
h5_writer.on_stage(file_path=fp, h5_entry="entry/data/data")
assert h5_writer.data_container == []
assert h5_writer.file_path == fp
assert h5_writer.h5_entry == "entry/data/data"
data = [0, 1, 2, 3]
h5_writer.receive_data(data)
assert h5_writer.data_container == [data]
h5_writer.receive_data(0)
assert h5_writer.data_container == [data, 0]
data = np.array([0, 1])
h5_writer.receive_data(data)
assert h5_writer.data_container == [data]
new_data = np.array([3, 4])
h5_writer.receive_data(new_data)
assert h5_writer.data_container == [data, new_data]
h5_writer.receive_data(new_data)
assert h5_writer.data_container == []
h5_writer.receive_data(new_data)
h5_writer.on_complete()
assert h5_writer.data_container == []
def test_async_monitor_stage(async_monitor):