From f437b1c5fecd068bf0de363ab4b1e4ee40f84abd Mon Sep 17 00:00:00 2001
From: Barbara Bertozzi
Date: Tue, 9 Sep 2025 17:09:38 +0200
Subject: [PATCH] feat: allow choosing the parquet saving partition schema
 (date or date/hour)

---
 scripts/sp2xr_pipeline.py | 4 ++--
 src/sp2xr/distribution.py | 8 ++++++--
 src/sp2xr/helpers.py      | 9 +++++++++
 tests/run_config.yaml     | 1 +
 4 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/scripts/sp2xr_pipeline.py b/scripts/sp2xr_pipeline.py
index 4f7fe0c..97fb3b5 100644
--- a/scripts/sp2xr_pipeline.py
+++ b/scripts/sp2xr_pipeline.py
@@ -201,7 +201,7 @@ def main():
 
         write_future = ddf_pbp_with_flow.to_parquet(
             path=f"{run_config['output']}/pbp_calibrated",
-            partition_on=["date", "hour"],
+            partition_on=run_config["saving_schema"],
             engine="pyarrow",
             write_index=True,
             write_metadata_file=True,
@@ -256,7 +256,7 @@
 
         conc_future = ddf_conc.to_parquet(
             f"{run_config['output']}/conc_{run_config['dt']}s",
-            partition_on=["date", "hour"],
+            partition_on=run_config["saving_schema"],
             engine="pyarrow",
             write_index=True,
             write_metadata_file=True,
diff --git a/src/sp2xr/distribution.py b/src/sp2xr/distribution.py
index 6e372e0..8bb1eed 100644
--- a/src/sp2xr/distribution.py
+++ b/src/sp2xr/distribution.py
@@ -425,7 +425,11 @@ def process_histograms(
     merged_df.index = merged_df.index.astype("datetime64[ns]")
     merged_ddf = dd.from_pandas(merged_df, npartitions=1)
 
-    merged_ddf["date"] = dd.to_datetime(merged_ddf.index.to_series()).dt.normalize()
+    # merged_ddf["date"] = dd.to_datetime(merged_ddf.index.to_series()).dt.normalize()
+    # merged_ddf["hour"] = merged_ddf["hour"].astype("int64")
+    time_index = dd.to_datetime(merged_ddf.index.to_series())
+    merged_ddf["date"] = time_index.dt.normalize()  # works on Series
+    merged_ddf["hour"] = time_index.dt.hour.astype("int64")
 
     # --- Save hists to parquet
     delete_partition_if_exists(
@@ -439,7 +443,7 @@
     # Compute immediately
     hist_future = merged_ddf.to_parquet(
         f"{run_config['output']}/hists_{run_config['dt']}s",
-        partition_on=["date"],
+        partition_on=run_config["saving_schema"],
         engine="pyarrow",
         write_index=True,
         write_metadata_file=True,
diff --git a/src/sp2xr/helpers.py b/src/sp2xr/helpers.py
index ca82ab3..7c2d6bd 100644
--- a/src/sp2xr/helpers.py
+++ b/src/sp2xr/helpers.py
@@ -34,6 +34,9 @@ def load_and_resolve_config(args):
         "instr_cfg": choose(args.instr_config, base, "paths.instrument_config"),
         "dt": choose(args.dt, base, "workflow.dt", 1),
         "repartition": choose(args.repartition, base, "workflow.repartition", None),
+        "saving_schema": choose(
+            args.saving_schema, base, "workflow.saving_schema", None
+        ),
         "do_conc": args.conc or get(base, "workflow.conc", False),
         "do_BC_hist": get(base, "workflow.BC_hist", False),
         "do_scatt_hist": get(base, "workflow.scatt_hist", False),
@@ -231,6 +234,12 @@
         default=None,
         help="repartition time interval for dask partitions",
     )
+    p.add_argument(
+        "--saving_schema",
+        nargs="+",
+        default=None,
+        help="partition columns for saved parquet files (e.g. date hour)",
+    )
     # BC proeprties
     p.add_argument("--BC_rho", type=float, default=None, help="BC density")
     p.add_argument(
diff --git a/tests/run_config.yaml b/tests/run_config.yaml
index ac95f91..4ef37dd 100644
--- a/tests/run_config.yaml
+++ b/tests/run_config.yaml
@@ -11,6 +11,7 @@ workflow:
   timelag_hist: false
   dt: 60 # seconds
   repartition: '3h'
+  saving_schema: ['date']
 
 cluster:
   use_local: false
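
Note (illustration only, not part of the patch): saving_schema is passed straight
through to partition_on in dask.dataframe.to_parquet, so it decides the Hive-style
directory layout of every saved dataset. The sketch below uses made-up sample data
and hypothetical /tmp output paths to show the effect of the two supported values.

import pandas as pd
import dask.dataframe as dd

# Tiny time series spanning two days, mimicking the pipeline's datetime index.
times = pd.date_range("2025-09-01", periods=48, freq="1h")
ddf = dd.from_pandas(pd.DataFrame({"value": range(48)}, index=times), npartitions=1)

# Derive the partition columns the same way the patch does in distribution.py.
time_index = dd.to_datetime(ddf.index.to_series())
ddf["date"] = time_index.dt.normalize()
ddf["hour"] = time_index.dt.hour.astype("int64")

# saving_schema: ['date']          ->  date=.../part.*.parquet  (one folder per day)
ddf.to_parquet("/tmp/example_date", partition_on=["date"], engine="pyarrow")

# saving_schema: ['date', 'hour']  ->  date=.../hour=.../part.*.parquet
ddf.to_parquet("/tmp/example_date_hour", partition_on=["date", "hour"], engine="pyarrow")

With the patch applied, the value comes either from the workflow.saving_schema key in
the run config (as in tests/run_config.yaml) or from the --saving_schema command-line
option.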