From b377c36c28e33dea0ffa8a3b0ac6171a9e263025 Mon Sep 17 00:00:00 2001 From: Barbara Bertozzi Date: Tue, 9 Sep 2025 15:03:44 +0200 Subject: [PATCH] Chore: cleanup old code --- scripts/sp2xr_pipeline.py | 174 -------------------------------------- src/sp2xr/distribution.py | 141 +----------------------------- src/sp2xr/io.py | 13 --- 3 files changed, 1 insertion(+), 327 deletions(-) diff --git a/scripts/sp2xr_pipeline.py b/scripts/sp2xr_pipeline.py index b03f66a..1bf1c9f 100644 --- a/scripts/sp2xr_pipeline.py +++ b/scripts/sp2xr_pipeline.py @@ -304,180 +304,6 @@ def main(): client, ) - """if run_config["do_BC_hist"]: - print("Computing BC distributions...") - # --- Mass histogram - BC_hist_configs = [ - {"flag_col": None, "flag_value": None}, - {"flag_col": "cnts_thin", "flag_value": 1}, - {"flag_col": "cnts_thin_noScatt", "flag_value": 1}, - {"flag_col": "cnts_thick", "flag_value": 1}, - {"flag_col": "cnts_thick_sat", "flag_value": 1}, - {"flag_col": "cnts_thin_sat", "flag_value": 1}, - {"flag_col": "cnts_ntl_sat", "flag_value": 1}, - {"flag_col": "cnts_ntl", "flag_value": 1}, - { - "flag_col": "cnts_extreme_positive_timelag", - "flag_value": 1, - }, - { - "flag_col": "cnts_thin_low_inc_scatt_ratio", - "flag_value": 1, - }, - {"flag_col": "cnts_thin_total", "flag_value": 1}, - {"flag_col": "cnts_thick_total", "flag_value": 1}, - {"flag_col": "cnts_unclassified", "flag_value": 1}, - ] - - results = [] - - for cfg_hist in BC_hist_configs[:2]: - meta_hist = ( - make_hist_meta( - bin_ctrs=inc_mass_bin_ctrs, - kind="mass", - flag_col=cfg_hist["flag_col"], - rho_eff=run_config["rho_eff"], - BC_type=run_config["BC_type"], - ) - .astype(DEFAULT_FLOAT, copy=False) - .convert_dtypes(dtype_backend="pyarrow") - ) - ddf_out = ddf_pbp_with_flow.map_partitions( - process_hist_and_dist_partition, - col="BC mass within range", - flag_col=cfg_hist["flag_col"], - flag_value=cfg_hist["flag_value"], - bin_lims=inc_mass_bin_lims, - bin_ctrs=inc_mass_bin_ctrs, - dt=run_config["dt"], - calculate_conc=True, - flow=None, - rho_eff=run_config["rho_eff"], - BC_type=run_config["BC_type"], - #t=1, - meta=meta_hist, - ).map_partitions(cast_and_arrow, meta=meta_hist) - results.append(ddf_out) - - # --- Scattering histogram - if run_config["do_scatt_hist"]: - print("Computing scattering distribution...") - meta_hist = ( - make_hist_meta( - bin_ctrs=scatt_bin_ctrs, - kind="scatt", - flag_col=None, - rho_eff=None, - BC_type=None, - ) - .astype(DEFAULT_FLOAT, copy=False) - .convert_dtypes(dtype_backend="pyarrow") - ) - ddf_scatt = ddf_pbp_with_flow.map_partitions( - process_hist_and_dist_partition, - col="Opt diam scatt only", - flag_col=None, - flag_value=None, - bin_lims=scatt_bin_lims, - bin_ctrs=scatt_bin_ctrs, - dt=run_config["dt"], - calculate_conc=True, - flow=None, - rho_eff=None, - BC_type=None, - #t=1, - meta=meta_hist, - ).map_partitions(cast_and_arrow, meta=meta_hist) - results.append(ddf_scatt) - - # --- Timelag histogram - if run_config["do_timelag_hist"]: - print("Computing time delay distribution...") - mass_bins = ( - ddf_pbp_with_flow[["BC mass bin"]] - .compute() - .astype("Int64") - .drop_duplicates() - .dropna() - ) - - for idx, mass_bin in enumerate(mass_bins[:1]): - ddf_bin = ddf_pbp_with_flow[ - ddf_pbp_with_flow["BC mass bin"] == mass_bin - ] - - name_prefix = f"dNdlogDmev_{inc_mass_bin_ctrs[idx]:.2f}_timelag" - - meta_hist = make_hist_meta( - bin_ctrs=timelag_bin_ctrs, - kind="timelag", - flag_col="cnts_particles_for_tl_dist", - name_prefix=name_prefix, - rho_eff=None, - BC_type=None, - ) - - tl_ddf 
= ddf_bin.map_partitions( - process_hist_and_dist_partition, - col="time_lag", - flag_col="cnts_particles_for_tl_dist", - flag_value=1, - bin_lims=timelag_bins_lims, - bin_ctrs=timelag_bin_ctrs, - dt=run_config["dt"], - calculate_conc=True, - flow=None, - rho_eff=None, - BC_type=None, - #t=1, - name_prefix=name_prefix, - meta=meta_hist, - ) - - # - tl_ddf = tl_ddf.map_partitions(cast_and_arrow, meta=meta_hist) - - results.append(tl_ddf) - # --- Merge all hists - - merged_ddf = dd.concat(results, axis=1, interleave_partitions=True) - - idx_target = "datetime64[ns]" - merged_ddf = merged_ddf.map_partitions( - lambda pdf: pdf.set_index(pdf.index.astype(idx_target, copy=False)), - meta=merged_ddf._meta, - ) - - index_as_dt = dd.to_datetime(merged_ddf.index.to_series()) - merged_ddf["date"] = index_as_dt.map_partitions( - lambda s: s.dt.normalize(), meta=("date", "datetime64[ns]") - ) - - # --- Save hists to parquet - - delete_partition_if_exists( - output_path=f"{run_config['output']}/hists_{run_config['dt']}s", - partition_values={ - "date": chunk_start.strftime("%Y-%m-%d"), - "hour": chunk_start.hour, - }, - ) - merged_ddf.to_parquet( - f"{run_config['output']}/hists_{run_config['dt']}s", - partition_on=["date"], - engine="pyarrow", - write_index=True, - write_metadata_file=True, - append=True, - schema="infer", - ) - - client.cancel([ddf_pbp_with_flow, ddf_hk, - ddf_hk_dt, ddf_pbp_dt, ddf_pbp_hk_dt]) - del ddf_pbp_with_flow - client.run(gc.collect) # workers - gc.collect() # client""" finally: # Comprehensive cleanup try: diff --git a/src/sp2xr/distribution.py b/src/sp2xr/distribution.py index 800ef3f..6e372e0 100644 --- a/src/sp2xr/distribution.py +++ b/src/sp2xr/distribution.py @@ -94,118 +94,6 @@ def calculate_histogram(series, bin_lims=np.logspace(np.log10(0.3), np.log10(400 return counts -"""def process_hist_and_dist_partition( - df, - col, - flag_col, - flag_value, - bin_lims, - bin_ctrs, - dt, - calculate_conc=True, - flow=None, - rho_eff=1800, - BC_type="", - t=1, -): - dt_str = f"{dt}s" - # Filter the dataframe per partition - if flag_col and flag_value is not None: - df_filtered = df.loc[df[flag_col] == flag_value] - else: - df_filtered = df - - # Adjust flag_col - if flag_col is not None and "cnts" in flag_col: - flag_col = flag_col[5:] - - df_resampled = ( - df_filtered[[col]] - .resample(dt_str) - .apply(lambda x: calculate_histogram(x[col], bin_lims=bin_lims)) - ) - flow_dt = df["Sample Flow Controller Read (vccm)"].resample(dt_str).mean() - - # df_resampled = df_resampled.to_frame(name="result") - if isinstance(df_resampled, pd.Series): - df_resampled = df_resampled.to_frame(name="result") - else: # already a DataFrame - df_resampled.columns = ["result"] - - original_idx_df = df_filtered[["temporary_col"]].resample(dt_str).count() - original_idx_df.columns = ["original_idx"] # ensure proper naming - - df_resampled = df_resampled.join(original_idx_df, how="left") - if "original_idx" in df_resampled.columns: - df_resampled = df_resampled[df_resampled["original_idx"] != 0] - df_resampled = df_resampled.drop("original_idx", axis=1) - - # Initialize DataFrame based on whether the result is empty or not - list_col = "result" - max_list_length = len(bin_ctrs) - - if df_resampled.empty: - columns = [f"{list_col}_{i}" for i in range(max_list_length)] - ddf_hist = pd.DataFrame(np.nan, columns=columns, index=flow.index) - else: - ddf_hist = df_resampled[list_col].apply(pd.Series) - ddf_hist.index = df_resampled.index - columns = [f"{list_col}_{i}" for i in range(max_list_length)] - 
ddf_hist.columns = columns - - if calculate_conc: - # Join with the sample flow controller data - inc_hist_flow = ddf_hist.join(flow_dt) - - if rho_eff is not None and BC_type is not None: - # Calculate diameters and densities - density, Dmev = BC_mass_to_diam(bin_ctrs, rho_eff=rho_eff, BC_type=BC_type) - - # Calculate number concentration - dNdlogDmev = counts2numConc( - inc_hist_flow.iloc[:, :-1], inc_hist_flow.iloc[:, -1], t=t - ) / get_dlogp(Dmev) - if flag_col is None: - dNdlogDmev.columns = [f"dNdlogDmev_all_{i:.2f}" for i in Dmev] - else: - dNdlogDmev.columns = [f"dNdlogDmev_{flag_col}_{i:.2f}" for i in Dmev] - - # Calculate mass concentration - dMdlogDmev = dNdlogDp_to_dMdlogDp(dNdlogDmev, Dmev, rho=density) - if flag_col is None: - dMdlogDmev.columns = [f"dMdlogDmev_all_{i:.2f}" for i in Dmev] - else: - dMdlogDmev.columns = [f"dMdlogDmev_{flag_col}_{i:.2f}" for i in Dmev] - - else: - - # Calculate number concentration - dNdlogDmev = counts2numConc( - inc_hist_flow.iloc[:, :-1], inc_hist_flow.iloc[:, -1], t=t - ) / get_dlogp(bin_ctrs) - if flag_col is None: - dNdlogDmev.columns = [f"dNdlogDsc_all_{i:.2f}" for i in bin_ctrs] - else: - dNdlogDmev.columns = [f"dNdlogDsc_{flag_col}_{i:.2f}" for i in bin_ctrs] - - dMdlogDmev = None - # return dNdlogDmev, dMdlogDmev - - # else: - # return ddf_hist, None - - if calculate_conc: - # Concatenate results into a single dataframe - if dMdlogDmev is not None: - result_df = pd.concat([dNdlogDmev, dMdlogDmev], axis=1) - else: - result_df = dNdlogDmev - else: - result_df = ddf_hist - return result_df -""" - - def process_hist_and_dist_partition( df: pd.DataFrame, col: str, @@ -264,11 +152,6 @@ def process_hist_and_dist_partition( # expand list-of-counts -> wide matrix (one col per bin) max_list_length = len(bin_ctrs) if df_resampled.empty: - """# IMPORTANT: use the resampled time index so schema matches later joins - idx = flow_dt.index - columns = [f"result_{i}" for i in range(max_list_length)] - ddf_hist = pd.DataFrame(np.nan, columns=columns, index=idx)""" - ddf_hist = pd.DataFrame( columns=[f"result_{i}" for i in range(max_list_length)], index=pd.DatetimeIndex([], name=df.index.name), @@ -391,7 +274,6 @@ def process_histograms( ): """Separate function to process histograms and avoid graph buildup""" - # results = [] computed_results = [] if run_config["do_BC_hist"]: @@ -445,13 +327,12 @@ def process_histograms( BC_type=run_config["BC_type"], meta=meta_hist, ).map_partitions(cast_and_arrow, meta=meta_hist) - # results.append(ddf_out) computed_hist = ddf_out.compute() computed_results.append(computed_hist) del ddf_out - # Process other histogram types... 
+ # Process other histogram types if run_config["do_scatt_hist"]: print("Computing scattering distribution...") meta_hist = ( @@ -477,10 +358,8 @@ def process_histograms( flow=None, rho_eff=None, BC_type=None, - # t=1, meta=meta_hist, ).map_partitions(cast_and_arrow, meta=meta_hist) - # results.append(ddf_scatt) computed_scatt = ddf_scatt.compute() computed_results.append(computed_scatt) @@ -520,15 +399,12 @@ def process_histograms( flow=None, rho_eff=None, BC_type=None, - # t=1, name_prefix=name_prefix, meta=meta_hist, ) - # tl_ddf = tl_ddf.map_partitions(cast_and_arrow, meta=meta_hist) - # results.append(tl_ddf) computed_tl = tl_ddf.compute() computed_results.append(computed_tl) if computed_results: @@ -542,7 +418,6 @@ def process_histograms( if hasattr(df[col], "dtype") and "datetime" in str(df[col].dtype): computed_results[i][col] = df[col].astype("datetime64[ns]") merged_df = pd.concat(computed_results, axis=1) - # merged_ddf = dd.from_pandas(merged_df, npartitions=1) # Double-check the merged result if hasattr(merged_df.index, "dtype") and "datetime" in str( merged_df.index.dtype @@ -550,17 +425,6 @@ def process_histograms( merged_df.index = merged_df.index.astype("datetime64[ns]") merged_ddf = dd.from_pandas(merged_df, npartitions=1) - - """idx_target = "datetime64[ns]" - merged_ddf = merged_ddf.map_partitions( - lambda pdf: pdf.set_index(pdf.index.astype(idx_target, copy=False)), - meta=merged_ddf._meta, - ) - - index_as_dt = dd.to_datetime(merged_ddf.index.to_series()) - merged_ddf["date"] = index_as_dt.map_partitions( - lambda s: s.dt.normalize(), meta=("date", "datetime64[ns]") - )""" merged_ddf["date"] = dd.to_datetime(merged_ddf.index.to_series()).dt.normalize() # --- Save hists to parquet @@ -585,8 +449,5 @@ def process_histograms( ) hist_future.compute() - # client.cancel([merged_ddf, *computed_results, hist_future]) - # del merged_ddf, computed_results, hist_future del merged_df, merged_ddf, computed_results gc.collect() - # results = [] diff --git a/src/sp2xr/io.py b/src/sp2xr/io.py index 3615deb..9e6939f 100644 --- a/src/sp2xr/io.py +++ b/src/sp2xr/io.py @@ -209,19 +209,6 @@ def process_sp2xr_file(file_path, config_path, target_directory): try: df = read_sp2xr_csv(file_path, file_schema) df = postprocess(df) - """except zipfile.BadZipFile: - logger.warning(f"[{file_path}] Bad zip file: {e}") - df = pd.DataFrame() - except pd.errors.EmptyDataError: - logger.warning(f"[{file_path}] Empty CSV") - df = pd.DataFrame() - except ValueError as e: - logger.error(f"[{file_path}] ValueError: {e}") - if "Missing column provided to 'parse_dates'" in str(e): - print(f"Error for {file_path}: Missing 'Time Stamp'") - else: - print(f"ValueError for {file_path}: {e}") - df = pd.DataFrame()""" except Exception as e: logger.error(f"[{file_path}] Unexpected error: {e}") df = pd.DataFrame()
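
A minimal, self-contained sketch of the error-handling shape that remains in process_sp2xr_file after this cleanup: the per-exception branches that the io.py hunk deletes (BadZipFile, EmptyDataError, ValueError) all fall through the single generic handler that logs the failure and substitutes an empty DataFrame. The reader function and the "Time Stamp" check below are placeholders standing in for the real read_sp2xr_csv()/postprocess() path, not the sp2xr API itself.

    import logging

    import pandas as pd

    logger = logging.getLogger("sp2xr")


    def load_one_file(file_path: str) -> pd.DataFrame:
        # Placeholder reader standing in for read_sp2xr_csv() + postprocess();
        # any parsing problem (bad zip, empty CSV, missing 'Time Stamp' column, ...)
        # is expected to surface here as an exception.
        df = pd.read_csv(file_path)
        if "Time Stamp" not in df.columns:
            raise ValueError("Missing column provided to 'parse_dates'")
        return df


    def process_file(file_path: str) -> pd.DataFrame:
        try:
            return load_one_file(file_path)
        except Exception as e:
            # Single consolidated handler: log the failure and fall back to an
            # empty DataFrame so the surrounding pipeline can keep going.
            logger.error(f"[{file_path}] Unexpected error: {e}")
            return pd.DataFrame()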