Refactor: moved calibration of PbP data to new file and clear separation between run_config and instr_config

This commit is contained in:
2025-08-13 15:28:57 +02:00
parent 6f87b4cc79
commit 5f3d25817c
6 changed files with 152 additions and 118 deletions

View File

@@ -10,8 +10,7 @@ from sp2xr.helpers import (
load_and_resolve_config,
initialize_cluster,
)
from sp2xr.calibration import calibrate_particle_data
from sp2xr.flag_single_particle_data import define_flags, add_thin_thick_flags
from sp2xr.apply_calib import calibrate_single_particle
from sp2xr.resample_pbp_hk import build_dt_summary, resample_hk_partition
from sp2xr.distribution import (
bin_lims_to_ctrs,
@@ -23,9 +22,9 @@ from sp2xr.concentrations import add_concentrations
def main():
args = parse_args()
config = load_and_resolve_config(args)
run_config = load_and_resolve_config(args)
client = initialize_cluster(config)
client = initialize_cluster(run_config)
"""args = parse_args()
base = load_yaml_cfg(args.config)
@@ -61,60 +60,28 @@ def main():
)
"""
# 1. calibration stage --------------------------------------------
cfg = yaml.safe_load(open(config["instr_cfg"]))
cfg_future = client.scatter(cfg, broadcast=True)
instr_config = yaml.safe_load(open(run_config["instr_cfg"]))
# cfg_future = client.scatter(cfg, broadcast=True)
inc_mass_bin_lims = np.logspace(
np.log10(cfg["inc_histo"]["min_mass"]),
np.log10(cfg["inc_histo"]["max_mass"]),
cfg["inc_histo"]["n_bins"],
np.log10(run_config["histo"]["inc"]["min_mass"]),
np.log10(run_config["histo"]["inc"]["max_mass"]),
run_config["histo"]["inc"]["n_bins"],
)
inc_mass_bin_ctrs = bin_lims_to_ctrs(inc_mass_bin_lims)
ddf_raw = dd.read_parquet(config["input_pbp"], engine="pyarrow")
ddf_raw = dd.read_parquet(run_config["input_pbp"], engine="pyarrow")
meta_cal = ddf_raw._meta.copy()
meta_cal["BC mass"] = pd.Series([], dtype="float64")
meta_cal["Opt diam"] = pd.Series([], dtype="float64")
meta_cal["time_lag"] = pd.Series([], dtype="int8")
meta_cal["date"] = pd.Series([], dtype="datetime64[ns]")
meta_cal["hour"] = pd.Series([], dtype="int8")
ddf_cal = ddf_raw.map_partitions(
calibrate_particle_data, config=cfg_future, meta=meta_cal
)
ddf_cal["time_lag"] = ddf_cal["Incand Peak Time"] - ddf_cal["Scatter Peak Time"]
ddf_cal["ratio_inc_scatt"] = np.log10(ddf_cal["Incand relPeak"]) / np.log10(
ddf_cal["Scatter relPeak"]
)
ddf_cal = define_flags(ddf_cal, cfg)
meta_cnts = add_thin_thick_flags(ddf_cal._meta) # builds empty columns for meta
ddf_cal = ddf_cal.map_partitions(add_thin_thick_flags, meta=meta_cnts)
# Create two new columns where the particles out of range are changed to np.nan (!! do not change them to 0, otherwise below where we resample and count, the 0s would be counted being a not null value!!)
ddf_cal["BC mass within range"] = ddf_cal["BC mass"].where(
ddf_cal["flag_valid_inc_signal_in_range"], np.nan
)
ddf_cal["Opt diam scatt only within range"] = ddf_cal["Opt diam scatt only"].where(
(
ddf_cal["flag_valid_scatt_signal_in_range"]
& ~ddf_cal["flag_valid_inc_signal"]
),
np.nan,
)
ddf_cal["temporary_col"] = 1
ddf_cal = calibrate_single_particle(ddf_raw, instr_config, run_config)
meta_dt = build_dt_summary(ddf_cal._meta) # empty frame for metadata
ddf_pbp_dt = ddf_cal.map_partitions(
build_dt_summary, dt_s=config["dt"], meta=meta_dt
build_dt_summary, dt_s=run_config["dt"], meta=meta_dt
)
# 2. load house-keeping once --------------------------------------
ddf_hk = dd.read_parquet(config["input_hk"], engine="pyarrow")
ddf_hk = dd.read_parquet(run_config["input_hk"], engine="pyarrow")
# Create a meta (empty dataframe with correct schema)
meta = pd.DataFrame(
{
@@ -128,7 +95,7 @@ def main():
# Map partition-wise
ddf_hk_dt = ddf_hk.map_partitions(
resample_hk_partition, dt=f"{config['dt']}s", meta=meta
resample_hk_partition, dt=f"{run_config['dt']}s", meta=meta
)
flow_dt = ddf_hk_dt["Sample Flow Controller Read (vccm)"].compute()
@@ -138,7 +105,7 @@ def main():
Replace the existing DatetimeIndex with its lower-second value,
without changing the indexs name or creating a new column.
"""
pdf.index = pdf.index.floor(f"{config['dt']}s") # keeps the original .name
pdf.index = pdf.index.floor(f"{run_config['dt']}s") # keeps the original .name
return pdf
# meta stays identical because only the *values* of the index change
@@ -166,7 +133,7 @@ def main():
)
ddf_pbp_with_flow.to_parquet(
path=f"{config['output']}/pbp_calibrated",
path=f"{run_config['output']}/pbp_calibrated",
partition_on=["date", "hour"],
engine="pyarrow",
write_index=True,
@@ -187,7 +154,7 @@ def main():
ddf_pbp_hk_dt = ddf_pbp_hk_dt.drop(columns=["date_x", "hour_x", "date_y", "hour_y"])
ddf_pbp_hk_dt.to_parquet(
path=f"{config['output']}/combined_pbp_hk_{config['dt']}s",
path=f"{run_config['output']}/combined_pbp_hk_{run_config['dt']}s",
partition_on=["date", "hour"],
engine="pyarrow",
write_index=True,
@@ -197,25 +164,25 @@ def main():
# 4. (optional) dt bulk conc --------------------------
meta_conc = (
add_concentrations(ddf_pbp_hk_dt._meta, dt=config["dt"])
if config["do_conc"]
add_concentrations(ddf_pbp_hk_dt._meta, dt=run_config["dt"])
if run_config["do_conc"]
else None
)
if config["do_conc"]:
if run_config["do_conc"]:
print("ok")
ddf_conc = ddf_pbp_hk_dt.map_partitions(
add_concentrations, dt=config["dt"], meta=meta_conc
add_concentrations, dt=run_config["dt"], meta=meta_conc
)
ddf_conc.to_parquet(
f"{config['output']}/conc_{config['dt']}s",
f"{run_config['output']}/conc_{run_config['dt']}s",
partition_on=["date"],
overwrite=True,
)
# 5. (optional) dt histograms --------------------------
if config["do_hist"]:
if run_config["do_hist"]:
# ========= 3. RUN MASS HISTOGRAMS =========
# --- Mass histogram configs
hist_configs = [
@@ -249,8 +216,8 @@ def main():
bin_ctrs=inc_mass_bin_ctrs,
kind="mass",
flag_col=cfg_hist["flag_col"],
rho_eff=config["rho_eff"],
BC_type=config["BC_type"],
rho_eff=run_config["rho_eff"],
BC_type=run_config["BC_type"],
)
ddf_out = ddf_pbp_with_flow.map_partitions(
process_hist_and_dist_partition,
@@ -259,11 +226,11 @@ def main():
flag_value=cfg_hist["flag_value"],
bin_lims=inc_mass_bin_lims,
bin_ctrs=inc_mass_bin_ctrs,
dt=config["dt"],
dt=run_config["dt"],
calculate_conc=True,
flow=None,
rho_eff=config["rho_eff"],
BC_type=config["BC_type"],
rho_eff=run_config["rho_eff"],
BC_type=run_config["BC_type"],
t=1,
meta=meta_hist, # <-- single line does the trick
)
@@ -272,9 +239,9 @@ def main():
# ========= 4. RUN SCATTERING HISTOGRAM =========
# --- Scattering histogram configuration
scatt_bin_lims = np.logspace(
np.log10(cfg["scatt_histo"]["min_D"]),
np.log10(cfg["scatt_histo"]["max_D"]),
cfg["scatt_histo"]["n_bins"],
np.log10(run_config["histo"]["scatt"]["min_D"]),
np.log10(run_config["histo"]["scatt"]["max_D"]),
run_config["histo"]["scatt"]["n_bins"],
)
scatt_bin_ctrs = bin_lims_to_ctrs(scatt_bin_lims)
@@ -292,7 +259,7 @@ def main():
flag_value=None,
bin_lims=scatt_bin_lims,
bin_ctrs=scatt_bin_ctrs,
dt=config["dt"],
dt=run_config["dt"],
calculate_conc=True,
flow=None,
rho_eff=None,
@@ -306,9 +273,9 @@ def main():
# --- Timelag histogram parameters
timelag_bins_lims = np.linspace(
cfg["time_delay_histo"]["min"],
cfg["time_delay_histo"]["max"],
cfg["time_delay_histo"]["n_bins"],
run_config["histo"]["timelag"]["min"],
run_config["histo"]["timelag"]["max"],
run_config["histo"]["timelag"]["n_bins"],
)
timelag_bin_ctrs = bin_lims_to_ctrs(timelag_bins_lims)
@@ -334,7 +301,7 @@ def main():
flag_value=1,
bin_lims=timelag_bins_lims,
bin_ctrs=timelag_bin_ctrs,
dt=config["dt"],
dt=run_config["dt"],
calculate_conc=True,
flow=None, # ddf_pbp_hk["Sample Flow Controller Read (vccm)"],
rho_eff=None,
@@ -358,7 +325,7 @@ def main():
# ========= 7. SAVE TO PARQUET =========
merged_ddf.to_parquet(
f"{config['output']}/hists_{config['dt']}s",
f"{run_config['output']}/hists_{run_config['dt']}s",
partition_on=["date"],
overwrite=True,
)

42
src/sp2xr/apply_calib.py Normal file
View File

@@ -0,0 +1,42 @@
import numpy as np
import pandas as pd
from sp2xr.calibration import calibrate_particle_data
from sp2xr.flag_single_particle_data import define_flags, add_thin_thick_flags
def calibrate_single_particle(ddf_raw, instr_config, run_config):
meta_cal = ddf_raw._meta.copy()
meta_cal["BC mass"] = pd.Series([], dtype="float64")
meta_cal["Opt diam"] = pd.Series([], dtype="float64")
meta_cal["time_lag"] = pd.Series([], dtype="int8")
meta_cal["date"] = pd.Series([], dtype="datetime64[ns]")
meta_cal["hour"] = pd.Series([], dtype="int8")
ddf_cal = ddf_raw.map_partitions(
calibrate_particle_data, config=run_config, meta=meta_cal
)
ddf_cal["time_lag"] = ddf_cal["Incand Peak Time"] - ddf_cal["Scatter Peak Time"]
ddf_cal["ratio_inc_scatt"] = np.log10(ddf_cal["Incand relPeak"]) / np.log10(
ddf_cal["Scatter relPeak"]
)
ddf_cal = define_flags(ddf_cal, instr_config, run_config)
meta_cnts = add_thin_thick_flags(ddf_cal._meta) # builds empty columns for meta
ddf_cal = ddf_cal.map_partitions(add_thin_thick_flags, meta=meta_cnts)
# Create two new columns where the particles out of range are changed to np.nan (!! do not change them to 0, otherwise below where we resample and count, the 0s would be counted being a not null value!!)
ddf_cal["BC mass within range"] = ddf_cal["BC mass"].where(
ddf_cal["flag_valid_inc_signal_in_range"], np.nan
)
ddf_cal["Opt diam scatt only within range"] = ddf_cal["Opt diam scatt only"].where(
(
ddf_cal["flag_valid_scatt_signal_in_range"]
& ~ddf_cal["flag_valid_inc_signal"]
),
np.nan,
)
ddf_cal["temporary_col"] = 1
return ddf_cal

View File

@@ -2,63 +2,65 @@ import numpy as np
import pandas as pd
def define_flags(pdf_pbp, config):
def define_flags(pdf_pbp, instr_config, run_config):
flag_inc_transit_time = (
pdf_pbp["Incand Transit Time"]
>= config["instrument_parameters"]["IncTransitMin"]
>= instr_config["instrument_parameters"]["IncTransitMin"]
) & (
pdf_pbp["Incand Transit Time"]
<= config["instrument_parameters"]["IncTransitMax"]
<= instr_config["instrument_parameters"]["IncTransitMax"]
)
flag_inc_fwhm = (
pdf_pbp["Incand FWHM"] >= config["instrument_parameters"]["IncFWHMMin"]
) & (pdf_pbp["Incand FWHM"] <= config["instrument_parameters"]["IncFWHMMax"])
pdf_pbp["Incand FWHM"] >= instr_config["instrument_parameters"]["IncFWHMMin"]
) & (pdf_pbp["Incand FWHM"] <= instr_config["instrument_parameters"]["IncFWHMMax"])
flag_inc = flag_inc_transit_time & flag_inc_fwhm
pdf_pbp["flag_valid_inc_signal"] = flag_inc
pdf_pbp["flag_inc_not_sat"] = (
pdf_pbp["Incand relPeak"] < config["Signal_saturation"]["IncSatPoint"]
pdf_pbp["Incand relPeak"] < instr_config["Signal_saturation"]["IncSatPoint"]
)
flag_inc_in_range = (
flag_inc
& (pdf_pbp["BC mass"] >= config["inc_histo"]["min_mass"])
& (pdf_pbp["BC mass"] <= config["inc_histo"]["max_mass"])
& (pdf_pbp["BC mass"] >= run_config["histo"]["inc"]["min_mass"])
& (pdf_pbp["BC mass"] <= run_config["histo"]["inc"]["max_mass"])
)
pdf_pbp["flag_valid_inc_signal_in_range"] = flag_inc_in_range
flag_scatt_transit_time = (
pdf_pbp["Scatter Transit Time"]
>= config["instrument_parameters"]["ScattTransitMin"]
>= instr_config["instrument_parameters"]["ScattTransitMin"]
) & (
pdf_pbp["Scatter Transit Time"]
<= config["instrument_parameters"]["ScattTransitMax"]
<= instr_config["instrument_parameters"]["ScattTransitMax"]
)
flag_scatt_fwhm = (
pdf_pbp["Scatter FWHM"] >= config["instrument_parameters"]["ScattFWHMMin"]
) & (pdf_pbp["Scatter FWHM"] <= config["instrument_parameters"]["ScattFWHMMax"])
pdf_pbp["Scatter FWHM"] >= instr_config["instrument_parameters"]["ScattFWHMMin"]
) & (
pdf_pbp["Scatter FWHM"] <= instr_config["instrument_parameters"]["ScattFWHMMax"]
)
flag_scatt = flag_scatt_transit_time & flag_scatt_fwhm
pdf_pbp["flag_valid_scatt_signal"] = flag_scatt
pdf_pbp["flag_scatt_not_sat"] = (
pdf_pbp["Scatter relPeak"] < config["Signal_saturation"]["ScattSatPoint"]
pdf_pbp["Scatter relPeak"] < instr_config["Signal_saturation"]["ScattSatPoint"]
)
flag_scatt_in_range = (
flag_scatt
& (pdf_pbp["Opt diam"] >= config["scatt_histo"]["min_D"])
& (pdf_pbp["Opt diam"] <= config["scatt_histo"]["max_D"])
& (pdf_pbp["Opt diam"] >= run_config["histo"]["scatt"]["min_D"])
& (pdf_pbp["Opt diam"] <= run_config["histo"]["scatt"]["max_D"])
)
pdf_pbp["flag_valid_scatt_signal_in_range"] = flag_scatt_in_range
pdf_pbp["flag_negative_timelag"] = (
pdf_pbp["time_lag"] < config["time_delay_histo"]["min"]
pdf_pbp["time_lag"] < run_config["histo"]["timelag"]["min"]
)
pdf_pbp["flag_extreme_positive_timelag"] = (
pdf_pbp["time_lag"] >= config["time_delay_histo"]["max"]
pdf_pbp["time_lag"] >= run_config["histo"]["timelag"]["max"]
)
pdf_pbp["flag_valid_timelag_thin"] = (
pdf_pbp["time_lag"] < config["thick_coating"]["threshold"]
) & (pdf_pbp["time_lag"] >= config["time_delay_histo"]["min"])
pdf_pbp["time_lag"] < run_config["mixing_state"]["threshold"]
) & (pdf_pbp["time_lag"] >= run_config["histo"]["timelag"]["min"])
pdf_pbp["flag_valid_timelag_thick"] = (
pdf_pbp["time_lag"] >= config["thick_coating"]["threshold"]
) & (pdf_pbp["time_lag"] < config["time_delay_histo"]["max"])
pdf_pbp["time_lag"] >= run_config["mixing_state"]["threshold"]
) & (pdf_pbp["time_lag"] < run_config["histo"]["timelag"]["max"])
pdf_pbp["flag_low_ratio_inc_scatt"] = pdf_pbp["ratio_inc_scatt"] < 1.1

View File

@@ -29,6 +29,37 @@ def load_and_resolve_config(args):
"partition": choose(args.partition, base, "cluster.partition", "hourly"),
"log_dir": get(base, "cluster.log_dir", "./slurm_out"),
},
"calibration": {
"incandescence": {
"curve_type": get(base, "calibration.incadescence.curve_type", None),
"parameters": get(base, "calibration.incadescence.parameters", None),
},
"scattering": {
"curve_type": get(base, "calibration.scattering.curve_type", None),
"parameters": get(base, "calibration.scattering.parameters", None),
},
},
"histo": {
"inc": {
"min_mass": get(base, "histo.inc.min_mass", 0.3),
"max_mass": get(base, "histo.inc.max_mass", 400),
"n_bins": get(base, "histo.inc.n_bins", 50),
},
"scatt": {
"min_D": get(base, "histo.scatt.min_D", 100),
"max_D": get(base, "histo.scatt.max_D", 500),
"n_bins": get(base, "histo.scatt.n_bins", 20),
},
"timelag": {
"min": get(base, "histo.timelag.min", -10),
"max": get(base, "histo.timelag.max", 400),
"n_bins": get(base, "histo.timelag.n_bins", 100),
},
},
"mixing_state": {
"threshold": get(base, "mixing_state.threshold", 50),
"inc_scatt_ratio": get(base, "mixing_state.inc_scatt_ratio", 1.1),
},
"base": base,
}

View File

@@ -13,23 +13,3 @@ instrument_parameters:
Signal_saturation:
IncSatPoint: 1700000000.0
ScattSatPoint: 1700000000.0
inc_calibration:
curve_type: "polynomial"
parameters: [0.05, 2.0470000507725255e-07]
inc_histo:
min_mass: 0.3
max_mass: 400
n_bins: 50
scatt_calibration:
curve_type: "powerlaw"
parameters: [17.21724257, 0.16908516, -1.49431104]
scatt_histo:
min_D: 100
max_D: 500
n_bins: 20
time_delay_histo:
min: -10
max: 400
n_bins: 100
thick_coating:
threshold: 50

View File

@@ -22,14 +22,26 @@ bc:
histo:
inc:
min_mass: 1e-18
max_mass: 1e-12
n_bins: 40
min_mass: 0.3
max_mass: 400
n_bins: 50
scatt:
min_D: 100
max_D: 1000
n_bins: 30
max_D: 500
n_bins: 20
timelag:
min: -50
max: 100
n_bins: 150
min: -10
max: 400
n_bins: 100
mixing_state:
threshold: 50
inc_scatt_ratio: 1.1
calibration:
incandescence:
curve_type: "polynomial"
parameters: [0.05, 2.0470000507725255e-07]
scattering:
curve_type: "powerlaw"
parameters: [17.21724257, 0.16908516, -1.49431104]