From da275cdc97ec8d9d97bc65df9f13987e342b8bd4 Mon Sep 17 00:00:00 2001
From: Barbara Bertozzi
Date: Mon, 29 Sep 2025 11:00:14 +0200
Subject: [PATCH] feat: adapt sp2xr_pipeline and helpers to run multiple times
 across the same dataset and different time slots

Allow the pipeline to be run several times over different time slots of the
same dataset while ensuring that the config settings stay the same for the
entire dataset.
---
 scripts/sp2xr_pipeline.py | 34 +++++++++++++++++--
 src/sp2xr/helpers.py      | 70 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 102 insertions(+), 2 deletions(-)

diff --git a/scripts/sp2xr_pipeline.py b/scripts/sp2xr_pipeline.py
index 9693592..947bb81 100644
--- a/scripts/sp2xr_pipeline.py
+++ b/scripts/sp2xr_pipeline.py
@@ -16,6 +16,7 @@ from sp2xr.helpers import (
     get_time_chunks_from_range,
     delete_partition_if_exists,
     partition_rowcount,
+    validate_config_compatibility,
 )
 from sp2xr.calibration import calibrate_single_particle
 from sp2xr.resample_pbp_hk import (
@@ -56,13 +57,42 @@ def main():
     signal.signal(signal.SIGTERM, handle_sigterm)
 
     try:
-        # -1. chunking
+        """# -1. chunking
         pbp_times = extract_partitioned_datetimes(run_config["input_pbp"])
         hk_times = extract_partitioned_datetimes(run_config["input_hk"])
         global_start = min(min(pbp_times), min(hk_times))
         global_end = max(max(pbp_times), max(hk_times))
         chunk_freq = run_config["chunking"]["freq"]  # e.g. "6h", "3d"
         time_chunks = get_time_chunks_from_range(global_start, global_end, chunk_freq)
+        """
+        # -1. Validate config compatibility
+        validate_config_compatibility(run_config)
+
+        # -2. chunking
+        pbp_times = extract_partitioned_datetimes(run_config["input_pbp"])
+        hk_times = extract_partitioned_datetimes(run_config["input_hk"])
+
+        # Use config date range if specified, otherwise use data extent
+        if run_config["chunking"]["start_date"]:
+            global_start = pd.to_datetime(run_config["chunking"]["start_date"])
+            print(f"Using config start_date: {global_start}")
+        else:
+            global_start = min(min(pbp_times), min(hk_times))
+            print(f"Using data extent start: {global_start}")
+
+        if run_config["chunking"]["end_date"]:
+            global_end = pd.to_datetime(run_config["chunking"]["end_date"])
+            print(f"Using config end_date: {global_end}")
+        else:
+            global_end = max(max(pbp_times), max(hk_times))
+            print(f"Using data extent end: {global_end}")
+
+        chunk_freq = run_config["chunking"]["freq"]  # e.g. "6h", "3d"
+        time_chunks = get_time_chunks_from_range(global_start, global_end, chunk_freq)
+
+        print(
+            f"Processing {len(time_chunks)} time chunks from {global_start} to {global_end}"
+        )
 
         # 0. calibration stage --------------------------------------------
         instr_config = yaml.safe_load(open(run_config["instr_cfg"]))
@@ -100,7 +130,7 @@ def main():
             pbp_filters.append(("hour", ">=", chunk_start.hour))
             pbp_filters.append(("hour", "<", chunk_end.hour))
 
-            client.restart()
+            #client.restart()
 
             scattered_bins = {
                 "inc_mass_bin_lims": client.scatter(inc_mass_bin_lims, broadcast=True),
diff --git a/src/sp2xr/helpers.py b/src/sp2xr/helpers.py
index 2d4335f..503b6ac 100644
--- a/src/sp2xr/helpers.py
+++ b/src/sp2xr/helpers.py
@@ -456,6 +456,76 @@ def partition_rowcount(ddf: dd.DataFrame) -> int:
     return int(row_series.sum().compute())
 
 
+def validate_config_compatibility(run_config):
+    """
+    Validate that the current config is compatible with any existing output.
+    Checks critical parameters that would affect data processing.
+
+    Args:
+        run_config: Current pipeline configuration
+
+    Raises:
+        ValueError: If config is incompatible with existing data
+    """
+    import json
+    from pathlib import Path
+
+    output_path = Path(run_config["output"])
+    config_file = output_path / "pipeline_config.json"
+
+    if not config_file.exists():
+        # No existing config, save current one
+        output_path.mkdir(parents=True, exist_ok=True)
+
+        # Save key parameters that must remain consistent
+        key_params = {
+            "dt": run_config["dt"],
+            "instr_cfg": run_config["instr_cfg"],
+            "saving_schema": run_config["saving_schema"],
+            "rho_eff": run_config.get("rho_eff"),
+            "BC_type": run_config.get("BC_type"),
+        }
+
+        with open(config_file, "w") as f:
+            json.dump(key_params, f, indent=2, default=str)
+
+        print(f"Saved pipeline config to {config_file}")
+        return
+
+    # Load existing config and compare
+    with open(config_file, "r") as f:
+        existing_params = json.load(f)
+
+    # Check critical parameters
+    current_params = {
+        "dt": run_config["dt"],
+        "instr_cfg": run_config["instr_cfg"],
+        "saving_schema": run_config["saving_schema"],
+        "rho_eff": run_config.get("rho_eff"),
+        "BC_type": run_config.get("BC_type"),
+    }
+
+    mismatches = []
+    for key, current_val in current_params.items():
+        existing_val = existing_params.get(key)
+        if existing_val != current_val:
+            mismatches.append(
+                f"  {key}: existing='{existing_val}' vs current='{current_val}'"
+            )
+
+    if mismatches:
+        raise ValueError(
+            f"Current config is incompatible with existing data in {output_path}\n"
+            f"Mismatched parameters:\n" + "\n".join(mismatches) + "\n"
+            "Either:\n"
+            "  1. Use a different output directory, or\n"
+            "  2. Delete existing data to restart with new config, or\n"
+            "  3. Update config to match existing parameters"
+        )
+
+    print("Config validation passed - compatible with existing data")
+
+
 def extract_sp2xr_filename_parts(file_path: str | Path) -> tuple[str, str]:
     """
     Extract standardized filename and folder name from SP2XR file path.
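
--
Note (illustrative, not part of the diff above): a minimal sketch of how the
new validate_config_compatibility guard is expected to behave across repeated
runs that share an output directory. The YAML path and the changed "dt" value
below are hypothetical examples; the key parameters and the
pipeline_config.json file name come from the patch itself.

    import yaml
    from sp2xr.helpers import validate_config_compatibility

    # First run: <output>/pipeline_config.json does not exist yet, so the key
    # parameters (dt, instr_cfg, saving_schema, rho_eff, BC_type) are written
    # there and processing proceeds.
    run_config = yaml.safe_load(open("run_config.yaml"))  # hypothetical path
    validate_config_compatibility(run_config)

    # Later run over a different time slot of the same dataset: if a key
    # parameter was changed in the meantime (here "dt", as a hypothetical
    # example), the helper raises ValueError listing the mismatched parameters
    # instead of letting incompatible output accumulate in the same directory.
    run_config["dt"] = "5s"
    try:
        validate_config_compatibility(run_config)
    except ValueError as err:
        print(err)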