feat: adapt sp2xr_pipeline and helpers to run multiple times over the same dataset and different time slots, while ensuring config settings stay the same for the entire dataset

2025-09-29 11:00:14 +02:00
parent 203bd9d740
commit da275cdc97
2 changed files with 102 additions and 2 deletions


@@ -16,6 +16,7 @@ from sp2xr.helpers import (
get_time_chunks_from_range,
delete_partition_if_exists,
partition_rowcount,
validate_config_compatibility,
)
from sp2xr.calibration import calibrate_single_particle
from sp2xr.resample_pbp_hk import (
@@ -56,13 +57,42 @@ def main():
signal.signal(signal.SIGTERM, handle_sigterm)
try:
# -1. chunking
"""# -1. chunking
pbp_times = extract_partitioned_datetimes(run_config["input_pbp"])
hk_times = extract_partitioned_datetimes(run_config["input_hk"])
global_start = min(min(pbp_times), min(hk_times))
global_end = max(max(pbp_times), max(hk_times))
chunk_freq = run_config["chunking"]["freq"] # e.g. "6h", "3d"
time_chunks = get_time_chunks_from_range(global_start, global_end, chunk_freq)
"""
# -1. Validate config compatibility
validate_config_compatibility(run_config)
# -2. chunking
pbp_times = extract_partitioned_datetimes(run_config["input_pbp"])
hk_times = extract_partitioned_datetimes(run_config["input_hk"])
# Use config date range if specified, otherwise use data extent
if run_config["chunking"]["start_date"]:
global_start = pd.to_datetime(run_config["chunking"]["start_date"])
print(f"Using config start_date: {global_start}")
else:
global_start = min(min(pbp_times), min(hk_times))
print(f"Using data extent start: {global_start}")
if run_config["chunking"]["end_date"]:
global_end = pd.to_datetime(run_config["chunking"]["end_date"])
print(f"Using config end_date: {global_end}")
else:
global_end = max(max(pbp_times), max(hk_times))
print(f"Using data extent end: {global_end}")
chunk_freq = run_config["chunking"]["freq"] # e.g. "6h", "3d"
time_chunks = get_time_chunks_from_range(global_start, global_end, chunk_freq)
print(
f"Processing {len(time_chunks)} time chunks from {global_start} to {global_end}"
)
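# Illustrative sketch, not part of this commit: get_time_chunks_from_range is
# defined in sp2xr.helpers and its body is not shown in this diff. Assuming it
# splits [global_start, global_end] into consecutive (start, end) tuples at the
# configured frequency, a minimal pandas-based version could look like this:
def _get_time_chunks_sketch(start, end, freq):
    edges = pd.date_range(start=start, end=end, freq=freq)
    # Extend the last edge to `end` when the range does not divide evenly into `freq`.
    if len(edges) == 0 or edges[-1] < pd.Timestamp(end):
        edges = edges.append(pd.DatetimeIndex([pd.Timestamp(end)]))
    return list(zip(edges[:-1], edges[1:]))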
# 0. calibration stage --------------------------------------------
instr_config = yaml.safe_load(open(run_config["instr_cfg"]))
@@ -100,7 +130,7 @@ def main():
pbp_filters.append(("hour", ">=", chunk_start.hour))
pbp_filters.append(("hour", "<", chunk_end.hour))
client.restart()
#client.restart()
scattered_bins = {
"inc_mass_bin_lims": client.scatter(inc_mass_bin_lims, broadcast=True),


@@ -456,6 +456,76 @@ def partition_rowcount(ddf: dd.DataFrame) -> int:
return int(row_series.sum().compute())
def validate_config_compatibility(run_config):
"""
Validate that the current config is compatible with any existing output.
Checks critical parameters that would affect data processing.
Args:
run_config: Current pipeline configuration
Raises:
ValueError: If config is incompatible with existing data
"""
import json
from pathlib import Path
output_path = Path(run_config["output"])
config_file = output_path / "pipeline_config.json"
if not config_file.exists():
# No existing config, save current one
output_path.mkdir(parents=True, exist_ok=True)
# Save key parameters that must remain consistent
key_params = {
"dt": run_config["dt"],
"instr_cfg": run_config["instr_cfg"],
"saving_schema": run_config["saving_schema"],
"rho_eff": run_config.get("rho_eff"),
"BC_type": run_config.get("BC_type"),
}
with open(config_file, "w") as f:
json.dump(key_params, f, indent=2, default=str)
print(f"Saved pipeline config to {config_file}")
return
# Load existing config and compare
with open(config_file, "r") as f:
existing_params = json.load(f)
# Check critical parameters
current_params = {
"dt": run_config["dt"],
"instr_cfg": run_config["instr_cfg"],
"saving_schema": run_config["saving_schema"],
"rho_eff": run_config.get("rho_eff"),
"BC_type": run_config.get("BC_type"),
}
mismatches = []
for key, current_val in current_params.items():
existing_val = existing_params.get(key)
if existing_val != current_val:
mismatches.append(
f" {key}: existing='{existing_val}' vs current='{current_val}'"
)
if mismatches:
raise ValueError(
f"Current config is incompatible with existing data in {output_path}\n"
f"Mismatched parameters:\n" + "\n".join(mismatches) + "\n"
"Either:\n"
" 1. Use a different output directory, or\n"
" 2. Delete existing data to restart with new config, or\n"
" 3. Update config to match existing parameters"
)
print("Config validation passed - compatible with existing data")
def extract_sp2xr_filename_parts(file_path: str | Path) -> tuple[str, str]:
"""
Extract standardized filename and folder name from SP2XR file path.