From 4696c2cbb9ea669b7016a8fe992693cef8dc6514 Mon Sep 17 00:00:00 2001 From: Barbara Bertozzi Date: Fri, 22 Aug 2025 16:36:08 +0200 Subject: [PATCH] Cleanup: Remove wreck code from sp2xr_pipeline --- scripts/sp2xr_pipeline.py | 219 -------------------------------------- 1 file changed, 219 deletions(-) diff --git a/scripts/sp2xr_pipeline.py b/scripts/sp2xr_pipeline.py index 7308bac..5dfd7d6 100644 --- a/scripts/sp2xr_pipeline.py +++ b/scripts/sp2xr_pipeline.py @@ -161,85 +161,6 @@ def main(): ) ddf_pbp_with_flow = enforce_schema(ddf_pbp_with_flow) - """ - dtypes = { - "calculated_time": "datetime64[ns]", - "Dropped Records": "float64", - "Record Count": "float64", - "Record Size": "float64", - "Flag": "float64", - "Particle Flags": "float64", - "Scatter relPeak": "float64", - "Scatter Transit Time": "float64", - "Scatter Peak Time": "float64", - "Scatter FWHM": "float64", - "Scatter Size (nm)": "float64", - 'Opt diam': "float64", - 'Opt diam scatt only': "float64", - 'Opt diam scatt only within range': "float64", - - "Incand relPeak": "float64", - "Incand Transit Time": "float64", - "Incand Peak Time": "float64", - "Incand FWHM": "float64", - "Incand Delay": "float64", - "Incand Mass (fg)": "float64", - "BC mass": "float64", - 'BC mass within range': "float64", - 'BC mass bin': "float64", - - "time_lag": "float64", - 'ratio_inc_scatt': "float64", - "cnts_thin": "float64", - "cnts_thin_noScatt": "float64", - "cnts_thick": "float64", - "cnts_thick_sat": "float64", - "cnts_thin_sat": "float64", - "cnts_ntl_sat": "float64", - "cnts_ntl": "float64", - "cnts_extreme_positive_timelag": "float64", - "cnts_thin_low_inc_scatt_ratio": "float64", - "cnts_thin_total": "float64", - "cnts_thick_total": "float64", - "cnts_unclassified": "float64", - "cnts_particles_for_tl_dist": "float64", - - "flag_valid_inc_signal": 'bool', - "flag_inc_not_sat": 'bool', - "flag_valid_inc_signal_in_range": 'bool', - "flag_valid_scatt_signal": 'bool', - "flag_scatt_not_sat": 'bool', - "flag_valid_scatt_signal_in_range": 'bool', - "flag_negative_timelag": 'bool', - "flag_extreme_positive_timelag": 'bool', - "flag_valid_timelag_thin": 'bool', - "flag_valid_timelag_thick": 'bool', - "flag_low_ratio_inc_scatt": 'bool', - - "Secs_2GB": "int64", - "Sample Flow Controller Read (vccm)": "float64", - - "Time (sec)": "float64", - "Packet Time Stamp": "float64", - "Particle Time Stamp": "float64", - 'first_val': "float64", - 'delta_sec': "float64", - "t0": "datetime64[ns]", - 'date_floored': "datetime64[ns]", - 'floor_time': "datetime64[us]", - "file_datetime": "datetime64[ns]", - "date": "datetime64[ns]", - "hour": "float64", - - "Reserved": "float64", - "path": "str", - "file": "str", - "folder_name": "str", - 'temporary_col': "int64", - } - - schema = pa.schema([(c, pa.from_numpy_dtype(t)) - for c, t in dtypes.items()])""" ddf_pbp_with_flow.to_parquet( path=f"{run_config['output']}/pbp_calibrated", partition_on=["date", "hour"], @@ -261,76 +182,6 @@ def main(): # 4. (optional) dt bulk conc -------------------------- if run_config["do_conc"]: - meta_conc = add_concentrations(ddf_pbp_hk_dt._meta, dt=run_config["dt"]) - """meta_conc = add_concentrations(ddf_pbp_hk_dt._meta, dt=run_config["dt"]) - - cast_map = {c: CANONICAL_DTYPES[c] for c in meta_conc.columns} - - meta_typed = ( - meta_conc # keep original empty DF (index intact) - .astype(cast_map, copy=False) - ) - - - - ddf_conc = ddf_pbp_hk_dt.map_partitions( - add_concentrations, dt=run_config["dt"], meta=meta_typed - ).astype(cast_map) - delete_partition_if_exists( - output_path=f"{run_config['output']}/conc_{run_config['dt']}s", - partition_values={"date": chunk_start.strftime("%Y-%m-%d"), "hour": chunk_start.hour} - ) - ddf_conc = enforce_schema(ddf_conc) - - #ddf_conc = ddf_conc[[col for col in ddf_conc.columns][:1]] - - ddf_conc.to_parquet( - f'{run_config["output"]}/conc_{run_config["dt"]}s', - partition_on=["date", "hour"], - engine="pyarrow", - write_index=True, - write_metadata_file=True, - append=True, - schema="infer", - )""" - - """def _cast_and_arrow(pdf): - pdf = pdf.astype(cast_map, copy=False) - # convert_dtypes guarantees identical pyarrow-backed dtypes - return pdf.convert_dtypes(dtype_backend="pyarrow")""" - - def _cast_and_arrow(pdf: pd.DataFrame) -> pd.DataFrame: - """ - Cast every column in *this* partition to the canonical dtype - (or DEFAULT_FLOAT), then switch the frame to pyarrow-backed dtypes. - Works for both concentration and histogram dataframes. - """ - cast_map = { - col: CANONICAL_DTYPES.get(col, DEFAULT_FLOAT) # fallback - for col in pdf.columns - } - pdf = pdf.astype(cast_map, copy=False) - return pdf.convert_dtypes(dtype_backend="pyarrow") - - """meta_typed = ( - meta_conc.astype(cast_map, copy=False) - .convert_dtypes(dtype_backend="pyarrow") - ) - - ddf_conc = ( - ddf_pbp_hk_dt - .map_partitions(add_concentrations, dt=run_config["dt"], meta=meta_typed) - .map_partitions(_cast_and_arrow, meta=meta_typed) # ← every partition - .persist() - ) - - idx_target = "datetime64[ns]" - ddf_conc = ddf_conc.map_partitions( - lambda pdf: pdf.set_index( - pdf.index.astype(idx_target, copy=False) - ), - meta=ddf_conc._meta - )""" meta_conc = add_concentrations(ddf_pbp_hk_dt._meta, dt=run_config["dt"]) meta_conc = meta_conc.astype( {c: CANONICAL_DTYPES.get(c, DEFAULT_FLOAT) for c in meta_conc.columns}, @@ -360,74 +211,6 @@ def main(): append=True, schema="infer", ) - """meta_conc = add_concentrations(ddf_pbp_hk_dt._meta, dt=run_config["dt"]) - ddf_conc = ddf_pbp_hk_dt.map_partitions( - add_concentrations, dt=run_config["dt"], #meta=meta_conc - ) - delete_partition_if_exists( - output_path=f"{run_config['output']}/conc_{run_config['dt']}s", - partition_values={"date": chunk_start.strftime("%Y-%m-%d"), "hour": chunk_start.hour} - ) - - dtypes = { - "calculated_time": "datetime64[ns]", - "Dropped Records": "float64", - "Incand Mass (fg)": "float64", - "BC mass": "float64", - "cnts_thin": "float64", - "cnts_thin_noScatt": "float64", - "cnts_thick": "float64", - "cnts_thick_sat": "float64", - "cnts_thin_sat": "float64", - "cnts_ntl_sat": "float64", - "cnts_ntl": "float64", - "cnts_extreme_positive_timelag": "float64", - "cnts_thin_low_inc_scatt_ratio": "float64", - "cnts_thin_total": "float64", - "cnts_thick_total": "float64", - "cnts_unclassified": "float64", - "BC numb from file": "float64", - "BC numb": "float64", - "BC numb within range": "float64", - "scatter numb from file": "float64", - "Scatt numb": "float64", - "Scatt numb within range": "float64", - "Secs_2GB_mean": "float64", - "Sample Flow Controller Read (sccm)": "float64", - "Sample Flow Controller Read (vccm)": "float64", - "BC_massConc_std": "float64", - "BC_massConc_vol": "float64", - "BC_numConc_std": "float64", - "BC_numConc_vol": "float64", - "BC_massConc_within_range_std": "float64", - "BC_massConc_within_range_vol": "float64", - "BC_numConc_within_range_std": "float64", - "BC_numConc_within_range_vol": "float64", - "S_numConc_std": "float64", - "S_numConc_vol": "float64", - "S_numConc_within_range_std": "float64", - "S_numConc_within_range_vol": "float64", - "date": "datetime64[ns]", - "hour": "float64", - } - - #meta = make_meta(dtypes) # empty DF that encodes the schema - schema = pa.schema([(c, pa.from_numpy_dtype(t)) - for c, t in dtypes.items()]) - - #ddf_conc = ddf_conc.set_index("calculated_time") - #ddf_conc = ddf_conc.astype(dtypes) - - ddf_conc = enforce_schema(ddf_conc) - ddf_conc.to_parquet( - f"{run_config['output']}/conc_{run_config['dt']}s", - partition_on=["date", "hour"], - engine="pyarrow", - write_index=True, - write_metadata_file=True, - append=True, - schema='infer', - )""" # 5. (optional) dt histograms -------------------------- @@ -470,8 +253,6 @@ def main(): .astype(DEFAULT_FLOAT, copy=False) .convert_dtypes(dtype_backend="pyarrow") ) - # cast_map = {c: DEFAULT_FLOAT[c] for c in meta_hist.columns} - # meta_typed = meta_hist.astype(DEFAULT_FLOAT, copy=False).convert_dtypes(dtype_backend="pyarrow") ddf_out = ddf_pbp_with_flow.map_partitions( process_hist_and_dist_partition, col="BC mass within range",