Fix: add check to skip empty ddf when time chunk is empty

2025-08-22 16:06:05 +02:00
parent 8947046049
commit 43c31728e0
2 changed files with 56 additions and 12 deletions
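Both parquet read paths in the chunk loop now guard against time chunks that match no files and against frames that come back with no rows. Below is a minimal sketch of that guard pattern, not the code as committed: read_chunk_or_skip is a hypothetical helper name, and the commit applies the same checks inline in main() for both the HK and PbP reads.

# Illustrative sketch only; read_chunk_or_skip is a hypothetical helper.
# The commit inlines this guard for both the HK and PbP reads.
import dask.dataframe as dd

from sp2xr.helpers import partition_rowcount


def read_chunk_or_skip(path, filters):
    """Return a Dask DataFrame for the chunk, or None if nothing matches."""
    try:
        ddf = dd.read_parquet(
            path,
            engine="pyarrow",
            filters=filters,
            calculate_divisions=True,
        )
    except (FileNotFoundError, OSError):
        return None  # no parquet files at all for this time chunk
    if ddf.npartitions == 0 or partition_rowcount(ddf) == 0:
        return None  # files exist, but the filtered frame has no rows
    return ddf

The caller would simply continue to the next time chunk whenever this returns None, which is what the inline guards in the diff below do.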

View File

@@ -12,6 +12,7 @@ from sp2xr.helpers import (
    extract_partitioned_datetimes,
    get_time_chunks_from_range,
    delete_partition_if_exists,
    partition_rowcount,
)
from sp2xr.apply_calib import calibrate_single_particle
from sp2xr.resample_pbp_hk import (
@@ -80,12 +81,27 @@ def main():
pbp_filters.append(("hour", "<", chunk_end.hour))
# 2. HK processing --------------------------------------
ddf_hk = dd.read_parquet(
    run_config["input_hk"],
    engine="pyarrow",
    filters=pbp_filters,
    calculate_divisions=True,
)
try:
    ddf_hk = dd.read_parquet(
        run_config["input_hk"],
        engine="pyarrow",
        filters=pbp_filters,
        calculate_divisions=True,
    )
except (FileNotFoundError, OSError):
    print(" → no HK files for this chunk; skipping.")
    continue
if ddf_hk.npartitions == 0 or partition_rowcount(ddf_hk) == 0:
    print(" → HK frame is empty; skipping.")
    continue
ddf_hk = ddf_hk.map_partitions(lambda pdf: pdf.sort_index())
if not ddf_hk.known_divisions:
    ddf_hk = (
        ddf_hk.reset_index().set_index(  # 'calculated_time' becomes a column
            "calculated_time", sorted=False, shuffle="tasks"
        )  # Dask now infers divisions
    )
ddf_hk = ddf_hk.repartition(freq="1h")
meta = pd.DataFrame(
    {
@@ -103,12 +119,28 @@ def main():
flow_dt = ddf_hk_dt["Sample Flow Controller Read (vccm)"].compute()
# 3. PBP processing --------------------------------------
ddf_raw = dd.read_parquet(
    run_config["input_pbp"],
    engine="pyarrow",
    filters=pbp_filters,
    calculate_divisions=True,
)
try:
    ddf_raw = dd.read_parquet(
        run_config["input_pbp"],
        engine="pyarrow",
        filters=pbp_filters,
        calculate_divisions=True,
    )
except (FileNotFoundError, OSError):
    print(" → no PbP files for this chunk; skipping.")
    continue
if ddf_raw.npartitions == 0 or partition_rowcount(ddf_raw) == 0:
    print(" → PbP frame is empty; skipping.")
    continue
ddf_raw = ddf_raw.map_partitions(lambda pdf: pdf.sort_index())
if not ddf_raw.known_divisions:
    ddf_raw = (
        ddf_raw.reset_index().set_index(  # 'calculated_time' becomes a column
            "calculated_time", sorted=False, shuffle="tasks"
        )  # Dask now infers divisions
    )
ddf_raw = ddf_raw.repartition(freq="1h")
ddf_cal = calibrate_single_particle(ddf_raw, instr_config, run_config)

View File

@@ -4,6 +4,7 @@ import re
import yaml
import argparse
import pandas as pd
import dask.dataframe as dd
from pathlib import Path
from dask_jobqueue import SLURMCluster
import multiprocessing
@@ -427,6 +428,17 @@ def export_xr_ini_to_yaml(ini_path, yaml_path):
)
def partition_rowcount(ddf: dd.DataFrame) -> int:
    """
    Return total number of rows in a Dask DataFrame.
    """
    row_series = ddf.map_partitions(
        lambda pdf: pd.Series([len(pdf)]),
        meta=pd.Series(dtype="int64"),
    )
    return int(row_series.sum().compute())
'''def normalize_dtypes(
    df: Union[pd.DataFrame, dd.DataFrame],
) -> Union[pd.DataFrame, dd.DataFrame]: