Files
SP2XR/src/sp2xr/distribution.py

458 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import numpy as np
import pandas as pd
import gc
import dask.dataframe as dd
from dask.dataframe.utils import make_meta
from .calibration import BC_mass_to_diam
from sp2xr.helpers import delete_partition_if_exists
from sp2xr.schema import cast_and_arrow, DEFAULT_FLOAT
def make_bin_arrays(
*,
lims: np.ndarray | None = None,
ctrs: np.ndarray | None = None,
n: int = 50,
log: bool = True,
) -> tuple[np.ndarray, np.ndarray]:
if lims is None and ctrs is None:
lims = (
np.logspace(np.log10(0.3), np.log10(400), n)
if log
else np.linspace(0.3, 400, n)
)
elif lims is None:
lims = np.append(
ctrs[0] - (ctrs[1] - ctrs[0]) / 2,
(ctrs[:-1] + ctrs[1:]) / 2,
)
lims = np.append(lims, ctrs[-1] + (ctrs[-1] - ctrs[-2]) / 2)
ctrs = (lims[:-1] + lims[1:]) / 2
return lims, ctrs
def dNdlogDp_to_dMdlogDp(N, Dp, rho=1800):
    """Convert a normalized number size distribution to a mass size distribution.

    Adapted from Rob's ``functions.py``.  (The original had a second bare
    string literal after the docstring — a no-op statement — now merged here.)

    Parameters
    ----------
    N : array-like / pd.DataFrame
        dN/dlogDp in cm^-3, defined over the diameters ``Dp``.
    Dp : array-like
        Particle diameters in nm.
    rho : float, default 1800
        Particle density in kg/m^3.

    Returns
    -------
    dM/dlogDp in ug/m^3, same shape/type as ``N``.
    """
    # Sphere mass per particle times number concentration; the 1e-12 factor
    # folds the unit conversions nm^3 -> m^3 (1e-27), cm^-3 -> m^-3 (1e6),
    # and kg -> ug (1e9) into one constant.
    return N * np.pi / 6 * rho * (Dp**3) * 1e-12
def bin_ctrs_to_lims(x_ctrs):
    """Take an array of bin centers and return an array one element larger
    of bin limits.

    Adapted from Rob's ``functions.py``.  (The original had a second bare
    string literal after the docstring — a no-op statement — now merged here.)
    Interior limits are midpoints between adjacent centers; the two outer
    limits extrapolate by half the adjacent spacing.
    """
    delta = np.diff(x_ctrs) / 2
    # Left edge, then the n-1 interior midpoints ...
    x_lims = np.append(x_ctrs[0] - delta[0], x_ctrs[:-1] + delta)
    # ... then the right edge.
    x_lims = np.append(x_lims, x_ctrs[-1] + delta[-1])
    return x_lims
def bin_lims_to_ctrs(x_lims):
    """
    Take an array of bin limits and return an array 1 element smaller of bin
    centers (the arithmetic midpoints of adjacent limits).
    """
    return (x_lims[1:] + x_lims[:-1]) / 2
def get_dlogp(Dp):
    """Given an array of diameters ``Dp``, return the per-bin width in log10
    space (dlogDp).

    Adapted from Rob's ``functions.py``.  (The original had a second bare
    string literal after the docstring — a no-op statement — now merged here.)
    """
    # Hoist the limits computation: the original called bin_ctrs_to_lims(Dp)
    # twice; np.diff of the log-edges is the same subtraction in one pass.
    lims = bin_ctrs_to_lims(Dp)
    return np.diff(np.log10(lims))
def counts2numConc(numb, flow, t=1):
    """Calculate number concentration from counts, flow rate and sampling time.

    Adapted from Rob's ``functions.py``.  (The original had a second bare
    string literal after the docstring — a no-op statement — now merged here.)

    Parameters
    ----------
    numb : pd.DataFrame or pd.Series
        Particle counts per time bin.
    flow : pd.Series or scalar
        Flow rate in ccm (cm^3/min), index-aligned with ``numb`` (axis=0).
    t : float, default 1
        Sampling time in seconds.

    Returns
    -------
    Number concentration in cm^-3, same shape as ``numb``.
    """
    # flow [cm^3/min] * (t/60) [min] = sampled volume [cm^3] per time bin.
    return numb.divide(flow * (t / 60), axis=0)
def calculate_histogram(series, bin_lims=None):
    """Histogram a pandas Series into the given bin edges.

    NaNs are dropped first.  An all-NaN / empty series yields an array of
    NaNs (not zeros), so downstream code can distinguish "no data" from
    "zero counts".

    Parameters
    ----------
    series : pd.Series
        Values to histogram.
    bin_lims : np.ndarray, optional
        Bin edges; defaults to 50 log-spaced edges over 0.3-400.

    Returns
    -------
    np.ndarray of length ``len(bin_lims) - 1``.
    """
    if bin_lims is None:
        # Built per call instead of once at import time: an array default
        # argument is evaluated eagerly and shared across all calls.
        bin_lims = np.logspace(np.log10(0.3), np.log10(400), 50)
    series = series.dropna()
    if series.empty:
        counts = np.full(len(bin_lims) - 1, np.nan, dtype=float)
    else:
        counts, _ = np.histogram(series, bins=bin_lims)
    return counts
def process_hist_and_dist_partition(
    df: pd.DataFrame,
    col: str,
    flag_col: str | None,
    flag_value: int | None,
    bin_lims: np.ndarray,
    bin_ctrs: np.ndarray,
    dt,
    calculate_conc: bool = True,
    flow=None,  # kept for API compatibility; not used
    rho_eff: float = 1800,
    BC_type: str = "",
    # t: float = 1.0,
    name_prefix: str | None = None,  # <-- NEW: force exact output name prefix
):
    """Resample one pandas partition into ``dt``-wide time bins, histogram
    ``col`` into ``bin_lims``, and (optionally) convert counts to number —
    and, for BC, mass — concentration size distributions.

    Parameters
    ----------
    df : pd.DataFrame
        Partition with a DatetimeIndex.  Must contain ``col``,
        ``"Sample Flow Controller Read (vccm)"`` and ``"temporary_col"``
        (plus ``flag_col`` when given).
    col : str
        Column whose values are histogrammed.
    flag_col, flag_value
        When both are provided, only rows with ``df[flag_col] == flag_value``
        are histogrammed; the mean flow is still taken over ALL rows.
        A ``"cnts_"`` prefix on ``flag_col`` is stripped for column naming.
    bin_lims, bin_ctrs : np.ndarray
        Bin edges and centers (``len(bin_lims) == len(bin_ctrs) + 1``).
    dt
        Time-bin width; a bare int/float is interpreted as seconds.
    calculate_conc : bool, default True
        If False, return the raw per-bin counts matrix.
    flow
        Unused; kept so existing callers' keyword arguments still work.
    rho_eff, BC_type
        When both are not None, ``bin_ctrs`` are treated as BC masses and
        converted to mass-equivalent diameters (Dmev) via ``BC_mass_to_diam``;
        otherwise ``bin_ctrs`` are used directly (scattering/timelag case).
    name_prefix : str, optional
        Exact output-column prefix; otherwise names are derived from
        ``flag_col`` (``"all"`` when it is None).

    Returns
    -------
    pd.DataFrame
        Counts matrix (``calculate_conc=False``), dN/dlogD (non-BC case),
        or dN/dlogDmev and dM/dlogDmev concatenated side by side (BC case).
    """
    # normalize dt -> "Xs" (numeric dt means seconds)
    dt_str = f"{dt}s" if isinstance(dt, (int, float)) else str(dt)
    t_sec: float = pd.to_timedelta(dt_str).total_seconds()
    # filter by flag (when provided)
    if flag_col and flag_value is not None:
        df_filtered = df.loc[df[flag_col] == flag_value]
    else:
        df_filtered = df
    # adjust flag_col "cnts_*" -> "*" (used only for output column names)
    if flag_col is not None and flag_col.startswith("cnts_"):
        flag_core = flag_col[5:]
    else:
        flag_core = flag_col
    # histogram of 'col' per time bin; each cell holds an array of counts
    df_resampled = (
        df_filtered[[col]]
        .resample(dt_str)
        .apply(lambda x: calculate_histogram(x[col], bin_lims=bin_lims))
    )
    # flow per time bin (mean over this partition's index, unfiltered df)
    flow_dt = df["Sample Flow Controller Read (vccm)"].resample(dt_str).mean()
    # ensure df_resampled is a single-column DataFrame named "result"
    # (resample().apply() may return a Series or a DataFrame depending on input)
    if isinstance(df_resampled, pd.Series):
        df_resampled = df_resampled.to_frame(name="result")
    else:
        df_resampled.columns = ["result"]
    # drop time bins with zero original rows (matches your prior logic);
    # "temporary_col" is assumed present on every partition — TODO confirm
    original_idx_df = df_filtered[["temporary_col"]].resample(dt_str).count()
    original_idx_df.columns = ["original_idx"]
    df_resampled = df_resampled.join(original_idx_df, how="left")
    if "original_idx" in df_resampled.columns:
        df_resampled = df_resampled[df_resampled["original_idx"] != 0]
        df_resampled = df_resampled.drop(columns=["original_idx"])
    # expand list-of-counts -> wide matrix (one col per bin)
    max_list_length = len(bin_ctrs)
    if df_resampled.empty:
        # keep the expected wide shape even with no surviving time bins
        ddf_hist = pd.DataFrame(
            columns=[f"result_{i}" for i in range(max_list_length)],
            index=pd.DatetimeIndex([], name=df.index.name),
        )
    else:
        ddf_hist = df_resampled["result"].apply(pd.Series)
        ddf_hist.index = df_resampled.index
        ddf_hist.columns = [f"result_{i}" for i in range(max_list_length)]
    if not calculate_conc:
        return ddf_hist
    # join flow (becomes the last column of the joined frame)
    inc_hist_flow = ddf_hist.join(flow_dt)
    # compute number concentration (per-log-bin)
    # last column is flow; left part are counts per bin
    dNd = counts2numConc(inc_hist_flow.iloc[:, :-1], inc_hist_flow.iloc[:, -1], t=t_sec)
    # choose the abscissa used for dlog (Dmev for BC, or bin_ctrs otherwise)
    if rho_eff is not None and BC_type is not None:
        # bin_ctrs are BC masses here; convert to mass-equivalent diameters
        density, Dmev = BC_mass_to_diam(bin_ctrs, rho_eff=rho_eff, BC_type=BC_type)
        dNdlog = dNd / get_dlogp(Dmev)
        # names
        if name_prefix is not None:
            dNdlog.columns = [f"{name_prefix}_{b:.2f}" for b in Dmev]
        else:
            base = "dNdlogDmev"
            tag = "all" if flag_core is None else flag_core
            dNdlog.columns = [f"{base}_{tag}_{b:.2f}" for b in Dmev]
        # mass concentration
        dMdlog = dNdlogDp_to_dMdlogDp(dNdlog, Dmev, rho=density)
        if name_prefix is not None:
            # mirror the naming with a mass prefix
            mass_prefix = name_prefix.replace("dNdlog", "dMdlog")
            dMdlog.columns = [f"{mass_prefix}_{b:.2f}" for b in Dmev]
        else:
            base = "dMdlogDmev"
            tag = "all" if flag_core is None else flag_core
            dMdlog.columns = [f"{base}_{tag}_{b:.2f}" for b in Dmev]
        return pd.concat([dNdlog, dMdlog], axis=1)
    else:
        # e.g., timelag/scattering case (no BC diameter conversion)
        dNdlog = dNd / get_dlogp(bin_ctrs)
        if name_prefix is not None:
            dNdlog.columns = [f"{name_prefix}_{b:.2f}" for b in bin_ctrs]
        else:
            base = "dNdlogDsc"
            tag = "all" if flag_core is None else flag_core
            dNdlog.columns = [f"{base}_{tag}_{b:.2f}" for b in bin_ctrs]
        return dNdlog
def make_hist_meta(
    *,
    bin_ctrs: np.ndarray,
    kind: str,  # "mass", "scatt", "timelag"
    flag_col: str | None,  # e.g. "cnts_thin" or None
    name_prefix=None,
    rho_eff=None,
    BC_type=None,
    dtype=np.float64,
):
    """
    Return an empty DataFrame whose column names AND dtypes are exactly
    what `process_hist_and_dist_partition()` will emit, without ever
    calling that function.

    Raises
    ------
    ValueError
        If ``kind`` is unknown, or ``kind == "mass"`` without both
        ``rho_eff`` and ``BC_type`` (previously this fell through to a
        misleading "Unknown histogram kind 'mass'" error).
    """
    if kind not in ("mass", "scatt", "timelag"):
        raise ValueError(f"Unknown histogram kind '{kind}'")
    # ---- pick prefix exactly like the partition function's flag_core ----
    if flag_col is None:
        prefix = "all_"
    elif flag_col.startswith("cnts_"):
        prefix = f"{flag_col[5:]}_"
    else:
        prefix = f"{flag_col}_"
    # ---- choose labels and column prefixes per histogram kind -----------
    m_prefix = None
    if kind == "mass":
        if rho_eff is None or BC_type is None:
            raise ValueError("kind='mass' requires both rho_eff and BC_type")
        # same mass -> mass-equivalent-diameter conversion as the partition fn
        _, labels = BC_mass_to_diam(bin_ctrs, rho_eff=rho_eff, BC_type=BC_type)
        n_prefix, m_prefix = "dNdlogDmev", "dMdlogDmev"
    elif kind == "scatt":
        labels = bin_ctrs
        n_prefix = "dNdlogDsc"
    else:  # timelag
        labels = bin_ctrs
        if name_prefix is None:
            # NOTE(review): this branch yields "<prefix>_<prefix><val>" names
            # (prefix doubled), which does NOT match what
            # process_hist_and_dist_partition emits ("dNdlogDsc_<tag>_<val>").
            # Behavior preserved because all timelag callers in this file pass
            # name_prefix, making this branch effectively dead — TODO confirm.
            n_prefix = prefix
        else:
            prefix = ""
            n_prefix = name_prefix
    # ---- build column list ----------------------------------------------
    cols = [f"{n_prefix}_{prefix}{v:.2f}" for v in labels]
    if m_prefix:
        # mass histograms carry a mirrored dM/dlogDmev column set
        cols += [f"{m_prefix}_{prefix}{v:.2f}" for v in labels]
    empty_df = pd.DataFrame(
        {c: pd.Series(dtype=dtype) for c in cols},
        index=pd.DatetimeIndex([], name="calculated_time"),
    )
    return make_meta(empty_df)
def process_histograms(
    ddf_pbp_with_flow,
    run_config,
    inc_mass_bin_lims,
    inc_mass_bin_ctrs,
    scatt_bin_lims,
    scatt_bin_ctrs,
    timelag_bins_lims,
    timelag_bin_ctrs,
    chunk_start,
    client,
):
    """Compute BC / scattering / time-lag histograms for one chunk and append
    them to a partitioned parquet dataset.

    Each dask graph is ``.compute()``d immediately to avoid task-graph
    buildup across chunks.  Results are merged column-wise, tagged with a
    string ``date`` and int ``hour`` partition column, and written to
    ``{output}/hists_{dt}s`` (the partition matching ``chunk_start`` is
    deleted first so re-runs don't duplicate rows).

    Parameters
    ----------
    ddf_pbp_with_flow : dask.dataframe.DataFrame
        Per-particle data with flow, flag columns and a DatetimeIndex.
    run_config : dict
        Needs keys "do_BC_hist", "do_scatt_hist", "do_timelag_hist", "dt",
        "rho_eff", "BC_type", "output", "saving_schema".
    inc_mass_bin_lims, inc_mass_bin_ctrs, scatt_bin_lims, scatt_bin_ctrs,
    timelag_bins_lims, timelag_bin_ctrs : np.ndarray
        Bin edges/centers for the three histogram families.
    chunk_start : datetime-like
        Start of the chunk; selects the parquet partition to delete.
    client
        Dask client; kept in the signature for API compatibility.
    """
    computed_results = []
    if run_config["do_BC_hist"]:
        print("Computing BC distributions...")
        # --- Mass histogram: one pass per particle-classification flag
        BC_hist_configs = [
            {"flag_col": None, "flag_value": None},
            {"flag_col": "cnts_thin", "flag_value": 1},
            {"flag_col": "cnts_thin_noScatt", "flag_value": 1},
            {"flag_col": "cnts_thick", "flag_value": 1},
            {"flag_col": "cnts_thick_sat", "flag_value": 1},
            {"flag_col": "cnts_thin_sat", "flag_value": 1},
            {"flag_col": "cnts_ntl_sat", "flag_value": 1},
            {"flag_col": "cnts_ntl", "flag_value": 1},
            {"flag_col": "cnts_extreme_positive_timelag", "flag_value": 1},
            {"flag_col": "cnts_thin_low_inc_scatt_ratio", "flag_value": 1},
            {"flag_col": "cnts_thin_total", "flag_value": 1},
            {"flag_col": "cnts_thick_total", "flag_value": 1},
            {"flag_col": "cnts_unclassified", "flag_value": 1},
        ]
        for cfg_hist in BC_hist_configs:
            # meta mirrors the partition function's output schema exactly
            meta_hist = (
                make_hist_meta(
                    bin_ctrs=inc_mass_bin_ctrs,
                    kind="mass",
                    flag_col=cfg_hist["flag_col"],
                    rho_eff=run_config["rho_eff"],
                    BC_type=run_config["BC_type"],
                )
                .astype(DEFAULT_FLOAT, copy=False)
                .convert_dtypes(dtype_backend="pyarrow")
            )
            ddf_out = ddf_pbp_with_flow.map_partitions(
                process_hist_and_dist_partition,
                col="BC mass within range",
                flag_col=cfg_hist["flag_col"],
                flag_value=cfg_hist["flag_value"],
                bin_lims=inc_mass_bin_lims,
                bin_ctrs=inc_mass_bin_ctrs,
                dt=run_config["dt"],
                calculate_conc=True,
                flow=None,
                rho_eff=run_config["rho_eff"],
                BC_type=run_config["BC_type"],
                meta=meta_hist,
            ).map_partitions(cast_and_arrow, meta=meta_hist)
            # compute immediately so the graph does not accumulate
            computed_hist = ddf_out.compute()
            computed_results.append(computed_hist)
            del ddf_out
    # Process other histogram types
    if run_config["do_scatt_hist"]:
        print("Computing scattering distribution...")
        meta_hist = (
            make_hist_meta(
                bin_ctrs=scatt_bin_ctrs,
                kind="scatt",
                flag_col=None,
                rho_eff=None,
                BC_type=None,
            )
            .astype(DEFAULT_FLOAT, copy=False)
            .convert_dtypes(dtype_backend="pyarrow")
        )
        ddf_scatt = ddf_pbp_with_flow.map_partitions(
            process_hist_and_dist_partition,
            col="Opt diam scatt only",
            flag_col=None,
            flag_value=None,
            bin_lims=scatt_bin_lims,
            bin_ctrs=scatt_bin_ctrs,
            dt=run_config["dt"],
            calculate_conc=True,
            flow=None,
            rho_eff=None,
            BC_type=None,
            meta=meta_hist,
        ).map_partitions(cast_and_arrow, meta=meta_hist)
        computed_scatt = ddf_scatt.compute()
        computed_results.append(computed_scatt)
    if run_config["do_timelag_hist"]:
        print("Computing time delay distribution...")
        mass_bins = (
            ddf_pbp_with_flow[["BC mass bin"]]
            .compute()
            .astype("Int64")
            .drop_duplicates()
            .dropna()
        )
        # BUGFIX: iterating a DataFrame yields its column LABELS, so the old
        # `for idx, mass_bin in enumerate(mass_bins)` looped once over the
        # string "BC mass bin" and the equality filter below matched nothing.
        # Iterate the column's values instead.
        for idx, mass_bin in enumerate(mass_bins["BC mass bin"]):
            ddf_bin = ddf_pbp_with_flow[ddf_pbp_with_flow["BC mass bin"] == mass_bin]
            # NOTE(review): idx follows order of first appearance of the bins,
            # assumed to line up with inc_mass_bin_ctrs — confirm upstream.
            name_prefix = f"dNdlogDmev_{inc_mass_bin_ctrs[idx]:.2f}_timelag"
            meta_hist = make_hist_meta(
                bin_ctrs=timelag_bin_ctrs,
                kind="timelag",
                flag_col="cnts_particles_for_tl_dist",
                name_prefix=name_prefix,
                rho_eff=None,
                BC_type=None,
            )
            tl_ddf = ddf_bin.map_partitions(
                process_hist_and_dist_partition,
                col="time_lag",
                flag_col="cnts_particles_for_tl_dist",
                flag_value=1,
                bin_lims=timelag_bins_lims,
                bin_ctrs=timelag_bin_ctrs,
                dt=run_config["dt"],
                calculate_conc=True,
                flow=None,
                rho_eff=None,
                BC_type=None,
                name_prefix=name_prefix,
                meta=meta_hist,
            )
            tl_ddf = tl_ddf.map_partitions(cast_and_arrow, meta=meta_hist)
            computed_tl = tl_ddf.compute()
            computed_results.append(computed_tl)
    if computed_results:
        for i, df in enumerate(computed_results):
            # Ensure index is nanosecond precision
            if hasattr(df.index, "dtype") and "datetime" in str(df.index.dtype):
                df.index = df.index.astype("datetime64[ns]")
            # Fix any datetime columns
            for col in df.columns:
                if hasattr(df[col], "dtype") and "datetime" in str(df[col].dtype):
                    computed_results[i][col] = df[col].astype("datetime64[ns]")
        merged_df = pd.concat(computed_results, axis=1)
        # Double-check the merged result
        if hasattr(merged_df.index, "dtype") and "datetime" in str(
            merged_df.index.dtype
        ):
            merged_df.index = merged_df.index.astype("datetime64[ns]")
        merged_ddf = dd.from_pandas(merged_df, npartitions=1)
        time_index = dd.to_datetime(merged_ddf.index.to_series())
        # Use string format to avoid Windows path issues with datetime partitions
        merged_ddf["date"] = time_index.dt.strftime("%Y-%m-%d")
        merged_ddf["hour"] = time_index.dt.hour.astype("int64")
        # --- Save hists to parquet (clear this chunk's partition first so
        # append=True cannot duplicate it on re-runs)
        delete_partition_if_exists(
            output_path=f"{run_config['output']}/hists_{run_config['dt']}s",
            partition_values={
                "date": chunk_start.strftime("%Y-%m-%d"),
                "hour": chunk_start.hour,
            },
        )
        # Build the write task lazily, then compute immediately
        hist_future = merged_ddf.to_parquet(
            f"{run_config['output']}/hists_{run_config['dt']}s",
            partition_on=run_config["saving_schema"],
            engine="pyarrow",
            write_index=True,
            write_metadata_file=True,
            append=True,
            schema="infer",
            compute=False,
        )
        hist_future.compute()
        # Release the chunk's frames before the next big allocation
        del merged_df, merged_ddf, computed_results
        gc.collect()