Feat: user can now set the repartition frequency of Dask dataframes after loading (both hk and pbp)

2025-09-09 16:03:42 +02:00
parent e946d4ff94
commit 29e2351341
3 changed files with 10 additions and 3 deletions

View File

@@ -135,7 +135,7 @@ def main():
ddf_hk = ddf_hk.reset_index().set_index( # 'calculated_time' becomes a column
"calculated_time", sorted=False, shuffle="tasks"
) # Dask now infers divisions
ddf_hk = ddf_hk.repartition(freq="1h")
ddf_hk = ddf_hk.repartition(freq=run_config["repartition"])
meta = pd.DataFrame(
{
"Sample Flow Controller Read (sccm)": pd.Series(
@@ -182,7 +182,7 @@ def main():
ddf_raw = ddf_raw.reset_index().set_index( # 'calculated_time' becomes a column
"calculated_time", sorted=False, shuffle="tasks"
) # Dask now infers divisions
ddf_raw = ddf_raw.repartition(freq="1h")
ddf_raw = ddf_raw.repartition(freq=run_config["repartition"])
ddf_cal = calibrate_single_particle(ddf_raw, instr_config, run_config)
dask_objects.append(ddf_cal)
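
Both hunks above replace the hard-coded "1h" frequency with the configurable run_config["repartition"] value. A minimal sketch of how such a pandas-style frequency string drives time-based repartitioning of a datetime-indexed Dask dataframe; the toy data below is illustrative only, not the pipeline's hk/pbp data:

```python
import dask.dataframe as dd
import pandas as pd

# Toy frame with a sorted DatetimeIndex named like the pipeline's
# 'calculated_time' column (illustrative data only).
pdf = pd.DataFrame(
    {"flow": range(240)},
    index=pd.date_range(
        "2025-01-01", periods=240, freq="1min", name="calculated_time"
    ),
)
ddf = dd.from_pandas(pdf, npartitions=1)

# With known, sorted divisions, freq-based repartitioning cuts the frame into
# one partition per interval; '1h' over this 4-hour span gives hourly partitions.
ddf = ddf.repartition(freq="1h")
print(ddf.npartitions)
```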

View File

@@ -33,6 +33,7 @@ def load_and_resolve_config(args):
"output": choose(args.output, base, "paths.output"),
"instr_cfg": choose(args.instr_config, base, "paths.instrument_config"),
"dt": choose(args.dt, base, "workflow.dt", 1),
"repartition": choose(args.repartition, base, "workflow.repartition", None),
"do_conc": args.conc or get(base, "workflow.conc", False),
"do_BC_hist": get(base, "workflow.BC_hist", False),
"do_scatt_hist": get(base, "workflow.scatt_hist", False),
@@ -224,7 +225,12 @@ def parse_args():
p.add_argument(
"--dt", type=int, default=None, help="aggregation interval in seconds"
)
+ p.add_argument(
+ "--repartition",
+ type=str,
+ default=None,
+ help="repartition time interval for dask partitions",
+ )
# BC properties
p.add_argument("--BC_rho", type=float, default=None, help="BC density")
p.add_argument(
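
The new --repartition flag mirrors the workflow.repartition config key, with the CLI value taking precedence and None as the fallback. The choose() helper below is an assumption sketched from the call pattern visible in the diff, not the repository's actual implementation:

```python
def choose(cli_value, cfg, dotted_key, default=None):
    """Prefer the CLI value; otherwise walk a dotted key into the config dict."""
    if cli_value is not None:
        return cli_value
    node = cfg
    for part in dotted_key.split("."):
        if not isinstance(node, dict) or part not in node:
            return default
        node = node[part]
    return node

base = {"workflow": {"repartition": "3h"}}
print(choose(None, base, "workflow.repartition", None))     # '3h' from the config file
print(choose("30min", base, "workflow.repartition", None))  # '30min' from --repartition
```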

View File

@@ -10,6 +10,7 @@ workflow:
scatt_hist: true
timelag_hist: false
dt: 60 # seconds
+ repartition: '3h'
cluster:
use_local: false
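
For completeness, a hedged sketch of how the new workflow.repartition entry travels from the YAML run config to the repartition calls in main(); the inline YAML is trimmed to the relevant keys, and the guard against a missing value is an illustration rather than part of the commit:

```python
import yaml

run_yaml = """
workflow:
  dt: 60            # seconds
  repartition: '3h'
"""

base = yaml.safe_load(run_yaml)
freq = base.get("workflow", {}).get("repartition")  # '3h'

# main() passes this straight to Dask as ddf.repartition(freq=freq).
# Skipping the call when freq is None (no --repartition flag, no config key)
# avoids an error, since Dask expects exactly one sizing argument.
if freq is not None:
    print(f"repartitioning with freq={freq}")
```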