Cleanup: Remove dead code from sp2xr_pipeline

2025-08-22 16:36:08 +02:00
parent 681de6c203
commit 4696c2cbb9


@@ -161,85 +161,6 @@ def main():
)
ddf_pbp_with_flow = enforce_schema(ddf_pbp_with_flow)
"""
dtypes = {
"calculated_time": "datetime64[ns]",
"Dropped Records": "float64",
"Record Count": "float64",
"Record Size": "float64",
"Flag": "float64",
"Particle Flags": "float64",
"Scatter relPeak": "float64",
"Scatter Transit Time": "float64",
"Scatter Peak Time": "float64",
"Scatter FWHM": "float64",
"Scatter Size (nm)": "float64",
'Opt diam': "float64",
'Opt diam scatt only': "float64",
'Opt diam scatt only within range': "float64",
"Incand relPeak": "float64",
"Incand Transit Time": "float64",
"Incand Peak Time": "float64",
"Incand FWHM": "float64",
"Incand Delay": "float64",
"Incand Mass (fg)": "float64",
"BC mass": "float64",
'BC mass within range': "float64",
'BC mass bin': "float64",
"time_lag": "float64",
'ratio_inc_scatt': "float64",
"cnts_thin": "float64",
"cnts_thin_noScatt": "float64",
"cnts_thick": "float64",
"cnts_thick_sat": "float64",
"cnts_thin_sat": "float64",
"cnts_ntl_sat": "float64",
"cnts_ntl": "float64",
"cnts_extreme_positive_timelag": "float64",
"cnts_thin_low_inc_scatt_ratio": "float64",
"cnts_thin_total": "float64",
"cnts_thick_total": "float64",
"cnts_unclassified": "float64",
"cnts_particles_for_tl_dist": "float64",
"flag_valid_inc_signal": 'bool',
"flag_inc_not_sat": 'bool',
"flag_valid_inc_signal_in_range": 'bool',
"flag_valid_scatt_signal": 'bool',
"flag_scatt_not_sat": 'bool',
"flag_valid_scatt_signal_in_range": 'bool',
"flag_negative_timelag": 'bool',
"flag_extreme_positive_timelag": 'bool',
"flag_valid_timelag_thin": 'bool',
"flag_valid_timelag_thick": 'bool',
"flag_low_ratio_inc_scatt": 'bool',
"Secs_2GB": "int64",
"Sample Flow Controller Read (vccm)": "float64",
"Time (sec)": "float64",
"Packet Time Stamp": "float64",
"Particle Time Stamp": "float64",
'first_val': "float64",
'delta_sec': "float64",
"t0": "datetime64[ns]",
'date_floored': "datetime64[ns]",
'floor_time': "datetime64[us]",
"file_datetime": "datetime64[ns]",
"date": "datetime64[ns]",
"hour": "float64",
"Reserved": "float64",
"path": "str",
"file": "str",
"folder_name": "str",
'temporary_col': "int64",
}
schema = pa.schema([(c, pa.from_numpy_dtype(t))
for c, t in dtypes.items()])"""
ddf_pbp_with_flow.to_parquet(
path=f"{run_config['output']}/pbp_calibrated",
partition_on=["date", "hour"],
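[Editor's note] The block removed above built a pyarrow schema by hand from a flat dtype map before enforce_schema() superseded it. A minimal sketch of that pattern, with a trimmed, hypothetical dtype map (pa.from_numpy_dtype expects a numpy dtype, so the strings are wrapped in np.dtype):

import numpy as np
import pyarrow as pa

# Trimmed stand-in for the full dtype map that the commit deletes.
dtypes = {
    "calculated_time": "datetime64[ns]",
    "BC mass": "float64",
    "flag_valid_inc_signal": "bool",
}
# One (name, arrow type) pair per column, e.g. datetime64[ns] -> timestamp[ns].
schema = pa.schema([(c, pa.from_numpy_dtype(np.dtype(t))) for c, t in dtypes.items()])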
@@ -261,76 +182,6 @@ def main():
# 4. (optional) dt bulk conc --------------------------
if run_config["do_conc"]:
meta_conc = add_concentrations(ddf_pbp_hk_dt._meta, dt=run_config["dt"])
"""meta_conc = add_concentrations(ddf_pbp_hk_dt._meta, dt=run_config["dt"])
cast_map = {c: CANONICAL_DTYPES[c] for c in meta_conc.columns}
meta_typed = (
meta_conc # keep original empty DF (index intact)
.astype(cast_map, copy=False)
)
ddf_conc = ddf_pbp_hk_dt.map_partitions(
add_concentrations, dt=run_config["dt"], meta=meta_typed
).astype(cast_map)
delete_partition_if_exists(
output_path=f"{run_config['output']}/conc_{run_config['dt']}s",
partition_values={"date": chunk_start.strftime("%Y-%m-%d"), "hour": chunk_start.hour}
)
ddf_conc = enforce_schema(ddf_conc)
#ddf_conc = ddf_conc[[col for col in ddf_conc.columns][:1]]
ddf_conc.to_parquet(
f'{run_config["output"]}/conc_{run_config["dt"]}s',
partition_on=["date", "hour"],
engine="pyarrow",
write_index=True,
write_metadata_file=True,
append=True,
schema="infer",
)"""
"""def _cast_and_arrow(pdf):
pdf = pdf.astype(cast_map, copy=False)
# convert_dtypes guarantees identical pyarrow-backed dtypes
return pdf.convert_dtypes(dtype_backend="pyarrow")"""
def _cast_and_arrow(pdf: pd.DataFrame) -> pd.DataFrame:
"""
Cast every column in *this* partition to the canonical dtype
(or DEFAULT_FLOAT), then switch the frame to pyarrow-backed dtypes.
Works for both concentration and histogram dataframes.
"""
cast_map = {
col: CANONICAL_DTYPES.get(col, DEFAULT_FLOAT) # fallback
for col in pdf.columns
}
pdf = pdf.astype(cast_map, copy=False)
return pdf.convert_dtypes(dtype_backend="pyarrow")
"""meta_typed = (
meta_conc.astype(cast_map, copy=False)
.convert_dtypes(dtype_backend="pyarrow")
)
ddf_conc = (
ddf_pbp_hk_dt
.map_partitions(add_concentrations, dt=run_config["dt"], meta=meta_typed)
.map_partitions(_cast_and_arrow, meta=meta_typed) # ← every partition
.persist()
)
idx_target = "datetime64[ns]"
ddf_conc = ddf_conc.map_partitions(
lambda pdf: pdf.set_index(
pdf.index.astype(idx_target, copy=False)
),
meta=ddf_conc._meta
)"""
meta_conc = add_concentrations(ddf_pbp_hk_dt._meta, dt=run_config["dt"])
meta_conc = meta_conc.astype(
{c: CANONICAL_DTYPES.get(c, DEFAULT_FLOAT) for c in meta_conc.columns},
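[Editor's note] The surviving _cast_and_arrow above is applied per partition via map_partitions; Dask also needs a matching empty meta frame so the task graph knows the output schema up front. A minimal, self-contained sketch of that pattern (the CANONICAL_DTYPES stand-in and the toy columns are illustrative, not the pipeline's real table):

import dask.dataframe as dd
import pandas as pd

CANONICAL_DTYPES = {"BC mass": "float64", "cnts_thin": "float64"}  # illustrative stand-in
DEFAULT_FLOAT = "float64"

def _cast_and_arrow(pdf: pd.DataFrame) -> pd.DataFrame:
    # Cast to the canonical dtype (or DEFAULT_FLOAT), then move to pyarrow-backed dtypes.
    cast_map = {c: CANONICAL_DTYPES.get(c, DEFAULT_FLOAT) for c in pdf.columns}
    return pdf.astype(cast_map, copy=False).convert_dtypes(dtype_backend="pyarrow")

ddf = dd.from_pandas(
    pd.DataFrame({"BC mass": [1.0, 2.0], "cnts_thin": [3, 4]}), npartitions=2
)
meta = _cast_and_arrow(ddf._meta)  # empty frame that encodes the output schema
out = ddf.map_partitions(_cast_and_arrow, meta=meta)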
@@ -360,74 +211,6 @@ def main():
append=True,
schema="infer",
)
"""meta_conc = add_concentrations(ddf_pbp_hk_dt._meta, dt=run_config["dt"])
ddf_conc = ddf_pbp_hk_dt.map_partitions(
add_concentrations, dt=run_config["dt"], #meta=meta_conc
)
delete_partition_if_exists(
output_path=f"{run_config['output']}/conc_{run_config['dt']}s",
partition_values={"date": chunk_start.strftime("%Y-%m-%d"), "hour": chunk_start.hour}
)
dtypes = {
"calculated_time": "datetime64[ns]",
"Dropped Records": "float64",
"Incand Mass (fg)": "float64",
"BC mass": "float64",
"cnts_thin": "float64",
"cnts_thin_noScatt": "float64",
"cnts_thick": "float64",
"cnts_thick_sat": "float64",
"cnts_thin_sat": "float64",
"cnts_ntl_sat": "float64",
"cnts_ntl": "float64",
"cnts_extreme_positive_timelag": "float64",
"cnts_thin_low_inc_scatt_ratio": "float64",
"cnts_thin_total": "float64",
"cnts_thick_total": "float64",
"cnts_unclassified": "float64",
"BC numb from file": "float64",
"BC numb": "float64",
"BC numb within range": "float64",
"scatter numb from file": "float64",
"Scatt numb": "float64",
"Scatt numb within range": "float64",
"Secs_2GB_mean": "float64",
"Sample Flow Controller Read (sccm)": "float64",
"Sample Flow Controller Read (vccm)": "float64",
"BC_massConc_std": "float64",
"BC_massConc_vol": "float64",
"BC_numConc_std": "float64",
"BC_numConc_vol": "float64",
"BC_massConc_within_range_std": "float64",
"BC_massConc_within_range_vol": "float64",
"BC_numConc_within_range_std": "float64",
"BC_numConc_within_range_vol": "float64",
"S_numConc_std": "float64",
"S_numConc_vol": "float64",
"S_numConc_within_range_std": "float64",
"S_numConc_within_range_vol": "float64",
"date": "datetime64[ns]",
"hour": "float64",
}
#meta = make_meta(dtypes) # empty DF that encodes the schema
schema = pa.schema([(c, pa.from_numpy_dtype(t))
for c, t in dtypes.items()])
#ddf_conc = ddf_conc.set_index("calculated_time")
#ddf_conc = ddf_conc.astype(dtypes)
ddf_conc = enforce_schema(ddf_conc)
ddf_conc.to_parquet(
f"{run_config['output']}/conc_{run_config['dt']}s",
partition_on=["date", "hour"],
engine="pyarrow",
write_index=True,
write_metadata_file=True,
append=True,
schema='infer',
)"""
# 5. (optional) dt histograms --------------------------
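[Editor's note] The removed variant above repeated the delete-then-append idiom the live code keeps: clear any existing hive partition for the current chunk, then append the recomputed rows so re-runs stay idempotent. delete_partition_if_exists is defined elsewhere in the pipeline; a minimal sketch of what such a helper plausibly does, assuming hive-style date=/hour= directories:

import shutil
from pathlib import Path

def delete_partition_if_exists(output_path: str, partition_values: dict) -> None:
    # Assemble the hive-style partition directory, e.g. <output>/date=2025-08-22/hour=16.
    part_dir = Path(output_path)
    for key, value in partition_values.items():
        part_dir = part_dir / f"{key}={value}"
    # Drop it so the subsequent append cannot duplicate this chunk's rows.
    if part_dir.exists():
        shutil.rmtree(part_dir)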
@@ -470,8 +253,6 @@ def main():
.astype(DEFAULT_FLOAT, copy=False)
.convert_dtypes(dtype_backend="pyarrow")
)
# cast_map = {c: DEFAULT_FLOAT[c] for c in meta_hist.columns}
# meta_typed = meta_hist.astype(DEFAULT_FLOAT, copy=False).convert_dtypes(dtype_backend="pyarrow")
ddf_out = ddf_pbp_with_flow.map_partitions(
process_hist_and_dist_partition,
col="BC mass within range",
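[Editor's note] Every output in this file is written with the same dask.dataframe.to_parquet shape: hive-partitioned on date/hour, pyarrow engine, inferred schema. A runnable sketch with a toy frame (path and data are illustrative; this first write uses append=False, whereas the pipeline passes append=True after clearing the target partition):

import dask.dataframe as dd
import pandas as pd

pdf = pd.DataFrame({"x": [1.0], "date": pd.to_datetime(["2025-08-22"]), "hour": [16]})
ddf_out = dd.from_pandas(pdf, npartitions=1)
ddf_out.to_parquet(
    "output/pbp_calibrated",        # illustrative path
    partition_on=["date", "hour"],  # hive-style date=/hour= directories
    engine="pyarrow",
    write_index=True,
    write_metadata_file=True,
    append=False,                   # the pipeline appends on re-runs instead
    schema="infer",
)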