refactor: cleanup file_utils compile_file_components and get_full_path

This commit is contained in:
appel_c 2025-01-22 18:02:04 +01:00
parent 9b57f6b067
commit 55ae253f16
3 changed files with 83 additions and 21 deletions

View File

@ -119,21 +119,39 @@ class FileWriterError(Exception):
"""Exception for errors in the file writer"""
def compile_file_path_for_scan_status(
def compile_file_components(
base_path: str,
scan_nr: int,
scan_bundle: int = 1000,
leading_zeros: int = 5,
file_directory: str = None,
) -> str:
"""Compile the File Path for ScanStatusMessage without suffix and file type extension."""
user_suffix: str = None,
) -> tuple[str, str]:
"""Compile the File Path for ScanStatusMessage without suffix and file type extension.
Args:
base_path (str): Base path
scan_nr (int): Scan number
scan_bundle (int, optional): Scan bundle size. Defaults to 1000.
leading_zeros (int, optional): Number of leading zeros. Defaults to 5.
file_directory (str, optional): File directory. Defaults to None.
Returns:
tuple(str, str): Tuple with file path components (file_path, extension), i.e. ('/data/S00000-00999/S00001', 'h5')
"""
file_extension = "h5"
if file_directory is None:
file_directory = FileWriter.get_scan_directory(
scan_number=scan_nr, scan_bundle=scan_bundle, leading_zeros=leading_zeros
scan_number=scan_nr,
scan_bundle=scan_bundle,
leading_zeros=leading_zeros,
user_suffix=user_suffix,
)
file_path = os.path.join(base_path, "data", file_directory, f"S{scan_nr:0{leading_zeros}d}")
return file_path
file_path_component = os.path.join(
base_path, "data", file_directory, f"S{scan_nr:0{leading_zeros}d}"
)
return (file_path_component, file_extension)
def get_full_path(scan_status_msg: ScanStatusMessage, name: str, create_dir: bool = True) -> str:
@ -152,15 +170,18 @@ def get_full_path(scan_status_msg: ScanStatusMessage, name: str, create_dir: boo
raise FileWriterError(
f"Can't use suffix {name}; formatting is alphanumeric:{name.isalnum()} and ascii {name.isascii()}"
)
file_path = scan_status_msg.info.get("file_path", None)
if not file_path:
file_components = scan_status_msg.info.get("file_components", None)
if not file_components:
raise FileWriterError("No file path available in scan status message.")
file_base_path, file_extension = file_components[0], file_components[1]
if not file_components:
raise FileWriterError("No file path available in scan status message.")
# Add name and user_suffix to the file path
user_suffix = scan_status_msg.scan_parameters["system_config"].get("file_suffix", None)
if user_suffix:
name += f"_{user_suffix}"
# Compile full file path
full_path = f"{file_path}_{name}.h5"
full_path = f"{file_base_path}_{name}.{file_extension}"
if create_dir:
os.makedirs(os.path.dirname(full_path), exist_ok=True)
return full_path

View File

@ -8,7 +8,7 @@ from typing import TYPE_CHECKING
from bec_lib import messages
from bec_lib.alarm_handler import Alarms
from bec_lib.endpoints import MessageEndpoints
from bec_lib.file_utils import compile_file_path_for_scan_status
from bec_lib.file_utils import compile_file_components
from bec_lib.logger import bec_logger
from .errors import ScanAbortion
@ -194,7 +194,7 @@ class ScanWorker(threading.Thread):
"num_points": num_points,
"scan_parameters": active_rb.scan.scan_parameters,
"request_inputs": active_rb.scan.request_inputs,
"file_path": compile_file_path_for_scan_status(
"file_components": compile_file_components(
# pylint: disable=protected-access
base_path=self.parent._service_config.service_config["file_writer"][
"base_path"
@ -203,6 +203,7 @@ class ScanWorker(threading.Thread):
file_directory=active_rb.scan.scan_parameters["system_config"][
"file_directory"
],
user_suffix=active_rb.scan.scan_parameters["system_config"]["file_suffix"],
),
}
)

View File

@ -1,4 +1,5 @@
# pylint: skip-file
import os
import uuid
from unittest import mock
@ -228,17 +229,48 @@ def test_open_scan(scan_worker_mock, instr, corr_num_points, scan_id):
"RID": "something",
"system_config": {"file_suffix": None, "file_directory": None},
},
)
),
messages.ScanQueueMessage(
scan_type="grid_scan",
parameter={
"args": {"samx": (-5, 5, 5), "samy": (-1, 1, 2)},
"kwargs": {
"exp_time": 1,
"relative": True,
"system_config": {"file_suffix": "test", "file_directory": "tmp"},
},
"num_points": 10,
},
queue="primary",
metadata={
"RID": "something",
"system_config": {"file_suffix": "test", "file_directory": "tmp"},
},
),
messages.ScanQueueMessage(
scan_type="grid_scan",
parameter={
"args": {"samx": (-5, 5, 5), "samy": (-1, 1, 2)},
"kwargs": {
"exp_time": 1,
"relative": True,
"system_config": {"file_suffix": "test", "file_directory": None},
},
"num_points": 10,
},
queue="primary",
metadata={
"RID": "something",
"system_config": {"file_suffix": "test", "file_directory": None},
},
),
],
)
def test_initialize_scan_info(scan_worker_mock, msg):
worker = scan_worker_mock
scan_server = scan_worker_mock.parent
rb = RequestBlock(msg, assembler=ScanAssembler(parent=scan_server))
assert rb.metadata == {
"RID": "something",
"system_config": {"file_suffix": None, "file_directory": None},
}
assert rb.metadata == msg.metadata
with mock.patch.object(worker, "current_instruction_queue_item"):
worker.scan_motors = ["samx"]
@ -261,14 +293,22 @@ def test_initialize_scan_info(scan_worker_mock, msg):
assert worker.current_scan_info["monitor_sync"] == "bec"
assert worker.current_scan_info["frames_per_trigger"] == 1
assert worker.current_scan_info["args"] == {"samx": (-5, 5, 5), "samy": (-1, 1, 2)}
assert worker.current_scan_info["kwargs"] == {
"exp_time": 1,
"relative": True,
"system_config": {"file_suffix": None, "file_directory": None},
}
assert worker.current_scan_info["kwargs"] == msg.parameter["kwargs"]
assert "samx" in worker.current_scan_info["readout_priority"]["monitored"]
assert "samy" in worker.current_scan_info["readout_priority"]["baseline"]
base_path = worker.parent._service_config.service_config["file_writer"]["base_path"]
scan_nr = worker.current_scan_info["scan_number"]
file_dir = msg.parameter["kwargs"]["system_config"]["file_directory"]
suffix = msg.parameter["kwargs"]["system_config"]["file_suffix"]
if file_dir is None:
if suffix is None:
file_dir = "S00000-00999/S00002"
else:
file_dir = f"S00000-00999/S00002_{suffix}"
file_components = os.path.join(base_path, "data", file_dir, "S00002"), "h5"
assert worker.current_scan_info["file_components"] == file_components
@pytest.mark.parametrize(
"msg,scan_id,max_point_id,exp_num_points",