diff --git a/scripts/sp2xr_pipeline.py b/scripts/sp2xr_pipeline.py index 21c2a51..4f7fe0c 100644 --- a/scripts/sp2xr_pipeline.py +++ b/scripts/sp2xr_pipeline.py @@ -135,7 +135,7 @@ def main(): ddf_hk = ddf_hk.reset_index().set_index( # 'calculated_time' becomes a column "calculated_time", sorted=False, shuffle="tasks" ) # Dask now infers divisions - ddf_hk = ddf_hk.repartition(freq="1h") + ddf_hk = ddf_hk.repartition(freq=run_config["repartition"]) meta = pd.DataFrame( { "Sample Flow Controller Read (sccm)": pd.Series( @@ -182,7 +182,7 @@ def main(): ddf_raw = ddf_raw.reset_index().set_index( # 'calculated_time' becomes a column "calculated_time", sorted=False, shuffle="tasks" ) # Dask now infers divisions - ddf_raw = ddf_raw.repartition(freq="1h") + ddf_raw = ddf_raw.repartition(freq=run_config["repartition"]) ddf_cal = calibrate_single_particle(ddf_raw, instr_config, run_config) dask_objects.append(ddf_cal) diff --git a/src/sp2xr/helpers.py b/src/sp2xr/helpers.py index 1e2cfca..ca82ab3 100644 --- a/src/sp2xr/helpers.py +++ b/src/sp2xr/helpers.py @@ -33,6 +33,7 @@ def load_and_resolve_config(args): "output": choose(args.output, base, "paths.output"), "instr_cfg": choose(args.instr_config, base, "paths.instrument_config"), "dt": choose(args.dt, base, "workflow.dt", 1), + "repartition": choose(args.repartition, base, "workflow.repartition", None), "do_conc": args.conc or get(base, "workflow.conc", False), "do_BC_hist": get(base, "workflow.BC_hist", False), "do_scatt_hist": get(base, "workflow.scatt_hist", False), @@ -224,7 +225,12 @@ def parse_args(): p.add_argument( "--dt", type=int, default=None, help="aggregation interval in seconds" ) - + p.add_argument( + "--repartition", + type=str, + default=None, + help="repartition time interval for dask partitions", + ) # BC proeprties p.add_argument("--BC_rho", type=float, default=None, help="BC density") p.add_argument( diff --git a/tests/run_config.yaml b/tests/run_config.yaml index 1c39be1..ac95f91 100644 --- a/tests/run_config.yaml +++ b/tests/run_config.yaml @@ -10,6 +10,7 @@ workflow: scatt_hist: true timelag_hist: false dt: 60 # seconds + repartition: '3h' cluster: use_local: false