Refactor: move the per-particle flow join and the dt aggregation out of the main script into dedicated functions in sp2xr.resample_pbp_hk (join_pbp_with_flow and aggregate_dt)

2025-08-13 17:02:47 +02:00
parent 053d0d5b75
commit 554ec26410
3 changed files with 70 additions and 48 deletions


@@ -9,10 +9,14 @@ from sp2xr.helpers import (
parse_args,
load_and_resolve_config,
initialize_cluster,
floor_index_to_dt,
)
from sp2xr.apply_calib import calibrate_single_particle
from sp2xr.resample_pbp_hk import build_dt_summary, resample_hk_partition
from sp2xr.resample_pbp_hk import (
build_dt_summary,
resample_hk_partition,
join_pbp_with_flow,
aggregate_dt,
)
from sp2xr.distribution import (
bin_lims_to_ctrs,
process_hist_and_dist_partition,
@@ -75,31 +79,10 @@ def main():
ddf_cal = calibrate_single_particle(ddf_raw, instr_config, run_config)
ddf_cal = ddf_cal.map_partitions(
floor_index_to_dt, run_config=run_config, meta=ddf_cal._meta
)
meta_pbp_with_flow = ddf_cal._meta.copy()
meta_pbp_with_flow["Sample Flow Controller Read (vccm)"] = np.float64()
ddf_pbp_with_flow = ddf_cal.map_partitions(
lambda part: part.join(flow_dt, how="left"),
meta=meta_pbp_with_flow,
)
ddf_pbp_with_flow = ddf_pbp_with_flow.map_partitions(
lambda df: df.assign(
**{
"BC mass bin": pd.cut(
df["BC mass within range"], bins=inc_mass_bin_lims, labels=False
)
}
),
meta=ddf_pbp_with_flow._meta.assign(**{"BC mass bin": 0}),
)
ddf_pbp_with_flow = join_pbp_with_flow(ddf_cal, flow_dt, run_config)
ddf_pbp_with_flow.to_parquet(
path=f"{run_config['output']}/pbp_{run_config['dt']}s_calibrated",
path=f"{run_config['output']}/pbp_calibrated",
partition_on=["date", "hour"],
engine="pyarrow",
write_index=True,
@@ -112,25 +95,7 @@ def main():
build_dt_summary, dt_s=run_config["dt"], meta=build_dt_summary(ddf_cal._meta)
)
ddf_pbp_hk_dt = dd.merge(
ddf_pbp_dt, ddf_hk_dt, how="left", left_index=True, right_index=True
)
time_index = dd.to_datetime(ddf_pbp_hk_dt.index.to_series())
ddf_pbp_hk_dt["date"] = time_index.dt.normalize() # works on Series
ddf_pbp_hk_dt["hour"] = time_index.dt.hour.astype("int64")
    # Drop the suffixed date/hour duplicates left over from the merge
ddf_pbp_hk_dt = ddf_pbp_hk_dt.drop(columns=["date_x", "hour_x", "date_y", "hour_y"])
ddf_pbp_hk_dt.to_parquet(
path=f"{run_config['output']}/combined_pbp_hk_{run_config['dt']}s",
partition_on=["date", "hour"],
engine="pyarrow",
write_index=True,
write_metadata_file=True,
overwrite=True,
)
ddf_pbp_hk_dt = aggregate_dt(ddf_pbp_dt, ddf_hk_dt, run_config)
# 4. (optional) dt bulk conc --------------------------
if run_config["do_conc"]:
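Note on the meta= arguments that appear throughout main() and in the new helpers below: dask builds the task graph lazily, so map_partitions needs an empty DataFrame describing the output schema up front. A minimal, self-contained illustration of the pattern, using a toy transform that is not part of sp2xr:

import pandas as pd
import dask.dataframe as dd

def add_flag(pdf: pd.DataFrame) -> pd.DataFrame:
    # toy per-partition transform that adds one boolean column
    return pdf.assign(flag=pdf["x"] > 0)

ddf = dd.from_pandas(pd.DataFrame({"x": [-1.0, 2.0, 3.0]}), npartitions=2)
# calling the function on the empty ddf._meta frame yields the output schema,
# the same trick used with meta=build_dt_summary(ddf_cal._meta) above
ddf_flagged = ddf.map_partitions(add_flag, meta=add_flag(ddf._meta))
print(ddf_flagged.dtypes)  # x: float64, flag: bool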


@@ -29,10 +29,9 @@ def load_and_resolve_config(args):
"instr_cfg": choose(args.instr_config, base, "paths.instrument_config"),
"dt": choose(args.dt, base, "workflow.dt", 1),
"do_conc": args.conc or get(base, "workflow.conc", False),
"do_BC_hist": args.BC_hist or get(base, "workflow.BC_hist", False),
"do_scatt_hist": args.scatt_hist or get(base, "workflow.scatt_hist", False),
"do_timelag_hist": args.timelag_hist
or get(base, "workflow.timelag_hist", False),
"do_BC_hist": get(base, "workflow.BC_hist", False),
"do_scatt_hist": get(base, "workflow.scatt_hist", False),
"do_timelag_hist": get(base, "workflow.timelag_hist", False),
"rho_eff": choose(args.BC_rho, base, "bc.rho_eff", 1.8),
"BC_type": choose(args.BC_type, base, "bc.type", "constant_effective_density"),
"cluster": {


@@ -1,5 +1,8 @@
from typing import List
import pandas as pd
import numpy as np
import dask.dataframe as dd
from sp2xr.helpers import floor_index_to_dt


_SUM_COLS: List[str] = [
@@ -129,3 +132,58 @@ def resample_hk_partition(pdf: pd.DataFrame, dt="1s") -> pd.DataFrame:
out["hour"] = out.index.hour.astype("int64")
return out


def join_pbp_with_flow(ddf_cal, flow_dt, run_config):
inc_mass_bin_lims = np.logspace(
np.log10(run_config["histo"]["inc"]["min_mass"]),
np.log10(run_config["histo"]["inc"]["max_mass"]),
run_config["histo"]["inc"]["n_bins"],
)
ddf_cal = ddf_cal.map_partitions(
floor_index_to_dt, run_config=run_config, meta=ddf_cal._meta
)
meta_pbp_with_flow = ddf_cal._meta.copy()
meta_pbp_with_flow["Sample Flow Controller Read (vccm)"] = np.float64()
ddf_pbp_with_flow = ddf_cal.map_partitions(
lambda part: part.join(flow_dt, how="left"),
meta=meta_pbp_with_flow,
)
ddf_pbp_with_flow = ddf_pbp_with_flow.map_partitions(
lambda df: df.assign(
**{
"BC mass bin": pd.cut(
df["BC mass within range"], bins=inc_mass_bin_lims, labels=False
)
}
),
meta=ddf_pbp_with_flow._meta.assign(**{"BC mass bin": 0}),
)
return ddf_pbp_with_flow
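For reference, the pd.cut(..., labels=False) call above maps each particle's incandescence mass onto an integer bin index defined by the log-spaced limits built from run_config["histo"]["inc"]; values outside the limits become NaN. A standalone illustration with made-up numbers:

import numpy as np
import pandas as pd

# five log-spaced edges between 0.1 and 100 define four bins labelled 0..3
inc_mass_bin_lims = np.logspace(np.log10(0.1), np.log10(100), 5)
masses = pd.Series([0.2, 3.0, 40.0, 500.0], name="BC mass within range")
print(pd.cut(masses, bins=inc_mass_bin_lims, labels=False).tolist())
# -> [0.0, 1.0, 3.0, nan]; 500.0 lies above the last edge, hence NaN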


def aggregate_dt(ddf_pbp_dt, ddf_hk_dt, run_config):
ddf_pbp_hk_dt = dd.merge(
ddf_pbp_dt, ddf_hk_dt, how="left", left_index=True, right_index=True
)
time_index = dd.to_datetime(ddf_pbp_hk_dt.index.to_series())
ddf_pbp_hk_dt["date"] = time_index.dt.normalize() # works on Series
ddf_pbp_hk_dt["hour"] = time_index.dt.hour.astype("int64")
    # Drop the suffixed date/hour duplicates left over from the merge
ddf_pbp_hk_dt = ddf_pbp_hk_dt.drop(columns=["date_x", "hour_x", "date_y", "hour_y"])
ddf_pbp_hk_dt.to_parquet(
path=f"{run_config['output']}/combined_pbp_hk_{run_config['dt']}s",
partition_on=["date", "hour"],
engine="pyarrow",
write_index=True,
write_metadata_file=True,
overwrite=True,
)
return ddf_pbp_hk_dt
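
The drop(columns=[...]) call above removes the suffixed duplicates that appear because both inputs already carry date and hour columns when they are merged on the index; the columns are then rebuilt from the shared time index. A small pandas illustration of that suffixing behaviour, using made-up data:

import pandas as pd

idx = pd.to_datetime(["2025-08-13 17:00:00", "2025-08-13 17:00:01"])
pbp = pd.DataFrame({"date": idx.normalize(), "hour": idx.hour, "counts": [3, 5]}, index=idx)
hk = pd.DataFrame({"date": idx.normalize(), "hour": idx.hour, "flow": [120.0, 119.5]}, index=idx)

merged = pd.merge(pbp, hk, how="left", left_index=True, right_index=True)
# overlapping columns come back as date_x/hour_x (left) and date_y/hour_y (right)
print(sorted(merged.columns))  # ['counts', 'date_x', 'date_y', 'flow', 'hour_x', 'hour_y']
merged = merged.drop(columns=["date_x", "hour_x", "date_y", "hour_y"])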