Refactor: moved some functions to their respective modules and implemented the timelag histograms

This commit is contained in:
2025-08-12 15:53:48 +02:00
parent b05853416b
commit fa4cb9e7d4
4 changed files with 237 additions and 670 deletions

View File

@@ -51,8 +51,8 @@ def parse_args():
def apply_calibration(partition, config):
"""Apply calibration to a pandas partition."""
pdf = calibrate_particle_data(partition, config)
pdf["date"] = pdf.index.date.astype("datetime64[ns]")
pdf["hour"] = pdf.index.hour.astype("int64")
# pdf["date"] = pdf.index.date.astype("datetime64[ns]")
# pdf["hour"] = pdf.index.hour.astype("int64")
return pdf
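For context, a minimal self-contained sketch of the map_partitions pattern these scripts rely on (column name and data are made up; an identity lambda stands in for calibrate_particle_data, and `meta` must mirror the partition function's output schema exactly):

import pandas as pd
import dask.dataframe as dd

idx = pd.date_range("2025-08-12", periods=4, freq="s")
pdf = pd.DataFrame({"Incand relPeak": [1.0, 2.0, 3.0, 4.0]}, index=idx)
ddf = dd.from_pandas(pdf, npartitions=2)

# meta: empty frame with exactly the columns/dtypes the partition function returns
meta = pdf.iloc[:0]
ddf_out = ddf.map_partitions(lambda part: part, meta=meta)
print(ddf_out.compute())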
@@ -99,6 +99,8 @@ def main():
# --- Apply calibration in parallel ---
calibrated_df = df.map_partitions(apply_calibration, config=cfg_future, meta=meta)
calibrated_df["date"] = df.index.dt.date.astype("date64[pyarrow]")
calibrated_df["hour"] = df.index.dt.hour.astype("i8")
# --- Save calibrated dataset (partitioned by date/hour) ---
calibrated_df.to_parquet(

View File

@@ -1,18 +1,23 @@
from __future__ import annotations
import argparse
from functools import reduce
from pathlib import Path
from dask_jobqueue import SLURMCluster
from dask.distributed import Client
import yaml
import time
import dask.dataframe as dd
from dask.dataframe.utils import make_meta
import pandas as pd
import numpy as np
from typing import Tuple, List
from sp2xr.calibration import calibrate_particle_data, BC_mass_to_diam
from sp2xr.calibration import calibrate_particle_data
from sp2xr.flag_single_particle_data import define_flags, add_thin_thick_flags
from sp2xr.resample_pbp_hk import build_dt_summary, resample_hk_partition
from sp2xr.distribution import (
bin_lims_to_ctrs,
process_hist_and_dist_partition,
make_hist_meta,
)
from sp2xr.concentrations import add_concentrations
def parse_args():
@@ -52,7 +57,7 @@ def parse_args():
return p.parse_args()
def make_cluster(cores=32, mem="64GB", wall="02:00:00", partition="daily", out_dir=""):
def make_cluster(cores=32, mem="64GB", wall="00:59:00", partition="hourly", out_dir=""):
cluster = SLURMCluster(
cores=cores,
processes=cores,
@@ -71,595 +76,6 @@ def make_cluster(cores=32, mem="64GB", wall="02:00:00", partition="daily", out_d
return client
def apply_calibration(partition, config):
"""Apply calibration to a pandas partition."""
pdf = calibrate_particle_data(partition, config)
pdf["date"] = partition.index.date.astype("datetime64[ns]")
pdf["hour"] = partition.index.hour.astype("int8")
return pdf
def define_flags(pdf_pbp, config):
flag_inc_transit_time = (
pdf_pbp["Incand Transit Time"]
>= config["instrument_parameters"]["IncTransitMin"]
) & (
pdf_pbp["Incand Transit Time"]
<= config["instrument_parameters"]["IncTransitMax"]
)
flag_inc_fwhm = (
pdf_pbp["Incand FWHM"] >= config["instrument_parameters"]["IncFWHMMin"]
) & (pdf_pbp["Incand FWHM"] <= config["instrument_parameters"]["IncFWHMMax"])
flag_inc = flag_inc_transit_time & flag_inc_fwhm
pdf_pbp["flag_valid_inc_signal"] = flag_inc
pdf_pbp["flag_inc_not_sat"] = (
pdf_pbp["Incand relPeak"] < config["Signal_saturation"]["IncSatPoint"]
)
flag_inc_in_range = (
flag_inc
& (pdf_pbp["BC mass"] >= config["inc_histo"]["min_mass"])
& (pdf_pbp["BC mass"] <= config["inc_histo"]["max_mass"])
)
pdf_pbp["flag_valid_inc_signal_in_range"] = flag_inc_in_range
flag_scatt_transit_time = (
pdf_pbp["Scatter Transit Time"]
>= config["instrument_parameters"]["ScattTransitMin"]
) & (
pdf_pbp["Scatter Transit Time"]
<= config["instrument_parameters"]["ScattTransitMax"]
)
flag_scatt_fwhm = (
pdf_pbp["Scatter FWHM"] >= config["instrument_parameters"]["ScattFWHMMin"]
) & (pdf_pbp["Scatter FWHM"] <= config["instrument_parameters"]["ScattFWHMMax"])
flag_scatt = flag_scatt_transit_time & flag_scatt_fwhm
pdf_pbp["flag_valid_scatt_signal"] = flag_scatt
pdf_pbp["flag_scatt_not_sat"] = (
pdf_pbp["Scatter relPeak"] < config["Signal_saturation"]["ScattSatPoint"]
)
flag_scatt_in_range = (
flag_scatt
& (pdf_pbp["Opt diam"] >= config["scatt_histo"]["min_D"])
& (pdf_pbp["Opt diam"] <= config["scatt_histo"]["max_D"])
)
pdf_pbp["flag_valid_scatt_signal_in_range"] = flag_scatt_in_range
pdf_pbp["flag_negative_timelag"] = (
pdf_pbp["time_lag"] < config["time_delay_histo"]["min"]
)
pdf_pbp["flag_extreme_positive_timelag"] = (
pdf_pbp["time_lag"] >= config["time_delay_histo"]["max"]
)
pdf_pbp["flag_valid_timelag_thin"] = (
pdf_pbp["time_lag"] < config["thick_coating"]["threshold"]
) & (pdf_pbp["time_lag"] >= config["time_delay_histo"]["min"])
pdf_pbp["flag_valid_timelag_thick"] = (
pdf_pbp["time_lag"] >= config["thick_coating"]["threshold"]
) & (pdf_pbp["time_lag"] < config["time_delay_histo"]["max"])
pdf_pbp["flag_low_ratio_inc_scatt"] = pdf_pbp["ratio_inc_scatt"] < 1.1
pdf_pbp["Opt diam scatt only"] = pdf_pbp["Opt diam"].where(
flag_scatt & ~flag_inc, np.nan
)
return pdf_pbp
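define_flags only consumes a handful of config keys; a minimal config sketch with made-up threshold values (the real values come from the YAML loaded in main()):

config = {
    "instrument_parameters": {
        "IncTransitMin": 2, "IncTransitMax": 60,
        "IncFWHMMin": 2, "IncFWHMMax": 40,
        "ScattTransitMin": 2, "ScattTransitMax": 60,
        "ScattFWHMMin": 2, "ScattFWHMMax": 40,
    },
    "Signal_saturation": {"IncSatPoint": 1e5, "ScattSatPoint": 1e5},
    "inc_histo": {"min_mass": 0.3, "max_mass": 400},
    "scatt_histo": {"min_D": 100, "max_D": 500},
    "time_delay_histo": {"min": -10, "max": 10},
    "thick_coating": {"threshold": 2},
}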
FLAG_COLS = [
"flag_valid_scatt_signal",
"flag_scatt_not_sat",
"flag_valid_inc_signal_in_range",
"flag_valid_timelag_thin",
"flag_valid_timelag_thick",
"flag_negative_timelag",
"flag_extreme_positive_timelag",
"flag_low_ratio_inc_scatt",
]
def add_thin_thick_flags(pdf: pd.DataFrame) -> pd.DataFrame:
"""Add cnts_* indicator + total columns (vectorised)."""
# Safety check: ensure all flags exist
missing = [c for c in FLAG_COLS if c not in pdf.columns]
if missing:
raise KeyError(f"Missing flag columns: {missing}")
f = pdf # shorthand
# initialise empty int8 columns
zeros = np.zeros(len(pdf), dtype="int8")
newcols = {
"cnts_thin": zeros,
"cnts_thin_noScatt": zeros,
"cnts_thick": zeros,
"cnts_thick_sat": zeros,
"cnts_thin_sat": zeros,
"cnts_ntl_sat": zeros,
"cnts_ntl": zeros,
"cnts_extreme_positive_timelag": zeros,
"cnts_thin_low_inc_scatt_ratio": zeros,
"cnts_particles_for_tl_dist": zeros,
}
pdf = pdf.assign(**newcols)
# ------------------------------------------------------------
# vectorised assignments
# ------------------------------------------------------------
pdf["cnts_thin_noScatt"] = (
(~f.flag_valid_scatt_signal) & f.flag_valid_inc_signal_in_range
).astype("int8")
pdf["cnts_thin"] = (
f.flag_valid_scatt_signal
& f.flag_scatt_not_sat
& f.flag_valid_inc_signal_in_range
& f.flag_valid_timelag_thin
& ~f.flag_low_ratio_inc_scatt
).astype("int8")
pdf["cnts_thin_low_inc_scatt_ratio"] = (
f.flag_valid_scatt_signal
& f.flag_scatt_not_sat
& f.flag_valid_inc_signal_in_range
& f.flag_valid_timelag_thin
& f.flag_low_ratio_inc_scatt
).astype("int8")
pdf["cnts_thick"] = (
f.flag_valid_scatt_signal
& f.flag_scatt_not_sat
& f.flag_valid_inc_signal_in_range
& f.flag_valid_timelag_thick
& ~f.flag_extreme_positive_timelag
).astype("int8")
pdf["cnts_thick_sat"] = (
f.flag_valid_scatt_signal
& (~f.flag_scatt_not_sat)
& f.flag_valid_inc_signal_in_range
& f.flag_valid_timelag_thick
& ~f.flag_extreme_positive_timelag
).astype("int8")
pdf["cnts_thin_sat"] = (
f.flag_valid_scatt_signal
& (~f.flag_scatt_not_sat)
& f.flag_valid_inc_signal_in_range
& f.flag_valid_timelag_thin
).astype("int8")
pdf["cnts_ntl_sat"] = (
f.flag_valid_scatt_signal
& (~f.flag_scatt_not_sat)
& f.flag_valid_inc_signal_in_range
& f.flag_negative_timelag
).astype("int8")
pdf["cnts_ntl"] = (
f.flag_valid_scatt_signal
& f.flag_scatt_not_sat
& f.flag_valid_inc_signal_in_range
& f.flag_negative_timelag
).astype("int8")
pdf["cnts_extreme_positive_timelag"] = (
f.flag_valid_scatt_signal
& f.flag_valid_inc_signal_in_range
& f.flag_extreme_positive_timelag
).astype("int8")
pdf["cnts_particles_for_tl_dist"] = (
f.flag_valid_scatt_signal
& f.flag_scatt_not_sat
& f.flag_valid_inc_signal_in_range
& (
(f.flag_valid_timelag_thin & ~f.flag_low_ratio_inc_scatt)
| (f.flag_valid_timelag_thick & ~f.flag_extreme_positive_timelag)
)
).astype("int8")
# totals
pdf["cnts_thin_total"] = (pdf["cnts_thin"] + pdf["cnts_thin_noScatt"]).astype(
"int16"
)
pdf["cnts_thick_total"] = (
pdf["cnts_thick"]
+ pdf["cnts_thick_sat"]
+ pdf["cnts_ntl_sat"]
+ pdf["cnts_ntl"]
+ pdf["cnts_thin_sat"]
).astype("int16")
pdf["cnts_unclassified"] = (
pdf["cnts_extreme_positive_timelag"] + pdf["cnts_thin_low_inc_scatt_ratio"]
).astype("int16")
return pdf
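A hand-built two-row check of the classification (flag values made up): row 0 satisfies the full thin-coating chain, row 1 has no valid scatter signal and lands in cnts_thin_noScatt:

import pandas as pd

flags = pd.DataFrame({
    "flag_valid_scatt_signal": [True, False],
    "flag_scatt_not_sat": [True, True],
    "flag_valid_inc_signal_in_range": [True, True],
    "flag_valid_timelag_thin": [True, False],
    "flag_valid_timelag_thick": [False, False],
    "flag_negative_timelag": [False, False],
    "flag_extreme_positive_timelag": [False, False],
    "flag_low_ratio_inc_scatt": [False, False],
})
out = add_thin_thick_flags(flags)
assert out["cnts_thin"].tolist() == [1, 0]
assert out["cnts_thin_noScatt"].tolist() == [0, 1]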
_SUM_COLS: List[str] = [
"Dropped Records",
"Incand Mass (fg)",
"BC mass",
"BC mass within range",
"cnts_thin",
"cnts_thin_noScatt",
"cnts_thick",
"cnts_thick_sat",
"cnts_thin_sat",
"cnts_ntl_sat",
"cnts_ntl",
"cnts_extreme_positive_timelag",
"cnts_thin_low_inc_scatt_ratio",
"cnts_thin_total",
"cnts_thick_total",
"cnts_unclassified",
]
_COUNT_SRC: List[str] = [
"Incand Mass (fg)",
"BC mass",
"BC mass within range",
"Scatter Size (nm)",
"Opt diam scatt only",
"Opt diam scatt only within range",
"temporary_col",
]
_COUNT_DST: List[str] = [
"BC numb from file",
"BC numb",
"BC numb within range",
"scatter numb from file",
"Scatt numb",
"Scatt numb within range",
"original_idx",
]
def build_dt_summary(pdf: pd.DataFrame, dt_s: int = 1) -> pd.DataFrame:
"""
Resample single-particle data to `dt_s`-second bins and return a summary.
Parameters
----------
pdf : pandas.DataFrame
Partition containing particle-level rows. Must have a DatetimeIndex.
dt_s : int, default 1
Resampling interval in seconds.
Returns
-------
pandas.DataFrame
One row per `dt_s`, with summed counters, counts, and means.
Columns `date` and `hour` are added for downstream partitioning.
"""
dt_str = f"{dt_s}s"
# --- SUM columns --------------------------------------------------
df_sum = pdf[_SUM_COLS].resample(dt_str).sum()
# --- COUNT columns ------------------------------------------------
df_cnt = (
pdf[_COUNT_SRC]
.resample(dt_str)
.count()
.rename(columns=dict(zip(_COUNT_SRC, _COUNT_DST)))
)
# --- MEAN columns -------------------------------------------------
df_mean = (
pdf[["Secs_2GB"]]
.resample(dt_str)
.mean()
.rename(columns={"Secs_2GB": "Secs_2GB_mean"})
)
# --- merge + post-process ----------------------------------------
out = pd.concat([df_sum, df_cnt, df_mean], axis=1)
# drop empty bins (original_idx == 0)
out = out[out["original_idx"] != 0].drop(columns="original_idx")
# add date / hour helper cols
out["date"] = out.index.normalize()
out["hour"] = out.index.hour.astype("int64")
return out
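A tiny smoke test of build_dt_summary (made-up values, using the module-level _SUM_COLS/_COUNT_SRC lists above); note the empty 00:00:01 bin is dropped via the original_idx counter:

import pandas as pd

idx = pd.to_datetime(["2025-08-12 00:00:00.2",
                      "2025-08-12 00:00:00.7",
                      "2025-08-12 00:00:02.1"])
pdf = pd.DataFrame({c: 1.0 for c in _SUM_COLS + _COUNT_SRC}, index=idx)
pdf["Secs_2GB"] = [0.2, 0.7, 2.1]
out = build_dt_summary(pdf, dt_s=1)
# two rows survive (00:00:00 and 00:00:02); 00:00:01 had original_idx == 0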
def resample_hk_partition(pdf: pd.DataFrame, dt="1s") -> pd.DataFrame:
"""
Partition-wise resampling of HK data to `dt` bins.
Keeps only non-empty intervals (original_idx != 0).
"""
# Ensure local partition is sorted
pdf = pdf.sort_index()
pdf.index = pdf.index.floor(dt)
# --- resample mean values ---
df_mean = (
pdf[
["Sample Flow Controller Read (sccm)", "Sample Flow Controller Read (vccm)"]
]
.resample(dt)
.mean()
)
# --- resample count for 'original_idx' ---
pdf["temporary_col"] = 1
df_count = pdf[["temporary_col"]].resample(dt).count()
df_count.rename(columns={"temporary_col": "original_idx"}, inplace=True)
# --- merge results ---
out = df_mean.join(df_count)
# --- drop empty bins ---
out = out[out["original_idx"] != 0].drop(columns="original_idx")
out = out.asfreq(dt).ffill(limit=1)
# --- add date/hour columns ---
out["date"] = out.index.normalize()
out["hour"] = out.index.hour.astype("int64")
return out
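And for the HK resampler, a sketch of the gap handling (made-up flows): the empty 00:00:01 bin is re-inserted by asfreq and forward-filled once, while longer gaps would stay NaN:

import pandas as pd

idx = pd.to_datetime(["2025-08-12 00:00:00", "2025-08-12 00:00:02"])
hk = pd.DataFrame({
    "Sample Flow Controller Read (sccm)": [120.0, 121.0],
    "Sample Flow Controller Read (vccm)": [118.0, 119.0],
}, index=idx)
print(resample_hk_partition(hk, dt="1s"))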
def add_concentrations(pdf: pd.DataFrame, dt: int = 1) -> pd.DataFrame:
"""
Add BC / scattering mass- and number-concentration columns.
Parameters
----------
pdf : pandas.DataFrame
1-second (or dt-second) summary partition with the columns generated
by your PbP + HK merge.
dt : int, default 1
Length of each time bin in seconds.
Returns
-------
pandas.DataFrame
Same object with 12 new concentration columns.
"""
# avoid repeated division
flow_sccm = pdf["Sample Flow Controller Read (sccm)"]
flow_vccm = pdf["Sample Flow Controller Read (vccm)"]
sec_per_bin = dt / 60  # bin length in minutes (flow rates are per minute)
# --- BC mass ------------------------------------------------------
pdf["BC_massConc_std"] = pdf["BC mass"] * 1e-9 / (flow_sccm * sec_per_bin * 1e-6)
pdf["BC_massConc_vol"] = pdf["BC mass"] * 1e-9 / (flow_vccm * sec_per_bin * 1e-6)
# --- BC number ----------------------------------------------------
pdf["BC_numConc_std"] = pdf["BC numb"] / (flow_sccm * sec_per_bin)
pdf["BC_numConc_vol"] = pdf["BC numb"] / (flow_vccm * sec_per_bin)
# --- BC within range ---------------------------------------------
pdf["BC_massConc_within_range_std"] = (
pdf["BC mass within range"] * 1e-9 / (flow_sccm * sec_per_bin * 1e-6)
)
pdf["BC_massConc_within_range_vol"] = (
pdf["BC mass within range"] * 1e-9 / (flow_vccm * sec_per_bin * 1e-6)
)
pdf["BC_numConc_within_range_std"] = pdf["BC numb within range"] / (
flow_sccm * sec_per_bin
)
pdf["BC_numConc_within_range_vol"] = pdf["BC numb within range"] / (
flow_vccm * sec_per_bin
)
# --- scattering number -------------------------------------------
pdf["S_numConc_std"] = pdf["Scatt numb"] / (flow_sccm * sec_per_bin)
pdf["S_numConc_vol"] = pdf["Scatt numb"] / (flow_vccm * sec_per_bin)
pdf["S_numConc_within_range_std"] = pdf["Scatt numb within range"] / (
flow_sccm * sec_per_bin
)
pdf["S_numConc_within_range_vol"] = pdf["Scatt numb within range"] / (
flow_vccm * sec_per_bin
)
return pdf
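The unit chain behind these lines, as a worked check with made-up numbers: fg to ug via 1e-9, and sccm times minutes times 1e-6 gives standard m^3, so the ratio lands in ug/m^3:

mass_fg = 50.0      # BC mass summed in one bin [fg]
flow_sccm = 120.0   # standard cm^3 per minute
dt = 1              # bin length [s]
vol_m3 = flow_sccm * (dt / 60) * 1e-6  # 2.0e-6 m^3 sampled
conc = mass_fg * 1e-9 / vol_m3         # 0.025 ug/m^3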
def make_bin_arrays(
*,
lims: np.ndarray | None = None,
ctrs: np.ndarray | None = None,
n: int = 50,
log: bool = True,
) -> Tuple[np.ndarray, np.ndarray]:
if lims is None and ctrs is None:
lims = (
np.logspace(np.log10(0.3), np.log10(400), n)
if log
else np.linspace(0.3, 400, n)
)
elif lims is None:
lims = np.append(
ctrs[0] - (ctrs[1] - ctrs[0]) / 2,
(ctrs[:-1] + ctrs[1:]) / 2,
)
lims = np.append(lims, ctrs[-1] + (ctrs[-1] - ctrs[-2]) / 2)
ctrs = (lims[:-1] + lims[1:]) / 2
return lims, ctrs
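Round-trip sketch for the bin helpers (linear bins for readability):

import numpy as np

lims, ctrs = make_bin_arrays(n=5, log=False)  # 5 limits -> 4 centers
lims2, _ = make_bin_arrays(ctrs=ctrs)         # limits rebuilt from centers
assert np.allclose(lims, lims2)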
def dNdlogDp_to_dMdlogDp(N, Dp, rho=1800):
"""This is from functions.py from Rob"""
"""
Given a normalized number size distribution [dN/dlogDp, cm^-3] defined over the diameters given by Dp [nm]
return the corresponding the mass size distribution [dM/dlogDp, ug/m3]
Optional density input given in units of kg/m3
"""
# M = N*((np.pi*(Dp*1e-9)**3)/6)*rho*1e18
M = N * np.pi / 6 * rho * (Dp**3) * 1e-12
return M
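A unit check on the 1e-12 factor (made-up numbers): Dp in nm contributes 1e-27 m^3, kg to ug gives 1e9, and cm^-3 to m^-3 gives 1e6, i.e. 1e-27 * 1e9 * 1e6 = 1e-12:

import numpy as np

N, Dp, rho = 100.0, 200.0, 1800   # cm^-3, nm, kg/m^3
M = N * np.pi / 6 * rho * Dp**3 * 1e-12
# one 200 nm sphere at 1800 kg/m^3 weighs ~7.5e-9 ug; times 1e8 m^-3 -> ~0.75 ug/m^3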
def bin_ctrs_to_lims(x_ctrs):
"""This is from functions.py from Rob"""
"""
Take an array of bin centers and return an array 1 element larger of bin limits
"""
delta = np.diff(x_ctrs) / 2
x_lims = np.append(x_ctrs[0] - delta[0], x_ctrs[:-1] + delta)
x_lims = np.append(x_lims, x_ctrs[-1] + delta[-1])
return x_lims
def bin_lims_to_ctrs(x_lims):
"""
Take an array of bin limits and return an array of bin centers, one element smaller.
"""
x_ctrs = (x_lims[1:] + x_lims[:-1]) / 2
return x_ctrs
def get_dlogp(Dp):
"""This is from functions.py from Rob"""
"""
Given an array of diameters Dp calculate an array of the log of the difference dlogdp
"""
dlogdp = np.log10(bin_ctrs_to_lims(Dp)[1:]) - np.log10(bin_ctrs_to_lims(Dp)[:-1])
return dlogdp
def counts2numConc(numb, flow, t=1):
"""This is from functions.py from Rob"""
"""
Calculate number concentration from counts (cnts), flow rate in [ccm] (Q) and sampling time in [secs] (t)
Return numConc with units of [cm^-3]
"""
conc = numb.divide(flow * (t / 60), axis=0)
return conc
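Worked example (made-up numbers): 10 particles counted in 1 s at 120 ccm means 120 * (1/60) = 2 cm^3 sampled, so 5 cm^-3:

import pandas as pd

numb = pd.DataFrame({"cnts": [10.0]})
flow = pd.Series([120.0])
print(counts2numConc(numb, flow, t=1))  # cnts == 5.0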
def calculate_histogram(series, bin_lims=np.logspace(np.log10(0.3), np.log10(400), 50)):
series = series.dropna()
if series.empty:
counts = np.full(len(bin_lims) - 1, np.nan, dtype=float)
else:
counts, _ = np.histogram(series, bins=bin_lims)
return counts
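Note the empty-input branch returns a fixed-length NaN vector, which keeps the per-bin schema stable across empty resample windows; a quick check with made-up bins:

import numpy as np
import pandas as pd

bins = np.array([0.0, 1.0, 2.0])
print(calculate_histogram(pd.Series([0.5, 1.5, 1.7]), bin_lims=bins))  # [1 2]
print(calculate_histogram(pd.Series([], dtype=float), bin_lims=bins))  # [nan nan]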
def process_hist_and_dist_partition(
df,
col,
flag_col,
flag_value,
bin_lims,
bin_ctrs,
dt_str,
calculate_conc=True,
flow=None,
rho_eff=1800,
BC_type="",
t=1,
):
# Filter the dataframe per partition
if flag_col and flag_value is not None:
df_filtered = df.loc[df[flag_col] == flag_value]
else:
df_filtered = df
# Adjust flag_col
if flag_col is not None and "cnts" in flag_col:
flag_col = flag_col[5:]
df_resampled = (
df_filtered[[col]]
.resample(dt_str)
.apply(lambda x: calculate_histogram(x[col], bin_lims=bin_lims))
)
flow_dt = df["Sample Flow Controller Read (vccm)"].resample(dt_str).mean()
# df_resampled = df_resampled.to_frame(name="result")
if isinstance(df_resampled, pd.Series):
df_resampled = df_resampled.to_frame(name="result")
else: # already a DataFrame
df_resampled.columns = ["result"]
original_idx_df = df_filtered[["temporary_col"]].resample(dt_str).count()
original_idx_df.columns = ["original_idx"] # ensure proper naming
df_resampled = df_resampled.join(original_idx_df, how="left")
if "original_idx" in df_resampled.columns:
df_resampled = df_resampled[df_resampled["original_idx"] != 0]
df_resampled = df_resampled.drop("original_idx", axis=1)
# Initialize DataFrame based on whether the result is empty or not
list_col = "result"
max_list_length = len(bin_ctrs)
if df_resampled.empty:
columns = [f"{list_col}_{i}" for i in range(max_list_length)]
ddf_hist = pd.DataFrame(np.nan, columns=columns, index=flow.index)
else:
ddf_hist = df_resampled[list_col].apply(pd.Series)
ddf_hist.index = df_resampled.index
columns = [f"{list_col}_{i}" for i in range(max_list_length)]
ddf_hist.columns = columns
if calculate_conc:
# Join with the sample flow controller data
inc_hist_flow = ddf_hist.join(flow_dt)
if rho_eff is not None and BC_type is not None:
# Calculate diameters and densities
density, Dmev = BC_mass_to_diam(bin_ctrs, rho_eff=rho_eff, BC_type=BC_type)
# Calculate number concentration
dNdlogDmev = counts2numConc(
inc_hist_flow.iloc[:, :-1], inc_hist_flow.iloc[:, -1], t=t
) / get_dlogp(Dmev)
if flag_col is None:
dNdlogDmev.columns = [f"dNdlogDmev_all_{i:.2f}" for i in Dmev]
else:
dNdlogDmev.columns = [f"dNdlogDmev_{flag_col}_{i:.2f}" for i in Dmev]
# Calculate mass concentration
dMdlogDmev = dNdlogDp_to_dMdlogDp(dNdlogDmev, Dmev, rho=density)
if flag_col is None:
dMdlogDmev.columns = [f"dMdlogDmev_all_{i:.2f}" for i in Dmev]
else:
dMdlogDmev.columns = [f"dMdlogDmev_{flag_col}_{i:.2f}" for i in Dmev]
else:
# Calculate number concentration
dNdlogDmev = counts2numConc(
inc_hist_flow.iloc[:, :-1], inc_hist_flow.iloc[:, -1], t=t
) / get_dlogp(bin_ctrs)
if flag_col is None:
dNdlogDmev.columns = [f"dNdlogDsc_all_{i:.2f}" for i in bin_ctrs]
else:
dNdlogDmev.columns = [f"dNdlogDsc_{flag_col}_{i:.2f}" for i in bin_ctrs]
dMdlogDmev = None
# return dNdlogDmev, dMdlogDmev
# else:
# return ddf_hist, None
if calculate_conc:
# Concatenate results into a single dataframe
if dMdlogDmev is not None:
result_df = pd.concat([dNdlogDmev, dMdlogDmev], axis=1)
else:
result_df = dNdlogDmev
else:
result_df = ddf_hist
return result_df
def main():
args = parse_args()
output_dir = Path(args.output)
@@ -702,7 +118,7 @@ def main():
meta_cal["hour"] = pd.Series([], dtype="int8")
ddf_cal = ddf_raw.map_partitions(
apply_calibration, config=cfg_future, meta=meta_cal
calibrate_particle_data, config=cfg_future, meta=meta_cal
)
ddf_cal["time_lag"] = ddf_cal["Incand Peak Time"] - ddf_cal["Scatter Peak Time"]
ddf_cal["ratio_inc_scatt"] = np.log10(ddf_cal["Incand relPeak"]) / np.log10(
@@ -770,18 +186,18 @@ def main():
tolerance=pd.Timedelta(tol),
)'''
flow_1s = ddf_hk_dt["Sample Flow Controller Read (vccm)"].compute()
flow_dt = ddf_hk_dt["Sample Flow Controller Read (vccm)"].compute()
def floor_index_to_sec(pdf: pd.DataFrame) -> pd.DataFrame:
def floor_index_to_dt(pdf: pd.DataFrame) -> pd.DataFrame:
"""
Replace the existing DatetimeIndex with its value floored to the dt-second bin,
without changing the index's name or creating a new column.
"""
pdf.index = pdf.index.floor("s") # keeps the original .name
pdf.index = pdf.index.floor(f"{args.dt}s") # keeps the original .name
return pdf
# meta stays identical because only the *values* of the index change
ddf_cal = ddf_cal.map_partitions(floor_index_to_sec, meta=ddf_cal._meta)
ddf_cal = ddf_cal.map_partitions(floor_index_to_dt, meta=ddf_cal._meta)
meta_pbp_with_flow = ddf_cal._meta.copy()
meta_pbp_with_flow["Sample Flow Controller Read (vccm)"] = (
@@ -794,7 +210,7 @@ def main():
meta=meta_pbp_with_flow,
)"""
ddf_pbp_with_flow = ddf_cal.map_partitions(
lambda part: part.join(flow_1s, how="left"),
lambda part: part.join(flow_dt, how="left"),
meta=meta_pbp_with_flow,
)
@@ -858,6 +274,7 @@ def main():
do_hist = args.hist
if do_hist:
# ========= 3. RUN MASS HISTOGRAMS =========
# --- Mass histogram configs
hist_configs = [
{"name": "BC_all", "flag_col": None, "flag_value": None},
@@ -883,57 +300,8 @@ def main():
{"name": "unclassified", "flag_col": "cnts_unclassified", "flag_value": 1},
]
# ========= 2. WRAPPER FUNCTION =========
def make_hist_meta(
*,
bin_ctrs: np.ndarray,
kind: str, # "mass", "scatt", "timelag"
flag_col: str | None, # e.g. "cnts_thin" or None
rho_eff=None,
BC_type=None,
dtype=np.float64,
):
"""
Return an empty DataFrame whose column names AND dtypes are exactly
what `process_hist_and_dist_partition()` will emit, without ever
calling that function.
"""
# ---- pick prefix exactly like the function ----------------------
prefix = (
"all"
if flag_col is None
else flag_col[5:] if flag_col.startswith("cnts_") else flag_col
)
# ---- translate mass bins if needed ------------------------------
if kind == "mass" and rho_eff is not None and BC_type is not None:
# same conversion used inside the partition function
_, labels = BC_mass_to_diam(bin_ctrs, rho_eff=rho_eff, BC_type=BC_type)
n_prefix, m_prefix = "dNdlogDmev", "dMdlogDmev"
elif kind == "scatt":
labels = bin_ctrs
n_prefix, m_prefix = "dNdlogDsc", None
elif kind == "timelag":
labels = bin_ctrs
n_prefix, m_prefix = f"{prefix}_timelag", None
else:
raise ValueError(f"Unknown histogram kind '{kind}'")
# ---- build column list ------------------------------------------
cols = [f"{n_prefix}_{prefix}_{v:.2f}" for v in labels]
if kind == "mass" and m_prefix:
cols += [f"{m_prefix}_{prefix}_{v:.2f}" for v in labels]
empty_df = pd.DataFrame(
{c: pd.Series(dtype=dtype) for c in cols},
index=pd.DatetimeIndex([], name="calculated_time"),
)
return make_meta(empty_df)
# ========= 3. RUN MASS HISTOGRAMS =========
results = []
for cfg_hist in hist_configs:
meta_hist = make_hist_meta(
bin_ctrs=inc_mass_bin_ctrs,
@@ -949,7 +317,7 @@ def main():
flag_value=cfg_hist["flag_value"],
bin_lims=inc_mass_bin_lims,
bin_ctrs=inc_mass_bin_ctrs,
dt_str="1s",
dt=dt_s,
calculate_conc=True,
flow=None,
rho_eff=rho_eff,
@@ -982,7 +350,7 @@ def main():
flag_value=None,
bin_lims=scatt_bin_lims,
bin_ctrs=scatt_bin_ctrs,
dt_str="1s",
dt=dt_s,
calculate_conc=True,
flow=None,
rho_eff=None,
@@ -991,8 +359,9 @@ def main():
meta=meta_hist, # <-- single line does the trick
)
results.append(ddf_scatt)
# ========= 5. RUN TIMELAG HISTOGRAMS =========
"""
# --- Timelag histogram parameters
timelag_bins_lims = np.linspace(
cfg["time_delay_histo"]["min"],
@@ -1002,41 +371,58 @@ def main():
timelag_bin_ctrs = bin_lims_to_ctrs(timelag_bins_lims)
mass_bins = ddf_pbp_with_flow["BC mass bin"].unique().compute()
for idx, mass_bin in enumerate(mass_bins):
for idx, mass_bin in enumerate(mass_bins[:1]):
ddf_bin = ddf_pbp_with_flow[ddf_pbp_with_flow["BC mass bin"] == mass_bin]
# meta_cols = [f"dNdlogDmev_{inc_mass_bin_ctrs[idx]:.2f}_timelag_{i:.1f}" for i in timelag_bin_ctrs]
bin_center = (
name_prefix = f"dNdlogDmev_{inc_mass_bin_ctrs[idx]:.2f}_timelag"
# meta_cols = [f"{name_prefix}_{i:.2f}" for i in timelag_bin_ctrs]
# meta_float = pd.DataFrame({c: pd.Series([], dtype="float64") for c in meta_cols})
meta_hist = make_hist_meta(
bin_ctrs=timelag_bin_ctrs,
kind="timelag",
flag_col="cnts_particles_for_tl_dist",
name_prefix=name_prefix,
rho_eff=None,
BC_type=None,
)
# meta_cols = [f"dNdlogDmev_{inc_mass_bin_ctrs[idx]:.2f}_particles_for_tl_dist_{i:.2f}" for i in timelag_bin_ctrs]
"""bin_center = (
inc_mass_bin_ctrs[int(mass_bin)]
if mass_bin < len(inc_mass_bin_ctrs)
else mass_bin
)
meta_cols = [
f"dNdlogDmev_{bin_center:.2f}_timelag_{i:.1f}" for i in timelag_bin_ctrs
]
f"dNdlogDmev_{bin_center:.2f}_particles_for_tl_dist_{i:.2f}" for i in timelag_bin_ctrs
]"""
tl_ddf = ddf_bin.map_partitions(
process_hist_and_dist_partition,
col="time_lag_new",
col="time_lag",
flag_col="cnts_particles_for_tl_dist",
flag_value=1,
bin_lims=timelag_bins_lims,
bin_ctrs=timelag_bin_ctrs,
dt_str="1s",
dt=dt_s,
calculate_conc=True,
# flow=ddf_pbp_hk["Sample Flow Controller Read (vccm)"],
flow=None, # ddf_pbp_hk["Sample Flow Controller Read (vccm)"],
rho_eff=None,
BC_type=None,
t=1,
meta=pd.DataFrame(columns=meta_cols),
name_prefix=name_prefix, # <-- force naming
meta=meta_hist, # pd.DataFrame(columns=meta_cols)
)
tl_ddf.columns = meta_cols
# tl_ddf.columns = meta_cols
results.append(tl_ddf)
"""
# ========= 6. MERGE ALL HISTOGRAMS =========
merged_ddf = reduce(lambda left, right: left.join(right), results)
# merged_ddf = reduce(lambda left, right: left.join(right), results)
merged_ddf = dd.concat(results, axis=1, interleave_partitions=True)
# merged_ddf = merged_ddf.rename_axis(index="time")
# Add normalized date column
# merged_ddf = merged_ddf.assign(date=merged_ddf.index.to_series().dt.normalize())

View File

@@ -2,6 +2,14 @@ import numpy as np
from numpy.polynomial import Polynomial
def apply_calibration(partition, config):
"""Apply calibration to a pandas partition."""
pdf = calibrate_particle_data(partition, config)
# pdf["date"] = partition.index.date.astype("datetime64[ns]")
# pdf["hour"] = partition.index.hour.astype("int8")
return pdf
def polynomial(x, *coefficients):
"""
Function to generate a generic polynomial curve. The number of input coefficients defines the degree of the polynomial curve
@@ -126,7 +134,8 @@ def calibrate_particle_data(df, config):
curve_type=scatt_conf.get("curve_type"),
params=scatt_conf.get("parameters", []),
)
df["date"] = df.index.date.astype("datetime64[ns]")
df["hour"] = df.index.hour.astype("int8")
return df

View File

@@ -1,6 +1,9 @@
from __future__ import annotations
import numpy as np
import pandas as pd
from typing import Tuple
from typing import Optional
from dask.dataframe.utils import make_meta
from .calibration import BC_mass_to_diam
@@ -87,20 +90,21 @@ def calculate_histogram(series, bin_lims=np.logspace(np.log10(0.3), np.log10(400
return counts
def process_hist_and_dist_partition(
"""def process_hist_and_dist_partition(
df,
col,
flag_col,
flag_value,
bin_lims,
bin_ctrs,
dt_str,
dt,
calculate_conc=True,
flow=None,
rho_eff=1800,
BC_type="",
t=1,
):
dt_str = f"{dt}s"
# Filter the dataframe per partition
if flag_col and flag_value is not None:
df_filtered = df.loc[df[flag_col] == flag_value]
@@ -195,3 +199,169 @@ def process_hist_and_dist_partition(
else:
result_df = ddf_hist
return result_df
"""
def process_hist_and_dist_partition(
df: pd.DataFrame,
col: str,
flag_col: Optional[str],
flag_value: Optional[int],
bin_lims: np.ndarray,
bin_ctrs: np.ndarray,
dt,
calculate_conc: bool = True,
flow=None, # kept for API compatibility; not used
rho_eff: Optional[float] = 1800,
BC_type: Optional[str] = "",
t: float = 1.0,
name_prefix: Optional[str] = None, # <-- NEW: force exact output name prefix
):
# normalize dt -> "Xs"
dt_str = f"{dt}s" if isinstance(dt, (int, float)) else str(dt)
# filter by flag (when provided)
if flag_col and flag_value is not None:
df_filtered = df.loc[df[flag_col] == flag_value]
else:
df_filtered = df
# adjust flag_col "cnts_*" -> "*"
if flag_col is not None and flag_col.startswith("cnts_"):
flag_core = flag_col[5:]
else:
flag_core = flag_col
# histogram of 'col' per time bin
df_resampled = (
df_filtered[[col]]
.resample(dt_str)
.apply(lambda x: calculate_histogram(x[col], bin_lims=bin_lims))
)
# flow per time bin (from this partition's index)
flow_dt = df["Sample Flow Controller Read (vccm)"].resample(dt_str).mean()
# ensure df_resampled is a single-column DataFrame named "result"
if isinstance(df_resampled, pd.Series):
df_resampled = df_resampled.to_frame(name="result")
else:
df_resampled.columns = ["result"]
# drop time bins with zero original rows (matches your prior logic)
original_idx_df = df_filtered[["temporary_col"]].resample(dt_str).count()
original_idx_df.columns = ["original_idx"]
df_resampled = df_resampled.join(original_idx_df, how="left")
if "original_idx" in df_resampled.columns:
df_resampled = df_resampled[df_resampled["original_idx"] != 0]
df_resampled = df_resampled.drop(columns=["original_idx"])
# expand list-of-counts -> wide matrix (one col per bin)
max_list_length = len(bin_ctrs)
if df_resampled.empty:
# IMPORTANT: use the resampled time index so schema matches later joins
idx = flow_dt.index
columns = [f"result_{i}" for i in range(max_list_length)]
ddf_hist = pd.DataFrame(np.nan, columns=columns, index=idx)
else:
ddf_hist = df_resampled["result"].apply(pd.Series)
ddf_hist.index = df_resampled.index
ddf_hist.columns = [f"result_{i}" for i in range(max_list_length)]
if not calculate_conc:
return ddf_hist
# join flow
inc_hist_flow = ddf_hist.join(flow_dt)
# compute number concentration (per-log-bin)
# last column is flow; left part are counts per bin
dNd = counts2numConc(inc_hist_flow.iloc[:, :-1], inc_hist_flow.iloc[:, -1], t=t)
# choose the abscissa used for dlog (Dmev for BC, or bin_ctrs otherwise)
if rho_eff is not None and BC_type is not None:
density, Dmev = BC_mass_to_diam(bin_ctrs, rho_eff=rho_eff, BC_type=BC_type)
dNdlog = dNd / get_dlogp(Dmev)
# names
if name_prefix is not None:
dNdlog.columns = [f"{name_prefix}_{b:.2f}" for b in Dmev]
else:
base = "dNdlogDmev"
tag = "all" if flag_core is None else flag_core
dNdlog.columns = [f"{base}_{tag}_{b:.2f}" for b in Dmev]
# mass concentration
dMdlog = dNdlogDp_to_dMdlogDp(dNdlog, Dmev, rho=density)
if name_prefix is not None:
# mirror the naming with a mass prefix
mass_prefix = name_prefix.replace("dNdlog", "dMdlog")
dMdlog.columns = [f"{mass_prefix}_{b:.2f}" for b in Dmev]
else:
base = "dMdlogDmev"
tag = "all" if flag_core is None else flag_core
dMdlog.columns = [f"{base}_{tag}_{b:.2f}" for b in Dmev]
return pd.concat([dNdlog, dMdlog], axis=1)
else:
# e.g., timelag/scattering case (no BC diameter conversion)
dNdlog = dNd / get_dlogp(bin_ctrs)
if name_prefix is not None:
dNdlog.columns = [f"{name_prefix}_{b:.2f}" for b in bin_ctrs]
else:
base = "dNdlogDsc"
tag = "all" if flag_core is None else flag_core
dNdlog.columns = [f"{base}_{tag}_{b:.2f}" for b in bin_ctrs]
return dNdlog
def make_hist_meta(
*,
bin_ctrs: np.ndarray,
kind: str, # "mass", "scatt", "timelag"
flag_col: str | None, # e.g. "cnts_thin" or None
name_prefix=None,
rho_eff=None,
BC_type=None,
dtype=np.float64,
):
"""
Return an empty DataFrame whose column names AND dtypes are exactly
what `process_hist_and_dist_partition()` will emit, without ever
calling that function.
"""
# ---- pick prefix exactly like the function ----------------------
prefix = (
"all_"
if flag_col is None
else f"{flag_col[5:]}_" if flag_col.startswith("cnts_") else f"{flag_col}_"
)
# ---- translate mass bins if needed ------------------------------
if kind == "mass" and rho_eff is not None and BC_type is not None:
# same conversion used inside the partition function
_, labels = BC_mass_to_diam(bin_ctrs, rho_eff=rho_eff, BC_type=BC_type)
n_prefix, m_prefix = "dNdlogDmev", "dMdlogDmev"
elif kind == "scatt":
labels = bin_ctrs
n_prefix, m_prefix = "dNdlogDsc", None
elif kind == "timelag":
labels = bin_ctrs
if name_prefix is None:
# strip the trailing "_" so the flag tag is not duplicated in the column name
n_prefix, m_prefix = prefix.rstrip("_"), None
prefix = ""
else:
prefix = ""
n_prefix, m_prefix = name_prefix, None
else:
raise ValueError(f"Unknown histogram kind '{kind}'")
# ---- build column list ------------------------------------------
cols = [f"{n_prefix}_{prefix}{v:.2f}" for v in labels]
if kind == "mass" and m_prefix:
cols += [f"{m_prefix}_{prefix}{v:.2f}" for v in labels]
empty_df = pd.DataFrame(
{c: pd.Series(dtype=dtype) for c in cols},
index=pd.DatetimeIndex([], name="calculated_time"),
)
return make_meta(empty_df)
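Quick check of the naming contract against the timelag call in main() (bin values made up): with a name_prefix the flag-derived prefix is bypassed entirely:

import numpy as np

meta = make_hist_meta(
    bin_ctrs=np.array([0.5, 1.5, 2.5]),
    kind="timelag",
    flag_col="cnts_particles_for_tl_dist",
    name_prefix="dNdlogDmev_0.50_timelag",
)
print(list(meta.columns))
# ['dNdlogDmev_0.50_timelag_0.50', 'dNdlogDmev_0.50_timelag_1.50',
#  'dNdlogDmev_0.50_timelag_2.50']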