From 554ec26410c189da0dc44f77d3e15d5a6069f847 Mon Sep 17 00:00:00 2001
From: Barbara Bertozzi
Date: Wed, 13 Aug 2025 17:02:47 +0200
Subject: [PATCH] Refactor: move parts of the processing code into dedicated
 functions in a separate module (join_pbp_with_flow and aggregate_dt)

---
 scripts/sp2xr_pipeline.py    | 53 ++++++--------------------------
 src/sp2xr/helpers.py         |  7 ++---
 src/sp2xr/resample_pbp_hk.py | 58 ++++++++++++++++++++++++++++++++++++
 3 files changed, 70 insertions(+), 48 deletions(-)

diff --git a/scripts/sp2xr_pipeline.py b/scripts/sp2xr_pipeline.py
index 4beac0e..d5b581d 100644
--- a/scripts/sp2xr_pipeline.py
+++ b/scripts/sp2xr_pipeline.py
@@ -9,10 +9,14 @@ from sp2xr.helpers import (
     parse_args,
     load_and_resolve_config,
     initialize_cluster,
-    floor_index_to_dt,
 )
 from sp2xr.apply_calib import calibrate_single_particle
-from sp2xr.resample_pbp_hk import build_dt_summary, resample_hk_partition
+from sp2xr.resample_pbp_hk import (
+    build_dt_summary,
+    resample_hk_partition,
+    join_pbp_with_flow,
+    aggregate_dt,
+)
 from sp2xr.distribution import (
     bin_lims_to_ctrs,
     process_hist_and_dist_partition,
@@ -75,31 +79,10 @@ def main():

     ddf_cal = calibrate_single_particle(ddf_raw, instr_config, run_config)

-    ddf_cal = ddf_cal.map_partitions(
-        floor_index_to_dt, run_config=run_config, meta=ddf_cal._meta
-    )
-
-    meta_pbp_with_flow = ddf_cal._meta.copy()
-    meta_pbp_with_flow["Sample Flow Controller Read (vccm)"] = np.float64()
-
-    ddf_pbp_with_flow = ddf_cal.map_partitions(
-        lambda part: part.join(flow_dt, how="left"),
-        meta=meta_pbp_with_flow,
-    )
-
-    ddf_pbp_with_flow = ddf_pbp_with_flow.map_partitions(
-        lambda df: df.assign(
-            **{
-                "BC mass bin": pd.cut(
-                    df["BC mass within range"], bins=inc_mass_bin_lims, labels=False
-                )
-            }
-        ),
-        meta=ddf_pbp_with_flow._meta.assign(**{"BC mass bin": 0}),
-    )
+    ddf_pbp_with_flow = join_pbp_with_flow(ddf_cal, flow_dt, run_config)

     ddf_pbp_with_flow.to_parquet(
-        path=f"{run_config['output']}/pbp_{run_config['dt']}s_calibrated",
+        path=f"{run_config['output']}/pbp_calibrated",
         partition_on=["date", "hour"],
         engine="pyarrow",
         write_index=True,
@@ -112,25 +95,7 @@
         build_dt_summary, dt_s=run_config["dt"], meta=build_dt_summary(ddf_cal._meta)
     )

-    ddf_pbp_hk_dt = dd.merge(
-        ddf_pbp_dt, ddf_hk_dt, how="left", left_index=True, right_index=True
-    )
-    time_index = dd.to_datetime(ddf_pbp_hk_dt.index.to_series())
-
-    ddf_pbp_hk_dt["date"] = time_index.dt.normalize()  # works on Series
-    ddf_pbp_hk_dt["hour"] = time_index.dt.hour.astype("int64")
-
-    # Optionally drop the old columns
-    ddf_pbp_hk_dt = ddf_pbp_hk_dt.drop(columns=["date_x", "hour_x", "date_y", "hour_y"])
-
-    ddf_pbp_hk_dt.to_parquet(
-        path=f"{run_config['output']}/combined_pbp_hk_{run_config['dt']}s",
-        partition_on=["date", "hour"],
-        engine="pyarrow",
-        write_index=True,
-        write_metadata_file=True,
-        overwrite=True,
-    )
+    ddf_pbp_hk_dt = aggregate_dt(ddf_pbp_dt, ddf_hk_dt, run_config)

     # 4. (optional) dt bulk conc --------------------------
     if run_config["do_conc"]:
diff --git a/src/sp2xr/helpers.py b/src/sp2xr/helpers.py
index 529c47a..e7a92d1 100644
--- a/src/sp2xr/helpers.py
+++ b/src/sp2xr/helpers.py
@@ -29,10 +29,9 @@
         "instr_cfg": choose(args.instr_config, base, "paths.instrument_config"),
         "dt": choose(args.dt, base, "workflow.dt", 1),
         "do_conc": args.conc or get(base, "workflow.conc", False),
-        "do_BC_hist": args.BC_hist or get(base, "workflow.BC_hist", False),
-        "do_scatt_hist": args.scatt_hist or get(base, "workflow.scatt_hist", False),
-        "do_timelag_hist": args.timelag_hist
-        or get(base, "workflow.timelag_hist", False),
+        "do_BC_hist": get(base, "workflow.BC_hist", False),
+        "do_scatt_hist": get(base, "workflow.scatt_hist", False),
+        "do_timelag_hist": get(base, "workflow.timelag_hist", False),
         "rho_eff": choose(args.BC_rho, base, "bc.rho_eff", 1.8),
         "BC_type": choose(args.BC_type, base, "bc.type", "constant_effective_density"),
         "cluster": {
diff --git a/src/sp2xr/resample_pbp_hk.py b/src/sp2xr/resample_pbp_hk.py
index 485623b..0b0e390 100644
--- a/src/sp2xr/resample_pbp_hk.py
+++ b/src/sp2xr/resample_pbp_hk.py
@@ -1,5 +1,8 @@
 from typing import List
 import pandas as pd
+import numpy as np
+import dask.dataframe as dd
+from sp2xr.helpers import floor_index_to_dt


 _SUM_COLS: List[str] = [
@@ -129,3 +132,58 @@ def resample_hk_partition(pdf: pd.DataFrame, dt="1s") -> pd.DataFrame:
     out["hour"] = out.index.hour.astype("int64")

     return out
+
+
+def join_pbp_with_flow(ddf_cal, flow_dt, run_config):
+    inc_mass_bin_lims = np.logspace(
+        np.log10(run_config["histo"]["inc"]["min_mass"]),
+        np.log10(run_config["histo"]["inc"]["max_mass"]),
+        run_config["histo"]["inc"]["n_bins"],
+    )
+
+    ddf_cal = ddf_cal.map_partitions(
+        floor_index_to_dt, run_config=run_config, meta=ddf_cal._meta
+    )
+
+    meta_pbp_with_flow = ddf_cal._meta.copy()
+    meta_pbp_with_flow["Sample Flow Controller Read (vccm)"] = np.float64()
+
+    ddf_pbp_with_flow = ddf_cal.map_partitions(
+        lambda part: part.join(flow_dt, how="left"),
+        meta=meta_pbp_with_flow,
+    )
+
+    ddf_pbp_with_flow = ddf_pbp_with_flow.map_partitions(
+        lambda df: df.assign(
+            **{
+                "BC mass bin": pd.cut(
+                    df["BC mass within range"], bins=inc_mass_bin_lims, labels=False
+                )
+            }
+        ),
+        meta=ddf_pbp_with_flow._meta.assign(**{"BC mass bin": 0}),
+    )
+    return ddf_pbp_with_flow
+
+
+def aggregate_dt(ddf_pbp_dt, ddf_hk_dt, run_config):
+    ddf_pbp_hk_dt = dd.merge(
+        ddf_pbp_dt, ddf_hk_dt, how="left", left_index=True, right_index=True
+    )
+    time_index = dd.to_datetime(ddf_pbp_hk_dt.index.to_series())
+
+    ddf_pbp_hk_dt["date"] = time_index.dt.normalize()  # works on Series
+    ddf_pbp_hk_dt["hour"] = time_index.dt.hour.astype("int64")
+
+    # Optionally drop the old columns
+    ddf_pbp_hk_dt = ddf_pbp_hk_dt.drop(columns=["date_x", "hour_x", "date_y", "hour_y"])
+
+    ddf_pbp_hk_dt.to_parquet(
+        path=f"{run_config['output']}/combined_pbp_hk_{run_config['dt']}s",
+        partition_on=["date", "hour"],
+        engine="pyarrow",
+        write_index=True,
+        write_metadata_file=True,
+        overwrite=True,
+    )
+    return ddf_pbp_hk_dt
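
Usage sketch (not part of the commit): after this patch the pipeline body reduces to two
calls into sp2xr.resample_pbp_hk. A minimal outline of the intended call sequence,
assuming ddf_cal, flow_dt, ddf_pbp_dt, ddf_hk_dt and run_config have already been
prepared as in scripts/sp2xr_pipeline.py:

    from sp2xr.resample_pbp_hk import join_pbp_with_flow, aggregate_dt

    # floor the per-particle index to dt, left-join the flow reading,
    # and add the "BC mass bin" column
    ddf_pbp_with_flow = join_pbp_with_flow(ddf_cal, flow_dt, run_config)

    # merge the dt-resampled PBP and HK tables, add date/hour partition columns,
    # and write the result to {output}/combined_pbp_hk_{dt}s
    ddf_pbp_hk_dt = aggregate_dt(ddf_pbp_dt, ddf_hk_dt, run_config)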