From 5f3d25817cf391d3506c5d18762b1902a813e91f Mon Sep 17 00:00:00 2001 From: Barbara Bertozzi Date: Wed, 13 Aug 2025 15:28:57 +0200 Subject: [PATCH] Refactor: moved calibration of PbP data to new file and clear separation between run_config and instr_config --- scripts/sp2xr_pipeline.py | 105 +++++++++---------------- src/sp2xr/apply_calib.py | 42 ++++++++++ src/sp2xr/flag_single_particle_data.py | 44 ++++++----- src/sp2xr/helpers.py | 31 ++++++++ tests/data/instrument_config.yaml | 20 ----- tests/run_config.yaml | 28 +++++-- 6 files changed, 152 insertions(+), 118 deletions(-) create mode 100644 src/sp2xr/apply_calib.py diff --git a/scripts/sp2xr_pipeline.py b/scripts/sp2xr_pipeline.py index d659768..0ae42b3 100644 --- a/scripts/sp2xr_pipeline.py +++ b/scripts/sp2xr_pipeline.py @@ -10,8 +10,7 @@ from sp2xr.helpers import ( load_and_resolve_config, initialize_cluster, ) -from sp2xr.calibration import calibrate_particle_data -from sp2xr.flag_single_particle_data import define_flags, add_thin_thick_flags +from sp2xr.apply_calib import calibrate_single_particle from sp2xr.resample_pbp_hk import build_dt_summary, resample_hk_partition from sp2xr.distribution import ( bin_lims_to_ctrs, @@ -23,9 +22,9 @@ from sp2xr.concentrations import add_concentrations def main(): args = parse_args() - config = load_and_resolve_config(args) + run_config = load_and_resolve_config(args) - client = initialize_cluster(config) + client = initialize_cluster(run_config) """args = parse_args() base = load_yaml_cfg(args.config) @@ -61,60 +60,28 @@ def main(): ) """ # 1. calibration stage -------------------------------------------- - cfg = yaml.safe_load(open(config["instr_cfg"])) - cfg_future = client.scatter(cfg, broadcast=True) + instr_config = yaml.safe_load(open(run_config["instr_cfg"])) + # cfg_future = client.scatter(cfg, broadcast=True) inc_mass_bin_lims = np.logspace( - np.log10(cfg["inc_histo"]["min_mass"]), - np.log10(cfg["inc_histo"]["max_mass"]), - cfg["inc_histo"]["n_bins"], + np.log10(run_config["histo"]["inc"]["min_mass"]), + np.log10(run_config["histo"]["inc"]["max_mass"]), + run_config["histo"]["inc"]["n_bins"], ) inc_mass_bin_ctrs = bin_lims_to_ctrs(inc_mass_bin_lims) - ddf_raw = dd.read_parquet(config["input_pbp"], engine="pyarrow") + ddf_raw = dd.read_parquet(run_config["input_pbp"], engine="pyarrow") - meta_cal = ddf_raw._meta.copy() - meta_cal["BC mass"] = pd.Series([], dtype="float64") - meta_cal["Opt diam"] = pd.Series([], dtype="float64") - meta_cal["time_lag"] = pd.Series([], dtype="int8") - meta_cal["date"] = pd.Series([], dtype="datetime64[ns]") - meta_cal["hour"] = pd.Series([], dtype="int8") - - ddf_cal = ddf_raw.map_partitions( - calibrate_particle_data, config=cfg_future, meta=meta_cal - ) - ddf_cal["time_lag"] = ddf_cal["Incand Peak Time"] - ddf_cal["Scatter Peak Time"] - ddf_cal["ratio_inc_scatt"] = np.log10(ddf_cal["Incand relPeak"]) / np.log10( - ddf_cal["Scatter relPeak"] - ) - - ddf_cal = define_flags(ddf_cal, cfg) - - meta_cnts = add_thin_thick_flags(ddf_cal._meta) # builds empty columns for meta - ddf_cal = ddf_cal.map_partitions(add_thin_thick_flags, meta=meta_cnts) - - # Create two new columns where the particles out of range are changed to np.nan (!! do not change them to 0, otherwise below where we resample and count, the 0s would be counted being a not null value!!) - ddf_cal["BC mass within range"] = ddf_cal["BC mass"].where( - ddf_cal["flag_valid_inc_signal_in_range"], np.nan - ) - ddf_cal["Opt diam scatt only within range"] = ddf_cal["Opt diam scatt only"].where( - ( - ddf_cal["flag_valid_scatt_signal_in_range"] - & ~ddf_cal["flag_valid_inc_signal"] - ), - np.nan, - ) - - ddf_cal["temporary_col"] = 1 + ddf_cal = calibrate_single_particle(ddf_raw, instr_config, run_config) meta_dt = build_dt_summary(ddf_cal._meta) # empty frame for metadata ddf_pbp_dt = ddf_cal.map_partitions( - build_dt_summary, dt_s=config["dt"], meta=meta_dt + build_dt_summary, dt_s=run_config["dt"], meta=meta_dt ) # 2. load house-keeping once -------------------------------------- - ddf_hk = dd.read_parquet(config["input_hk"], engine="pyarrow") + ddf_hk = dd.read_parquet(run_config["input_hk"], engine="pyarrow") # Create a meta (empty dataframe with correct schema) meta = pd.DataFrame( { @@ -128,7 +95,7 @@ def main(): # Map partition-wise ddf_hk_dt = ddf_hk.map_partitions( - resample_hk_partition, dt=f"{config['dt']}s", meta=meta + resample_hk_partition, dt=f"{run_config['dt']}s", meta=meta ) flow_dt = ddf_hk_dt["Sample Flow Controller Read (vccm)"].compute() @@ -138,7 +105,7 @@ def main(): Replace the existing DatetimeIndex with its lower-second value, without changing the index’s name or creating a new column. """ - pdf.index = pdf.index.floor(f"{config['dt']}s") # keeps the original .name + pdf.index = pdf.index.floor(f"{run_config['dt']}s") # keeps the original .name return pdf # meta stays identical because only the *values* of the index change @@ -166,7 +133,7 @@ def main(): ) ddf_pbp_with_flow.to_parquet( - path=f"{config['output']}/pbp_calibrated", + path=f"{run_config['output']}/pbp_calibrated", partition_on=["date", "hour"], engine="pyarrow", write_index=True, @@ -187,7 +154,7 @@ def main(): ddf_pbp_hk_dt = ddf_pbp_hk_dt.drop(columns=["date_x", "hour_x", "date_y", "hour_y"]) ddf_pbp_hk_dt.to_parquet( - path=f"{config['output']}/combined_pbp_hk_{config['dt']}s", + path=f"{run_config['output']}/combined_pbp_hk_{run_config['dt']}s", partition_on=["date", "hour"], engine="pyarrow", write_index=True, @@ -197,25 +164,25 @@ def main(): # 4. (optional) dt bulk conc -------------------------- meta_conc = ( - add_concentrations(ddf_pbp_hk_dt._meta, dt=config["dt"]) - if config["do_conc"] + add_concentrations(ddf_pbp_hk_dt._meta, dt=run_config["dt"]) + if run_config["do_conc"] else None ) - if config["do_conc"]: + if run_config["do_conc"]: print("ok") ddf_conc = ddf_pbp_hk_dt.map_partitions( - add_concentrations, dt=config["dt"], meta=meta_conc + add_concentrations, dt=run_config["dt"], meta=meta_conc ) ddf_conc.to_parquet( - f"{config['output']}/conc_{config['dt']}s", + f"{run_config['output']}/conc_{run_config['dt']}s", partition_on=["date"], overwrite=True, ) # 5. (optional) dt histograms -------------------------- - if config["do_hist"]: + if run_config["do_hist"]: # ========= 3. RUN MASS HISTOGRAMS ========= # --- Mass histogram configs hist_configs = [ @@ -249,8 +216,8 @@ def main(): bin_ctrs=inc_mass_bin_ctrs, kind="mass", flag_col=cfg_hist["flag_col"], - rho_eff=config["rho_eff"], - BC_type=config["BC_type"], + rho_eff=run_config["rho_eff"], + BC_type=run_config["BC_type"], ) ddf_out = ddf_pbp_with_flow.map_partitions( process_hist_and_dist_partition, @@ -259,11 +226,11 @@ def main(): flag_value=cfg_hist["flag_value"], bin_lims=inc_mass_bin_lims, bin_ctrs=inc_mass_bin_ctrs, - dt=config["dt"], + dt=run_config["dt"], calculate_conc=True, flow=None, - rho_eff=config["rho_eff"], - BC_type=config["BC_type"], + rho_eff=run_config["rho_eff"], + BC_type=run_config["BC_type"], t=1, meta=meta_hist, # <-- single line does the trick ) @@ -272,9 +239,9 @@ def main(): # ========= 4. RUN SCATTERING HISTOGRAM ========= # --- Scattering histogram configuration scatt_bin_lims = np.logspace( - np.log10(cfg["scatt_histo"]["min_D"]), - np.log10(cfg["scatt_histo"]["max_D"]), - cfg["scatt_histo"]["n_bins"], + np.log10(run_config["histo"]["scatt"]["min_D"]), + np.log10(run_config["histo"]["scatt"]["max_D"]), + run_config["histo"]["scatt"]["n_bins"], ) scatt_bin_ctrs = bin_lims_to_ctrs(scatt_bin_lims) @@ -292,7 +259,7 @@ def main(): flag_value=None, bin_lims=scatt_bin_lims, bin_ctrs=scatt_bin_ctrs, - dt=config["dt"], + dt=run_config["dt"], calculate_conc=True, flow=None, rho_eff=None, @@ -306,9 +273,9 @@ def main(): # --- Timelag histogram parameters timelag_bins_lims = np.linspace( - cfg["time_delay_histo"]["min"], - cfg["time_delay_histo"]["max"], - cfg["time_delay_histo"]["n_bins"], + run_config["histo"]["timelag"]["min"], + run_config["histo"]["timelag"]["max"], + run_config["histo"]["timelag"]["n_bins"], ) timelag_bin_ctrs = bin_lims_to_ctrs(timelag_bins_lims) @@ -334,7 +301,7 @@ def main(): flag_value=1, bin_lims=timelag_bins_lims, bin_ctrs=timelag_bin_ctrs, - dt=config["dt"], + dt=run_config["dt"], calculate_conc=True, flow=None, # ddf_pbp_hk["Sample Flow Controller Read (vccm)"], rho_eff=None, @@ -358,7 +325,7 @@ def main(): # ========= 7. SAVE TO PARQUET ========= merged_ddf.to_parquet( - f"{config['output']}/hists_{config['dt']}s", + f"{run_config['output']}/hists_{run_config['dt']}s", partition_on=["date"], overwrite=True, ) diff --git a/src/sp2xr/apply_calib.py b/src/sp2xr/apply_calib.py new file mode 100644 index 0000000..3c23ea4 --- /dev/null +++ b/src/sp2xr/apply_calib.py @@ -0,0 +1,42 @@ +import numpy as np +import pandas as pd +from sp2xr.calibration import calibrate_particle_data +from sp2xr.flag_single_particle_data import define_flags, add_thin_thick_flags + + +def calibrate_single_particle(ddf_raw, instr_config, run_config): + meta_cal = ddf_raw._meta.copy() + meta_cal["BC mass"] = pd.Series([], dtype="float64") + meta_cal["Opt diam"] = pd.Series([], dtype="float64") + meta_cal["time_lag"] = pd.Series([], dtype="int8") + meta_cal["date"] = pd.Series([], dtype="datetime64[ns]") + meta_cal["hour"] = pd.Series([], dtype="int8") + + ddf_cal = ddf_raw.map_partitions( + calibrate_particle_data, config=run_config, meta=meta_cal + ) + ddf_cal["time_lag"] = ddf_cal["Incand Peak Time"] - ddf_cal["Scatter Peak Time"] + ddf_cal["ratio_inc_scatt"] = np.log10(ddf_cal["Incand relPeak"]) / np.log10( + ddf_cal["Scatter relPeak"] + ) + + ddf_cal = define_flags(ddf_cal, instr_config, run_config) + + meta_cnts = add_thin_thick_flags(ddf_cal._meta) # builds empty columns for meta + ddf_cal = ddf_cal.map_partitions(add_thin_thick_flags, meta=meta_cnts) + + # Create two new columns where the particles out of range are changed to np.nan (!! do not change them to 0, otherwise below where we resample and count, the 0s would be counted being a not null value!!) + ddf_cal["BC mass within range"] = ddf_cal["BC mass"].where( + ddf_cal["flag_valid_inc_signal_in_range"], np.nan + ) + ddf_cal["Opt diam scatt only within range"] = ddf_cal["Opt diam scatt only"].where( + ( + ddf_cal["flag_valid_scatt_signal_in_range"] + & ~ddf_cal["flag_valid_inc_signal"] + ), + np.nan, + ) + + ddf_cal["temporary_col"] = 1 + + return ddf_cal diff --git a/src/sp2xr/flag_single_particle_data.py b/src/sp2xr/flag_single_particle_data.py index c670323..c8f93f5 100644 --- a/src/sp2xr/flag_single_particle_data.py +++ b/src/sp2xr/flag_single_particle_data.py @@ -2,63 +2,65 @@ import numpy as np import pandas as pd -def define_flags(pdf_pbp, config): +def define_flags(pdf_pbp, instr_config, run_config): flag_inc_transit_time = ( pdf_pbp["Incand Transit Time"] - >= config["instrument_parameters"]["IncTransitMin"] + >= instr_config["instrument_parameters"]["IncTransitMin"] ) & ( pdf_pbp["Incand Transit Time"] - <= config["instrument_parameters"]["IncTransitMax"] + <= instr_config["instrument_parameters"]["IncTransitMax"] ) flag_inc_fwhm = ( - pdf_pbp["Incand FWHM"] >= config["instrument_parameters"]["IncFWHMMin"] - ) & (pdf_pbp["Incand FWHM"] <= config["instrument_parameters"]["IncFWHMMax"]) + pdf_pbp["Incand FWHM"] >= instr_config["instrument_parameters"]["IncFWHMMin"] + ) & (pdf_pbp["Incand FWHM"] <= instr_config["instrument_parameters"]["IncFWHMMax"]) flag_inc = flag_inc_transit_time & flag_inc_fwhm pdf_pbp["flag_valid_inc_signal"] = flag_inc pdf_pbp["flag_inc_not_sat"] = ( - pdf_pbp["Incand relPeak"] < config["Signal_saturation"]["IncSatPoint"] + pdf_pbp["Incand relPeak"] < instr_config["Signal_saturation"]["IncSatPoint"] ) flag_inc_in_range = ( flag_inc - & (pdf_pbp["BC mass"] >= config["inc_histo"]["min_mass"]) - & (pdf_pbp["BC mass"] <= config["inc_histo"]["max_mass"]) + & (pdf_pbp["BC mass"] >= run_config["histo"]["inc"]["min_mass"]) + & (pdf_pbp["BC mass"] <= run_config["histo"]["inc"]["max_mass"]) ) pdf_pbp["flag_valid_inc_signal_in_range"] = flag_inc_in_range flag_scatt_transit_time = ( pdf_pbp["Scatter Transit Time"] - >= config["instrument_parameters"]["ScattTransitMin"] + >= instr_config["instrument_parameters"]["ScattTransitMin"] ) & ( pdf_pbp["Scatter Transit Time"] - <= config["instrument_parameters"]["ScattTransitMax"] + <= instr_config["instrument_parameters"]["ScattTransitMax"] ) flag_scatt_fwhm = ( - pdf_pbp["Scatter FWHM"] >= config["instrument_parameters"]["ScattFWHMMin"] - ) & (pdf_pbp["Scatter FWHM"] <= config["instrument_parameters"]["ScattFWHMMax"]) + pdf_pbp["Scatter FWHM"] >= instr_config["instrument_parameters"]["ScattFWHMMin"] + ) & ( + pdf_pbp["Scatter FWHM"] <= instr_config["instrument_parameters"]["ScattFWHMMax"] + ) flag_scatt = flag_scatt_transit_time & flag_scatt_fwhm pdf_pbp["flag_valid_scatt_signal"] = flag_scatt pdf_pbp["flag_scatt_not_sat"] = ( - pdf_pbp["Scatter relPeak"] < config["Signal_saturation"]["ScattSatPoint"] + pdf_pbp["Scatter relPeak"] < instr_config["Signal_saturation"]["ScattSatPoint"] ) flag_scatt_in_range = ( flag_scatt - & (pdf_pbp["Opt diam"] >= config["scatt_histo"]["min_D"]) - & (pdf_pbp["Opt diam"] <= config["scatt_histo"]["max_D"]) + & (pdf_pbp["Opt diam"] >= run_config["histo"]["scatt"]["min_D"]) + & (pdf_pbp["Opt diam"] <= run_config["histo"]["scatt"]["max_D"]) ) pdf_pbp["flag_valid_scatt_signal_in_range"] = flag_scatt_in_range pdf_pbp["flag_negative_timelag"] = ( - pdf_pbp["time_lag"] < config["time_delay_histo"]["min"] + pdf_pbp["time_lag"] < run_config["histo"]["timelag"]["min"] ) pdf_pbp["flag_extreme_positive_timelag"] = ( - pdf_pbp["time_lag"] >= config["time_delay_histo"]["max"] + pdf_pbp["time_lag"] >= run_config["histo"]["timelag"]["max"] ) pdf_pbp["flag_valid_timelag_thin"] = ( - pdf_pbp["time_lag"] < config["thick_coating"]["threshold"] - ) & (pdf_pbp["time_lag"] >= config["time_delay_histo"]["min"]) + pdf_pbp["time_lag"] < run_config["mixing_state"]["threshold"] + ) & (pdf_pbp["time_lag"] >= run_config["histo"]["timelag"]["min"]) pdf_pbp["flag_valid_timelag_thick"] = ( - pdf_pbp["time_lag"] >= config["thick_coating"]["threshold"] - ) & (pdf_pbp["time_lag"] < config["time_delay_histo"]["max"]) + pdf_pbp["time_lag"] >= run_config["mixing_state"]["threshold"] + ) & (pdf_pbp["time_lag"] < run_config["histo"]["timelag"]["max"]) pdf_pbp["flag_low_ratio_inc_scatt"] = pdf_pbp["ratio_inc_scatt"] < 1.1 diff --git a/src/sp2xr/helpers.py b/src/sp2xr/helpers.py index cba43b9..8e8704c 100644 --- a/src/sp2xr/helpers.py +++ b/src/sp2xr/helpers.py @@ -29,6 +29,37 @@ def load_and_resolve_config(args): "partition": choose(args.partition, base, "cluster.partition", "hourly"), "log_dir": get(base, "cluster.log_dir", "./slurm_out"), }, + "calibration": { + "incandescence": { + "curve_type": get(base, "calibration.incadescence.curve_type", None), + "parameters": get(base, "calibration.incadescence.parameters", None), + }, + "scattering": { + "curve_type": get(base, "calibration.scattering.curve_type", None), + "parameters": get(base, "calibration.scattering.parameters", None), + }, + }, + "histo": { + "inc": { + "min_mass": get(base, "histo.inc.min_mass", 0.3), + "max_mass": get(base, "histo.inc.max_mass", 400), + "n_bins": get(base, "histo.inc.n_bins", 50), + }, + "scatt": { + "min_D": get(base, "histo.scatt.min_D", 100), + "max_D": get(base, "histo.scatt.max_D", 500), + "n_bins": get(base, "histo.scatt.n_bins", 20), + }, + "timelag": { + "min": get(base, "histo.timelag.min", -10), + "max": get(base, "histo.timelag.max", 400), + "n_bins": get(base, "histo.timelag.n_bins", 100), + }, + }, + "mixing_state": { + "threshold": get(base, "mixing_state.threshold", 50), + "inc_scatt_ratio": get(base, "mixing_state.inc_scatt_ratio", 1.1), + }, "base": base, } diff --git a/tests/data/instrument_config.yaml b/tests/data/instrument_config.yaml index db88ad3..eae4592 100644 --- a/tests/data/instrument_config.yaml +++ b/tests/data/instrument_config.yaml @@ -13,23 +13,3 @@ instrument_parameters: Signal_saturation: IncSatPoint: 1700000000.0 ScattSatPoint: 1700000000.0 -inc_calibration: - curve_type: "polynomial" - parameters: [0.05, 2.0470000507725255e-07] -inc_histo: - min_mass: 0.3 - max_mass: 400 - n_bins: 50 -scatt_calibration: - curve_type: "powerlaw" - parameters: [17.21724257, 0.16908516, -1.49431104] -scatt_histo: - min_D: 100 - max_D: 500 - n_bins: 20 -time_delay_histo: - min: -10 - max: 400 - n_bins: 100 -thick_coating: - threshold: 50 diff --git a/tests/run_config.yaml b/tests/run_config.yaml index 62cc40a..7446ef8 100644 --- a/tests/run_config.yaml +++ b/tests/run_config.yaml @@ -22,14 +22,26 @@ bc: histo: inc: - min_mass: 1e-18 - max_mass: 1e-12 - n_bins: 40 + min_mass: 0.3 + max_mass: 400 + n_bins: 50 scatt: min_D: 100 - max_D: 1000 - n_bins: 30 + max_D: 500 + n_bins: 20 timelag: - min: -50 - max: 100 - n_bins: 150 \ No newline at end of file + min: -10 + max: 400 + n_bins: 100 + +mixing_state: + threshold: 50 + inc_scatt_ratio: 1.1 + +calibration: + incandescence: + curve_type: "polynomial" + parameters: [0.05, 2.0470000507725255e-07] + scattering: + curve_type: "powerlaw" + parameters: [17.21724257, 0.16908516, -1.49431104] \ No newline at end of file