Chore: clean up old code

Removes commented-out blocks: the old histogram section in main(), the previous process_hist_and_dist_partition implementation, stale commented-out lines in process_histograms, and disabled exception handlers in process_sp2xr_file.

2025-09-09 15:03:44 +02:00
parent 3a41fbf387
commit b377c36c28
3 changed files with 1 addition and 327 deletions

@@ -304,180 +304,6 @@ def main():
client,
)
"""if run_config["do_BC_hist"]:
print("Computing BC distributions...")
# --- Mass histogram
BC_hist_configs = [
{"flag_col": None, "flag_value": None},
{"flag_col": "cnts_thin", "flag_value": 1},
{"flag_col": "cnts_thin_noScatt", "flag_value": 1},
{"flag_col": "cnts_thick", "flag_value": 1},
{"flag_col": "cnts_thick_sat", "flag_value": 1},
{"flag_col": "cnts_thin_sat", "flag_value": 1},
{"flag_col": "cnts_ntl_sat", "flag_value": 1},
{"flag_col": "cnts_ntl", "flag_value": 1},
{
"flag_col": "cnts_extreme_positive_timelag",
"flag_value": 1,
},
{
"flag_col": "cnts_thin_low_inc_scatt_ratio",
"flag_value": 1,
},
{"flag_col": "cnts_thin_total", "flag_value": 1},
{"flag_col": "cnts_thick_total", "flag_value": 1},
{"flag_col": "cnts_unclassified", "flag_value": 1},
]
results = []
for cfg_hist in BC_hist_configs[:2]:
meta_hist = (
make_hist_meta(
bin_ctrs=inc_mass_bin_ctrs,
kind="mass",
flag_col=cfg_hist["flag_col"],
rho_eff=run_config["rho_eff"],
BC_type=run_config["BC_type"],
)
.astype(DEFAULT_FLOAT, copy=False)
.convert_dtypes(dtype_backend="pyarrow")
)
ddf_out = ddf_pbp_with_flow.map_partitions(
process_hist_and_dist_partition,
col="BC mass within range",
flag_col=cfg_hist["flag_col"],
flag_value=cfg_hist["flag_value"],
bin_lims=inc_mass_bin_lims,
bin_ctrs=inc_mass_bin_ctrs,
dt=run_config["dt"],
calculate_conc=True,
flow=None,
rho_eff=run_config["rho_eff"],
BC_type=run_config["BC_type"],
#t=1,
meta=meta_hist,
).map_partitions(cast_and_arrow, meta=meta_hist)
results.append(ddf_out)
# --- Scattering histogram
if run_config["do_scatt_hist"]:
print("Computing scattering distribution...")
meta_hist = (
make_hist_meta(
bin_ctrs=scatt_bin_ctrs,
kind="scatt",
flag_col=None,
rho_eff=None,
BC_type=None,
)
.astype(DEFAULT_FLOAT, copy=False)
.convert_dtypes(dtype_backend="pyarrow")
)
ddf_scatt = ddf_pbp_with_flow.map_partitions(
process_hist_and_dist_partition,
col="Opt diam scatt only",
flag_col=None,
flag_value=None,
bin_lims=scatt_bin_lims,
bin_ctrs=scatt_bin_ctrs,
dt=run_config["dt"],
calculate_conc=True,
flow=None,
rho_eff=None,
BC_type=None,
#t=1,
meta=meta_hist,
).map_partitions(cast_and_arrow, meta=meta_hist)
results.append(ddf_scatt)
# --- Timelag histogram
if run_config["do_timelag_hist"]:
print("Computing time delay distribution...")
mass_bins = (
ddf_pbp_with_flow[["BC mass bin"]]
.compute()
.astype("Int64")
.drop_duplicates()
.dropna()
)
for idx, mass_bin in enumerate(mass_bins[:1]):
ddf_bin = ddf_pbp_with_flow[
ddf_pbp_with_flow["BC mass bin"] == mass_bin
]
name_prefix = f"dNdlogDmev_{inc_mass_bin_ctrs[idx]:.2f}_timelag"
meta_hist = make_hist_meta(
bin_ctrs=timelag_bin_ctrs,
kind="timelag",
flag_col="cnts_particles_for_tl_dist",
name_prefix=name_prefix,
rho_eff=None,
BC_type=None,
)
tl_ddf = ddf_bin.map_partitions(
process_hist_and_dist_partition,
col="time_lag",
flag_col="cnts_particles_for_tl_dist",
flag_value=1,
bin_lims=timelag_bins_lims,
bin_ctrs=timelag_bin_ctrs,
dt=run_config["dt"],
calculate_conc=True,
flow=None,
rho_eff=None,
BC_type=None,
#t=1,
name_prefix=name_prefix,
meta=meta_hist,
)
#
tl_ddf = tl_ddf.map_partitions(cast_and_arrow, meta=meta_hist)
results.append(tl_ddf)
# --- Merge all hists
merged_ddf = dd.concat(results, axis=1, interleave_partitions=True)
idx_target = "datetime64[ns]"
merged_ddf = merged_ddf.map_partitions(
lambda pdf: pdf.set_index(pdf.index.astype(idx_target, copy=False)),
meta=merged_ddf._meta,
)
index_as_dt = dd.to_datetime(merged_ddf.index.to_series())
merged_ddf["date"] = index_as_dt.map_partitions(
lambda s: s.dt.normalize(), meta=("date", "datetime64[ns]")
)
# --- Save hists to parquet
delete_partition_if_exists(
output_path=f"{run_config['output']}/hists_{run_config['dt']}s",
partition_values={
"date": chunk_start.strftime("%Y-%m-%d"),
"hour": chunk_start.hour,
},
)
merged_ddf.to_parquet(
f"{run_config['output']}/hists_{run_config['dt']}s",
partition_on=["date"],
engine="pyarrow",
write_index=True,
write_metadata_file=True,
append=True,
schema="infer",
)
client.cancel([ddf_pbp_with_flow, ddf_hk,
ddf_hk_dt, ddf_pbp_dt, ddf_pbp_hk_dt])
del ddf_pbp_with_flow
client.run(gc.collect) # workers
gc.collect() # client"""
finally:
# Comprehensive cleanup
try:

@@ -94,118 +94,6 @@ def calculate_histogram(series, bin_lims=np.logspace(np.log10(0.3), np.log10(400
return counts
"""def process_hist_and_dist_partition(
df,
col,
flag_col,
flag_value,
bin_lims,
bin_ctrs,
dt,
calculate_conc=True,
flow=None,
rho_eff=1800,
BC_type="",
t=1,
):
dt_str = f"{dt}s"
# Filter the dataframe per partition
if flag_col and flag_value is not None:
df_filtered = df.loc[df[flag_col] == flag_value]
else:
df_filtered = df
# Adjust flag_col
if flag_col is not None and "cnts" in flag_col:
flag_col = flag_col[5:]
df_resampled = (
df_filtered[[col]]
.resample(dt_str)
.apply(lambda x: calculate_histogram(x[col], bin_lims=bin_lims))
)
flow_dt = df["Sample Flow Controller Read (vccm)"].resample(dt_str).mean()
# df_resampled = df_resampled.to_frame(name="result")
if isinstance(df_resampled, pd.Series):
df_resampled = df_resampled.to_frame(name="result")
else: # already a DataFrame
df_resampled.columns = ["result"]
original_idx_df = df_filtered[["temporary_col"]].resample(dt_str).count()
original_idx_df.columns = ["original_idx"] # ensure proper naming
df_resampled = df_resampled.join(original_idx_df, how="left")
if "original_idx" in df_resampled.columns:
df_resampled = df_resampled[df_resampled["original_idx"] != 0]
df_resampled = df_resampled.drop("original_idx", axis=1)
# Initialize DataFrame based on whether the result is empty or not
list_col = "result"
max_list_length = len(bin_ctrs)
if df_resampled.empty:
columns = [f"{list_col}_{i}" for i in range(max_list_length)]
ddf_hist = pd.DataFrame(np.nan, columns=columns, index=flow.index)
else:
ddf_hist = df_resampled[list_col].apply(pd.Series)
ddf_hist.index = df_resampled.index
columns = [f"{list_col}_{i}" for i in range(max_list_length)]
ddf_hist.columns = columns
if calculate_conc:
# Join with the sample flow controller data
inc_hist_flow = ddf_hist.join(flow_dt)
if rho_eff is not None and BC_type is not None:
# Calculate diameters and densities
density, Dmev = BC_mass_to_diam(bin_ctrs, rho_eff=rho_eff, BC_type=BC_type)
# Calculate number concentration
dNdlogDmev = counts2numConc(
inc_hist_flow.iloc[:, :-1], inc_hist_flow.iloc[:, -1], t=t
) / get_dlogp(Dmev)
if flag_col is None:
dNdlogDmev.columns = [f"dNdlogDmev_all_{i:.2f}" for i in Dmev]
else:
dNdlogDmev.columns = [f"dNdlogDmev_{flag_col}_{i:.2f}" for i in Dmev]
# Calculate mass concentration
dMdlogDmev = dNdlogDp_to_dMdlogDp(dNdlogDmev, Dmev, rho=density)
if flag_col is None:
dMdlogDmev.columns = [f"dMdlogDmev_all_{i:.2f}" for i in Dmev]
else:
dMdlogDmev.columns = [f"dMdlogDmev_{flag_col}_{i:.2f}" for i in Dmev]
else:
# Calculate number concentration
dNdlogDmev = counts2numConc(
inc_hist_flow.iloc[:, :-1], inc_hist_flow.iloc[:, -1], t=t
) / get_dlogp(bin_ctrs)
if flag_col is None:
dNdlogDmev.columns = [f"dNdlogDsc_all_{i:.2f}" for i in bin_ctrs]
else:
dNdlogDmev.columns = [f"dNdlogDsc_{flag_col}_{i:.2f}" for i in bin_ctrs]
dMdlogDmev = None
# return dNdlogDmev, dMdlogDmev
# else:
# return ddf_hist, None
if calculate_conc:
# Concatenate results into a single dataframe
if dMdlogDmev is not None:
result_df = pd.concat([dNdlogDmev, dMdlogDmev], axis=1)
else:
result_df = dNdlogDmev
else:
result_df = ddf_hist
return result_df
"""
def process_hist_and_dist_partition(
df: pd.DataFrame,
col: str,
@@ -264,11 +152,6 @@ def process_hist_and_dist_partition(
# expand list-of-counts -> wide matrix (one col per bin)
max_list_length = len(bin_ctrs)
if df_resampled.empty:
"""# IMPORTANT: use the resampled time index so schema matches later joins
idx = flow_dt.index
columns = [f"result_{i}" for i in range(max_list_length)]
ddf_hist = pd.DataFrame(np.nan, columns=columns, index=idx)"""
ddf_hist = pd.DataFrame(
columns=[f"result_{i}" for i in range(max_list_length)],
index=pd.DatetimeIndex([], name=df.index.name),
@@ -391,7 +274,6 @@ def process_histograms(
):
"""Separate function to process histograms and avoid graph buildup"""
# results = []
computed_results = []
if run_config["do_BC_hist"]:
@@ -445,13 +327,12 @@ def process_histograms(
BC_type=run_config["BC_type"],
meta=meta_hist,
).map_partitions(cast_and_arrow, meta=meta_hist)
# results.append(ddf_out)
computed_hist = ddf_out.compute()
computed_results.append(computed_hist)
del ddf_out
# Process other histogram types...
# Process other histogram types
if run_config["do_scatt_hist"]:
print("Computing scattering distribution...")
meta_hist = (
@@ -477,10 +358,8 @@ def process_histograms(
flow=None,
rho_eff=None,
BC_type=None,
# t=1,
meta=meta_hist,
).map_partitions(cast_and_arrow, meta=meta_hist)
# results.append(ddf_scatt)
computed_scatt = ddf_scatt.compute()
computed_results.append(computed_scatt)
@@ -520,15 +399,12 @@ def process_histograms(
flow=None,
rho_eff=None,
BC_type=None,
# t=1,
name_prefix=name_prefix,
meta=meta_hist,
)
#
tl_ddf = tl_ddf.map_partitions(cast_and_arrow, meta=meta_hist)
# results.append(tl_ddf)
computed_tl = tl_ddf.compute()
computed_results.append(computed_tl)
if computed_results:
@@ -542,7 +418,6 @@ def process_histograms(
if hasattr(df[col], "dtype") and "datetime" in str(df[col].dtype):
computed_results[i][col] = df[col].astype("datetime64[ns]")
merged_df = pd.concat(computed_results, axis=1)
# merged_ddf = dd.from_pandas(merged_df, npartitions=1)
# Double-check the merged result
if hasattr(merged_df.index, "dtype") and "datetime" in str(
merged_df.index.dtype
@@ -550,17 +425,6 @@ def process_histograms(
merged_df.index = merged_df.index.astype("datetime64[ns]")
merged_ddf = dd.from_pandas(merged_df, npartitions=1)
"""idx_target = "datetime64[ns]"
merged_ddf = merged_ddf.map_partitions(
lambda pdf: pdf.set_index(pdf.index.astype(idx_target, copy=False)),
meta=merged_ddf._meta,
)
index_as_dt = dd.to_datetime(merged_ddf.index.to_series())
merged_ddf["date"] = index_as_dt.map_partitions(
lambda s: s.dt.normalize(), meta=("date", "datetime64[ns]")
)"""
merged_ddf["date"] = dd.to_datetime(merged_ddf.index.to_series()).dt.normalize()
# --- Save hists to parquet
@@ -585,8 +449,5 @@ def process_histograms(
)
hist_future.compute()
# client.cancel([merged_ddf, *computed_results, hist_future])
# del merged_ddf, computed_results, hist_future
del merged_df, merged_ddf, computed_results
gc.collect()
# results = []

@@ -209,19 +209,6 @@ def process_sp2xr_file(file_path, config_path, target_directory):
try:
df = read_sp2xr_csv(file_path, file_schema)
df = postprocess(df)
"""except zipfile.BadZipFile:
logger.warning(f"[{file_path}] Bad zip file: {e}")
df = pd.DataFrame()
except pd.errors.EmptyDataError:
logger.warning(f"[{file_path}] Empty CSV")
df = pd.DataFrame()
except ValueError as e:
logger.error(f"[{file_path}] ValueError: {e}")
if "Missing column provided to 'parse_dates'" in str(e):
print(f"Error for {file_path}: Missing 'Time Stamp'")
else:
print(f"ValueError for {file_path}: {e}")
df = pd.DataFrame()"""
except Exception as e:
logger.error(f"[{file_path}] Unexpected error: {e}")
df = pd.DataFrame()