From 43c31728e04098d0851ad27f0d8a3a73b41f7a70 Mon Sep 17 00:00:00 2001
From: Barbara Bertozzi
Date: Fri, 22 Aug 2025 16:06:05 +0200
Subject: [PATCH] Fix: add control to skip empty ddf when time chunk is empty

---
 scripts/sp2xr_pipeline.py | 56 ++++++++++++++++++++++++++++++---------
 src/sp2xr/helpers.py      | 12 +++++++++
 2 files changed, 56 insertions(+), 12 deletions(-)

diff --git a/scripts/sp2xr_pipeline.py b/scripts/sp2xr_pipeline.py
index 47121e9..9bec6ce 100644
--- a/scripts/sp2xr_pipeline.py
+++ b/scripts/sp2xr_pipeline.py
@@ -12,6 +12,7 @@ from sp2xr.helpers import (
     extract_partitioned_datetimes,
     get_time_chunks_from_range,
     delete_partition_if_exists,
+    partition_rowcount,
 )
 from sp2xr.apply_calib import calibrate_single_particle
 from sp2xr.resample_pbp_hk import (
@@ -80,12 +81,27 @@ def main():
         pbp_filters.append(("hour", "<", chunk_end.hour))

         # 2. HK processing --------------------------------------
-        ddf_hk = dd.read_parquet(
-            run_config["input_hk"],
-            engine="pyarrow",
-            filters=pbp_filters,
-            calculate_divisions=True,
-        )
+        try:
+            ddf_hk = dd.read_parquet(
+                run_config["input_hk"],
+                engine="pyarrow",
+                filters=pbp_filters,
+                calculate_divisions=True,
+            )
+        except (FileNotFoundError, OSError):
+            print(" → no HK files for this chunk; skipping.")
+            continue
+
+        if ddf_hk.npartitions == 0 or partition_rowcount(ddf_hk) == 0:
+            print(" → HK frame is empty; skipping.")
+            continue
+        ddf_hk = ddf_hk.map_partitions(lambda pdf: pdf.sort_index())
+        if not ddf_hk.known_divisions:
+            ddf_hk = (
+                ddf_hk.reset_index().set_index(  # 'calculated_time' becomes a column
+                    "calculated_time", sorted=False, shuffle="tasks"
+                )  # Dask now infers divisions
+            )
         ddf_hk = ddf_hk.repartition(freq="1h")
         meta = pd.DataFrame(
             {
@@ -103,12 +119,28 @@ def main():
         flow_dt = ddf_hk_dt["Sample Flow Controller Read (vccm)"].compute()

         # 3. PBP processing --------------------------------------
-        ddf_raw = dd.read_parquet(
-            run_config["input_pbp"],
-            engine="pyarrow",
-            filters=pbp_filters,
-            calculate_divisions=True,
-        )
+        try:
+            ddf_raw = dd.read_parquet(
+                run_config["input_pbp"],
+                engine="pyarrow",
+                filters=pbp_filters,
+                calculate_divisions=True,
+            )
+        except (FileNotFoundError, OSError):
+            print(" → no PbP files for this chunk; skipping.")
+            continue
+
+        if ddf_raw.npartitions == 0 or partition_rowcount(ddf_raw) == 0:
+            print(" → PbP frame is empty; skipping.")
+            continue
+
+        ddf_raw = ddf_raw.map_partitions(lambda pdf: pdf.sort_index())
+        if not ddf_raw.known_divisions:
+            ddf_raw = (
+                ddf_raw.reset_index().set_index(  # 'calculated_time' becomes a column
+                    "calculated_time", sorted=False, shuffle="tasks"
+                )  # Dask now infers divisions
+            )
         ddf_raw = ddf_raw.repartition(freq="1h")

         ddf_cal = calibrate_single_particle(ddf_raw, instr_config, run_config)

diff --git a/src/sp2xr/helpers.py b/src/sp2xr/helpers.py
index 0d0cad6..9e9fb16 100644
--- a/src/sp2xr/helpers.py
+++ b/src/sp2xr/helpers.py
@@ -4,6 +4,7 @@ import re
 import yaml
 import argparse
 import pandas as pd
+import dask.dataframe as dd
 from pathlib import Path
 from dask_jobqueue import SLURMCluster
 import multiprocessing
@@ -427,6 +428,17 @@ def export_xr_ini_to_yaml(ini_path, yaml_path):
     )


+def partition_rowcount(ddf: dd.DataFrame) -> int:
+    """
+    Return total number of rows in a Dask DataFrame.
+    """
+    row_series = ddf.map_partitions(
+        lambda pdf: pd.Series([len(pdf)]),
+        meta=pd.Series(dtype="int64"),
+    )
+    return int(row_series.sum().compute())
+
+
 '''def normalize_dtypes(
     df: Union[pd.DataFrame, dd.DataFrame],
 ) -> Union[pd.DataFrame, dd.DataFrame]:
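
A minimal standalone sketch of how the new empty-chunk guard behaves: an empty Dask DataFrame still reports at least one partition, which is why the pipeline pairs the npartitions check with the new partition_rowcount() helper. The helper body mirrors the one added in the patch; the toy DataFrames and the __main__ driver below are illustrative only and not part of sp2xr.

# Sketch only: demonstrates the guard used in sp2xr_pipeline.py, assuming pandas
# and dask are available. The example frames are invented for illustration.
import pandas as pd
import dask.dataframe as dd


def partition_rowcount(ddf: dd.DataFrame) -> int:
    """Return the total number of rows in a Dask DataFrame (as in the patch)."""
    row_series = ddf.map_partitions(
        lambda pdf: pd.Series([len(pdf)]),
        meta=pd.Series(dtype="int64"),
    )
    return int(row_series.sum().compute())


if __name__ == "__main__":
    # An empty frame still has npartitions >= 1, so the row count decides
    # whether a time chunk can be skipped.
    empty = dd.from_pandas(pd.DataFrame({"a": []}), npartitions=1)
    full = dd.from_pandas(pd.DataFrame({"a": range(10)}), npartitions=2)
    print(empty.npartitions, partition_rowcount(empty))  # 1 0  -> chunk skipped
    print(full.npartitions, partition_rowcount(full))    # 2 10 -> chunk processed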