refactor: split read_csv_files_with_dask into modular load, enrich, and save functions

2025-07-25 20:28:24 +02:00
parent 399df18e81
commit 5690fcdcaf
4 changed files with 211 additions and 171 deletions
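The one-call entry point is kept, only renamed from read_csv_files_with_dask to process_sp2xr_file; it now delegates to read_sp2xr_csv, load_matching_hk_file, enrich_sp2xr_dataframe, and save_sp2xr_parquet. A minimal usage sketch (paths are hypothetical):

from sp2xr.io import process_sp2xr_file

# Reads the CSV/ZIP with the schema from config.yaml, pulls t0 from the
# matching hk file, adds the time-derived columns, writes partitioned
# parquet, and returns the enriched Dask DataFrame.
df = process_sp2xr_file(
    file_path="raw/SP2XR_PbP_20190409110737_x0001.zip",  # hypothetical input
    config_path="config.yaml",                           # hypothetical config
    target_directory="pq_out",                           # hypothetical output dir
)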


@@ -10,7 +10,7 @@ _toolkit = import_module(".toolkit_legacy", package=__name__)
globals().update(_toolkit.__dict__) # reexport legacy names
# 2 Import new helpers you want public at package root
from .io import csv_to_parquet, read_csv_files_with_dask # noqa: F401,E402
from .io import csv_to_parquet, process_sp2xr_file # noqa: F401,E402
# Cleanup internal names
del import_module, _toolkit

src/sp2xr/helpers.py (new file, 30 lines)

@@ -0,0 +1,30 @@
import os
import re
from pathlib import Path
from typing import Optional
def find_matching_hk_file(pbp_path: str) -> Optional[str]:
"""
Given a PbP filename, return the path to the corresponding hk file (.csv or .zip), or None if neither exists.
"""
base = re.sub(r"PbP", "hk", pbp_path)
base = re.sub(r"(_x)\d{4}", r"\g<1>0001", base)
base = re.sub(r"\.(csv|zip)$", "", base)
for ext in [".csv", ".zip"]:
candidate = f"{base}{ext}"
if os.path.exists(candidate):
return candidate
return None
def extract_base_filename(file_path: str) -> str:
"""
Extract the base identifier (e.g. '20190409_x0001') from a filename.
"""
parts = Path(file_path).stem.split("_")
if len(parts) >= 2:
return f"{parts[-2]}_{parts[-1]}"
return Path(file_path).stem
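For reference, the mapping these helpers perform (filenames are illustrative):

from sp2xr.helpers import find_matching_hk_file, extract_base_filename

# A PbP part file maps to the first (_x0001) hk file with the same timestamp;
# None is returned if neither a .csv nor a .zip candidate exists on disk.
find_matching_hk_file("SP2XR_PbP_20190409110737_x0005.csv")
# -> "SP2XR_hk_20190409110737_x0001.csv" (when that file exists)

extract_base_filename("SP2XR_PbP_20190409110737_x0005.csv")
# -> "20190409110737_x0005"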


@@ -1,14 +1,18 @@
import pandas as pd
from pathlib import Path
import os
import re
import zipfile
import warnings
import yaml
import numpy as np
import dask.dataframe as dd
import logging
from .toolkit_legacy import calculate_delta_sec, extract_datetime
from .helpers import find_matching_hk_file
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
def csv_to_parquet(csv_path: Path, parquet_path: Path, **read_csv_kwargs) -> None:
@@ -31,7 +35,125 @@ def csv_to_parquet(csv_path: Path, parquet_path: Path, **read_csv_kwargs) -> Non
df.to_parquet(parquet_path, index=False)
def read_csv_files_with_dask(file_path, config_path, target_directory):
def read_sp2xr_csv(file_path: str, schema: dict) -> dd.DataFrame:
"""
Read an SP2XR CSV/ZIP file using the given schema from the config.
Returns a Dask DataFrame.
"""
datetime_cols = [col for col, typ in schema.items() if typ == "datetime"]
dtype_cols = {col: typ for col, typ in schema.items() if typ != "datetime"}
df = dd.read_csv(
file_path,
dtype=dtype_cols,
parse_dates=datetime_cols,
blocksize=None,
assume_missing=True,
)
return df
def load_matching_hk_file(file_path):
expected_columns = ["Time Stamp", "Time (sec)"]
if not file_path:
return pd.DataFrame(columns=expected_columns)
hk_candidate = find_matching_hk_file(file_path)
if not hk_candidate:
print(f"!! No matching HK file found for: {file_path}")
return pd.DataFrame(columns=expected_columns)
try:
tmp_hk = pd.read_csv(
hk_candidate,
nrows=1,
parse_dates=["Time Stamp"],
usecols=expected_columns,
)
except (pd.errors.EmptyDataError, zipfile.BadZipFile) as e:
print(f"!! Error reading HK file for {file_path}: {e}")
tmp_hk = pd.DataFrame(columns=expected_columns)
except Exception as e:
print(f"!! Unexpected error with file {file_path}: {e}")
tmp_hk = pd.DataFrame(columns=expected_columns)
return tmp_hk
def enrich_sp2xr_dataframe(
df, file_path: str, first_val: float, t0: pd.Timestamp
) -> dd.DataFrame:
"""
Add time-derived and metadata columns to a raw SP2XR DataFrame.
"""
# Basic file metadata
df["path"] = str(file_path)
df["first_val"] = first_val
df["t0"] = t0
file_name_cut = (
file_path.split("\\")[-1].split("_")[-2]
+ "_"
+ file_path.split("\\")[-1].split("_")[-1].split(".")[-2]
)
df["file"] = file_name_cut
folder_name = file_path.split("\\")[-1].split("_")[-2]
df["folder_name"] = folder_name
# Ensure Time Stamp is in datetime format (if present)
if "Time Stamp" in df.columns:
df["Time Stamp"] = df["Time Stamp"].map_partitions(
pd.to_datetime, meta=("Time Stamp", "datetime64[ns]")
)
# Derived columns
df["delta_sec"] = df.map_partitions(
calculate_delta_sec, meta=("delta_sec", "float64")
)
df["calculated_time"] = df["t0"] + dd.to_timedelta(df["delta_sec"], unit="s")
df["file_datetime"] = df.apply(
extract_datetime, axis=1, meta=("file_datetime", "datetime64[ns]")
)
df["date_floored"] = df["calculated_time"].dt.floor("h")
df["date"] = df["calculated_time"].dt.date.astype("date64[pyarrow]")
df["hour"] = df["calculated_time"].dt.hour.astype("i8")
df["floor_time"] = df["calculated_time"].dt.floor("s")
if "Time (sec)" in df.columns:
df["Secs_2GB"] = df["Time (sec)"].apply(np.floor, meta=("Secs_2GB", "i8"))
return df.set_index("calculated_time", drop=True, sort=False, sorted=True)
def save_sp2xr_parquet(df, file_path, target_directory):
fn = (
file_path.split("\\")[-1].split("_")[-2]
+ "_"
+ file_path.split("\\")[-1].split("_")[-1].split(".")[-2]
)
def name(part_idx):
return f"{fn}.parquet"
df.to_parquet(
path=target_directory,
engine="pyarrow",
partition_on=["date", "hour"],
coerce_timestamps="us",
allow_truncated_timestamps=True,
name_function=name,
write_index=True,
append=False,
)
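# For illustration only (a sketch, not part of this module): save_sp2xr_parquet
# writes a hive-partitioned layout <target_directory>/date=<d>/hour=<h>/<base>.parquet,
# which dask can load back in one call, e.g.:
#
#     import dask.dataframe as dd
#     df = dd.read_parquet("pq_out", engine="pyarrow")  # "pq_out": hypothetical target_directory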
def process_sp2xr_file(file_path, config_path, target_directory):
"""
This function reads PbP or hk files from the SP2XR
@@ -50,178 +172,64 @@ def read_csv_files_with_dask(file_path, config_path, target_directory):
Content of the file as Dask DataFrame.
"""
if config_path:
with open(config_path) as f:
schema = yaml.safe_load(f)
if not os.path.exists(file_path):
raise FileNotFoundError(f"Input file does not exist: {file_path}")
pbp_schema = schema["pbp_schema"]
hk_schema = schema["hk_schema"]
else:
if not os.path.exists(config_path):
raise ValueError("No config file found.")
if file_path:
tmp_hk = pd.DataFrame()
with open(config_path) as f:
schema = yaml.safe_load(f)
hk_0001 = re.sub(r"PbP", "hk", file_path)
hk_0001 = re.sub(r"(_x)\d{4}", r"\g<1>0001", hk_0001)
hk_0001 = re.sub(r"\.(csv|zip)$", "", hk_0001)
if os.path.exists(f"{hk_0001}.csv"):
try:
tmp_hk = pd.read_csv(
f"{hk_0001}.csv",
nrows=1,
parse_dates=["Time Stamp"],
usecols=["Time Stamp", "Time (sec)"],
)
except pd.errors.EmptyDataError:
tmp_hk = pd.DataFrame()
except zipfile.BadZipFile:
print(f"!! Bad file: {file_path}")
tmp_hk = pd.DataFrame()
elif os.path.exists(f"{hk_0001}.zip"):
try:
tmp_hk = pd.read_csv(
f"{hk_0001}.zip",
nrows=1,
parse_dates=["Time Stamp"],
usecols=["Time Stamp", "Time (sec)"],
)
except pd.errors.EmptyDataError:
tmp_hk = pd.DataFrame()
tmp_hk = load_matching_hk_file(file_path)
if tmp_hk.empty:
warnings.warn("tmp_hk empty or not existing")
df = pd.DataFrame()
# pbp_schema = schema["pbp_schema"]
# hk_schema = schema["hk_schema"]
first_val, t0 = tmp_hk[["Time (sec)", "Time Stamp"]].values[0]
if not tmp_hk.empty:
first_val, t0 = tmp_hk[["Time (sec)", "Time Stamp"]].values[0]
if "PbP" in file_path:
"""
temp = pbp_schema
data_type = {
col: (
"float64"
if typ == "float"
else "int64" if typ == "int" else "string"
) # default fallback
for col, typ in pbp_schema.items()
if typ != "datetime"
}
parse_dates = [
col for col, typ in pbp_schema.items() if typ == "datetime"
]
"""
try:
df = dd.read_csv(
file_path,
dtype=pbp_schema, # data_type,
# parse_dates=parse_dates,
blocksize=None,
)
df = df.fillna(
0
) # is this because otherwise we cannot calculate the time_lag?
# df['time_lag'] = df['Incand Peak Time'] - df['Scatter Peak Time'] # 02.09.2024 this line implies that for particles with nan in the scatt transit time time_lag=incand transit time. better to calculate timelag for particles with both scatt and incand and set 0 for particles with only incand
#!!! MISSING CORRECT TIME LAG CALCULATIONS
except zipfile.BadZipFile:
print(f"!! Bad zip file: {file_path}")
df = pd.DataFrame()
return df
def postprocess_hk(df):
return df
elif "hk" in file_path:
datetime_cols = [
col for col, typ in hk_schema.items() if typ == "datetime"
]
dtype_cols = {
col: typ for col, typ in hk_schema.items() if typ != "datetime"
}
try:
df = dd.read_csv(
file_path,
dtype=dtype_cols, # filtered_dtype_dict,
parse_dates=datetime_cols,
blocksize=None,
assume_missing=True,
)
# df = dd.read_csv(file_path, dtype=data_type, parse_dates=['Time Stamp'], blocksize=None)#, assume_missing=True)
"""if 'Time Stamp' in df.columns:
datetime_format = '%m/%d/%Y %H:%M:%S.%f'
df['Time Stamp'] = df['Time Stamp'].map_partitions(pd.to_datetime, format=datetime_format, meta=('Time Stamp', 'datetime64[ns]'))
"""
except ValueError as e:
# Handle the error if the 'Time Stamp' column is missing or any other parsing error occurs
if "Missing column provided to 'parse_dates'" in str(e):
print(
f"Error for {file_path}: Missing column provided to 'parse_dates': 'Time Stamp'"
)
df = pd.DataFrame()
return df
except pd.errors.EmptyDataError:
df = pd.DataFrame()
return df
except zipfile.BadZipFile:
print(f"!! Bad zip file: {file_path}")
df = pd.DataFrame()
return df
def postprocess_pbp(df):
return df.fillna(0)
if len(df.columns) > 0:
df = df.loc[~df.isna().all(axis=1)]
if "PbP" in file_path:
file_schema = schema["pbp_schema"]
postprocess = postprocess_pbp
df["path"] = str(file_path)
df["first_val"] = first_val
df["t0"] = t0
file_name_cut = (
file_path.split("\\")[-1].split("_")[-2]
+ "_"
+ file_path.split("\\")[-1].split("_")[-1].split(".")[-2]
)
df["file"] = file_name_cut
folder_name = file_path.split("\\")[-1].split("_")[-2]
df["folder_name"] = folder_name
if "Time Stamp" in df.columns:
df["Time Stamp"] = df["Time Stamp"].map_partitions(
pd.to_datetime, meta=("Time Stamp", "datetime64[ns]")
)
df["delta_sec"] = df.map_partitions(
calculate_delta_sec, meta=("delta_sec", "float64")
)
df["calculated_time"] = df["t0"] + dd.to_timedelta(
df["delta_sec"], unit="s"
)
df["file_datetime"] = df.apply(
extract_datetime, axis=1, meta=("file_datetime", "datetime64[ns]")
)
df["date_floored"] = df["calculated_time"].dt.floor("h")
df["date"] = df["calculated_time"].dt.date.astype("date64[pyarrow]")
df["hour"] = df["calculated_time"].dt.hour.astype("i8")
df["floor_time"] = df["calculated_time"].dt.floor("s")
df["Secs_2GB"] = df["Time (sec)"].apply(
np.floor, meta=("Secs_2GB", "i8")
)
fn = (
file_path.split("\\")[-1].split("_")[-2]
+ "_"
+ file_path.split("\\")[-1].split("_")[-1].split(".")[-2]
)
def name(part_idx):
return f"{fn}.parquet"
df = df.set_index("calculated_time", drop=True, sort=False, sorted=True)
df.to_parquet(
path=target_directory,
engine="pyarrow",
partition_on=["date", "hour"],
coerce_timestamps="us",
allow_truncated_timestamps=True,
name_function=name,
write_index=True,
append=False,
)
return df
else:
warnings.warn("tmp_hk empty or not existing")
return pd.DataFrame()
elif "hk" in file_path:
file_schema = schema["hk_schema"]
postprocess = postprocess_hk
else:
raise ValueError("No CSV files found.")
raise ValueError(f"Unrecognized file type: {file_path}")
try:
df = read_sp2xr_csv(file_path, file_schema)
df = postprocess(df)
"""except zipfile.BadZipFile:
logger.warning(f"[{file_path}] Bad zip file: {e}")
df = pd.DataFrame()
except pd.errors.EmptyDataError:
logger.warning(f"[{file_path}] Empty CSV")
df = pd.DataFrame()
except ValueError as e:
logger.error(f"[{file_path}] ValueError: {e}")
if "Missing column provided to 'parse_dates'" in str(e):
print(f"Error for {file_path}: Missing 'Time Stamp'")
else:
print(f"ValueError for {file_path}: {e}")
df = pd.DataFrame()"""
except Exception as e:
logger.error(f"[{file_path}] Unexpected error: {e}")
df = pd.DataFrame()
if len(df.columns) > 0:
df = df.loc[~df.isna().all(axis=1)]
df = enrich_sp2xr_dataframe(df, file_path, first_val, t0)
save_sp2xr_parquet(df, file_path, target_directory)
return df
else:
return pd.DataFrame()
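For context, process_sp2xr_file expects config.yaml to define both a pbp_schema and an hk_schema mapping column names to dtype strings; columns typed "datetime" are routed to parse_dates, everything else is passed to dd.read_csv as dtype. A minimal sketch of the structure after yaml.safe_load (column names other than "Time Stamp" and "Time (sec)" are assumptions):

schema = {
    "hk_schema": {
        "Time Stamp": "datetime",   # parsed via parse_dates
        "Time (sec)": "float64",    # passed through as a dtype
        # ... remaining hk columns
    },
    "pbp_schema": {
        "Incand Peak Time": "float64",   # assumed column; name taken from legacy comments
        "Scatter Peak Time": "float64",  # assumed column; name taken from legacy comments
        # ... remaining PbP columns
    },
}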


@@ -1,5 +1,7 @@
from pathlib import Path
from sp2xr.io import read_csv_files_with_dask
from sp2xr.io import process_sp2xr_file
# from sp2xr.io import read_csv_files_with_dask
DATA = Path(__file__).parent / "data"
@@ -8,7 +10,7 @@ def test_read_real_pbp_zip(tmp_path):
mini_zip = DATA / "mini_SP2XR_PbP_20190409110737_x0001.zip"
config_file = DATA / "config.yaml"
df = read_csv_files_with_dask(
df = process_sp2xr_file(
file_path=str(mini_zip),
config_path=str(config_file),
target_directory=str(tmp_path / "pq_out"),
@@ -24,7 +26,7 @@ def test_read_real_hk_zip(tmp_path):
mini_zip = DATA / "mini_SP2XR_hk_20190409110737_x0001.zip"
config_file = DATA / "config.yaml"
df = read_csv_files_with_dask(
df = process_sp2xr_file(
file_path=str(mini_zip),
config_path=str(config_file),
target_directory=str(tmp_path / "pq_out"),