feat: add the option to choose the parquet saving partition schema: date, or date and hour

This commit is contained in:
2025-09-09 17:09:38 +02:00
parent 29e2351341
commit f437b1c5fe
4 changed files with 18 additions and 4 deletions

View File

@@ -201,7 +201,7 @@ def main():
write_future = ddf_pbp_with_flow.to_parquet(
path=f"{run_config['output']}/pbp_calibrated",
partition_on=["date", "hour"],
partition_on=run_config["saving_schema"],
engine="pyarrow",
write_index=True,
write_metadata_file=True,
@@ -256,7 +256,7 @@ def main():
conc_future = ddf_conc.to_parquet(
f"{run_config['output']}/conc_{run_config['dt']}s",
partition_on=["date", "hour"],
partition_on=run_config["saving_schema"],
engine="pyarrow",
write_index=True,
write_metadata_file=True,

View File

@@ -425,7 +425,11 @@ def process_histograms(
merged_df.index = merged_df.index.astype("datetime64[ns]")
merged_ddf = dd.from_pandas(merged_df, npartitions=1)
merged_ddf["date"] = dd.to_datetime(merged_ddf.index.to_series()).dt.normalize()
# merged_ddf["date"] = dd.to_datetime(merged_ddf.index.to_series()).dt.normalize()
# merged_ddf["hour"] = merged_ddf["hour"].astype("int64")
time_index = dd.to_datetime(merged_ddf.index.to_series())
merged_ddf["date"] = time_index.dt.normalize() # works on Series
merged_ddf["hour"] = time_index.dt.hour.astype("int64")
# --- Save hists to parquet
delete_partition_if_exists(
@@ -439,7 +443,7 @@ def process_histograms(
# Compute immediately
hist_future = merged_ddf.to_parquet(
f"{run_config['output']}/hists_{run_config['dt']}s",
partition_on=["date"],
partition_on=run_config["saving_schema"],
engine="pyarrow",
write_index=True,
write_metadata_file=True,

View File

@@ -34,6 +34,9 @@ def load_and_resolve_config(args):
"instr_cfg": choose(args.instr_config, base, "paths.instrument_config"),
"dt": choose(args.dt, base, "workflow.dt", 1),
"repartition": choose(args.repartition, base, "workflow.repartition", None),
"saving_schema": choose(
args.saving_schema, base, "workflow.saving_schema", None
),
"do_conc": args.conc or get(base, "workflow.conc", False),
"do_BC_hist": get(base, "workflow.BC_hist", False),
"do_scatt_hist": get(base, "workflow.scatt_hist", False),
@@ -231,6 +234,12 @@ def parse_args():
default=None,
help="repartition time interval for dask partitions",
)
# Partition columns for saved parquet output: each value becomes one
# partition level, e.g. `--saving_schema date hour` -> ["date", "hour"].
# NOTE: nargs="+" collects the space-separated values into a list of
# strings; the previous `type=list` would have split a single argument
# into its individual characters (e.g. "date" -> ['d','a','t','e']).
p.add_argument(
    "--saving_schema",
    nargs="+",
    type=str,
    default=None,
    help="column names to partition saved parquet files on "
    "(e.g. 'date' or 'date hour')",
)
# BC properties
p.add_argument("--BC_rho", type=float, default=None, help="BC density")
p.add_argument(

View File

@@ -11,6 +11,7 @@ workflow:
timelag_hist: false
dt: 60 # seconds
repartition: '3h'
saving_schema: ['date']
cluster:
use_local: false