diff --git a/src/sp2xr/__init__.py b/src/sp2xr/__init__.py
index 5ce6696..6083ffd 100644
--- a/src/sp2xr/__init__.py
+++ b/src/sp2xr/__init__.py
@@ -10,7 +10,7 @@
 _toolkit = import_module(".toolkit_legacy", package=__name__)
 globals().update(_toolkit.__dict__)  # re‑export legacy names
 
 # 2 Import new helpers you want public at package root
-from .io import csv_to_parquet, read_csv_files_with_dask  # noqa: F401,E402
+from .io import csv_to_parquet, process_sp2xr_file  # noqa: F401,E402
 # Cleanup internal names
 del import_module, _toolkit
diff --git a/src/sp2xr/helpers.py b/src/sp2xr/helpers.py
new file mode 100644
index 0000000..52867ad
--- /dev/null
+++ b/src/sp2xr/helpers.py
@@ -0,0 +1,30 @@
+import os
+import re
+from pathlib import Path
+from typing import Optional
+
+
+def find_matching_hk_file(pbp_path: str) -> Optional[str]:
+    """
+    From a PbP filename, return a path to the corresponding hk file (.csv or .zip), if found.
+    """
+    base = re.sub(r"PbP", "hk", pbp_path)
+    base = re.sub(r"(_x)\d{4}", r"\g<1>0001", base)
+    base = re.sub(r"\.(csv|zip)$", "", base)
+
+    for ext in [".csv", ".zip"]:
+        candidate = f"{base}{ext}"
+        if os.path.exists(candidate):
+            return candidate
+
+    return None
+
+
+def extract_base_filename(file_path: str) -> str:
+    """
+    Extracts something like '20190409_x0001' from a filename.
+    """
+    parts = Path(file_path).stem.split("_")
+    if len(parts) >= 2:
+        return f"{parts[-2]}_{parts[-1]}"
+    return Path(file_path).stem
diff --git a/src/sp2xr/io.py b/src/sp2xr/io.py
index bb8e100..3615deb 100644
--- a/src/sp2xr/io.py
+++ b/src/sp2xr/io.py
@@ -1,14 +1,18 @@
 import pandas as pd
 from pathlib import Path
 import os
-import re
 import zipfile
 import warnings
 import yaml
 import numpy as np
 import dask.dataframe as dd
+import logging
 
 from .toolkit_legacy import calculate_delta_sec, extract_datetime
+from .helpers import find_matching_hk_file
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
 
 
 def csv_to_parquet(csv_path: Path, parquet_path: Path, **read_csv_kwargs) -> None:
@@ -31,7 +35,125 @@ def csv_to_parquet(csv_path: Path, parquet_path: Path, **read_csv_kwargs) -> Non
     df.to_parquet(parquet_path, index=False)
 
 
-def read_csv_files_with_dask(file_path, config_path, target_directory):
+def read_sp2xr_csv(file_path: str, schema: dict) -> dd.DataFrame:
+    """
+    Read a SP2XR CSV/ZIP file with given schema from config.
+
+    Returns a Dask DataFrame.
+    """
+    datetime_cols = [col for col, typ in schema.items() if typ == "datetime"]
+    dtype_cols = {col: typ for col, typ in schema.items() if typ != "datetime"}
+
+    df = dd.read_csv(
+        file_path,
+        dtype=dtype_cols,
+        parse_dates=datetime_cols,
+        blocksize=None,
+        assume_missing=True,
+    )
+    return df
+
+
+def load_matching_hk_file(file_path):
+    expected_columns = ["Time Stamp", "Time (sec)"]
+
+    if not file_path:
+        return pd.DataFrame(columns=expected_columns)
+
+    hk_candidate = find_matching_hk_file(file_path)
+
+    if not hk_candidate:
+        print(f"!! No matching HK file found for: {file_path}")
+        return pd.DataFrame(columns=expected_columns)
+
+    try:
+        tmp_hk = pd.read_csv(
+            hk_candidate,
+            nrows=1,
+            parse_dates=["Time Stamp"],
+            usecols=expected_columns,
+        )
+    except (pd.errors.EmptyDataError, zipfile.BadZipFile) as e:
+        print(f"!! Error reading HK file for {file_path}: {e}")
+        tmp_hk = pd.DataFrame(columns=expected_columns)
+    except Exception as e:
+        print(f"!! Unexpected error with file {file_path}: {e}")
+        tmp_hk = pd.DataFrame(columns=expected_columns)
+
+    return tmp_hk
+
+
+def enrich_sp2xr_dataframe(
+    df, file_path: str, first_val: float, t0: pd.Timestamp
+) -> dd.DataFrame:
+    """
+    Add time-derived and metadata columns to a raw SP2XR DataFrame.
+    """
+
+    # Basic file metadata
+    df["path"] = str(file_path)
+    df["first_val"] = first_val
+    df["t0"] = t0
+
+    file_name_cut = (
+        file_path.split("\\")[-1].split("_")[-2]
+        + "_"
+        + file_path.split("\\")[-1].split("_")[-1].split(".")[-2]
+    )
+    df["file"] = file_name_cut
+    folder_name = file_path.split("\\")[-1].split("_")[-2]
+    df["folder_name"] = folder_name
+
+    # Ensure Time Stamp is in datetime format (if present)
+    if "Time Stamp" in df.columns:
+        df["Time Stamp"] = df["Time Stamp"].map_partitions(
+            pd.to_datetime, meta=("Time Stamp", "datetime64[ns]")
+        )
+
+    # Derived columns
+    df["delta_sec"] = df.map_partitions(
+        calculate_delta_sec, meta=("delta_sec", "float64")
+    )
+    df["calculated_time"] = df["t0"] + dd.to_timedelta(df["delta_sec"], unit="s")
+
+    df["file_datetime"] = df.apply(
+        extract_datetime, axis=1, meta=("file_datetime", "datetime64[ns]")
+    )
+
+    df["date_floored"] = df["calculated_time"].dt.floor("h")
+    df["date"] = df["calculated_time"].dt.date.astype("date64[pyarrow]")
+    df["hour"] = df["calculated_time"].dt.hour.astype("i8")
+    df["floor_time"] = df["calculated_time"].dt.floor("s")
+
+    if "Time (sec)" in df.columns:
+        df["Secs_2GB"] = df["Time (sec)"].apply(np.floor, meta=("Secs_2GB", "i8"))
+
+    return df.set_index("calculated_time", drop=True, sort=False, sorted=True)
+
+
+def save_sp2xr_parquet(df, file_path, target_directory):
+    fn = (
+        file_path.split("\\")[-1].split("_")[-2]
+        + "_"
+        + file_path.split("\\")[-1].split("_")[-1].split(".")[-2]
+    )
+
+    def name(part_idx):
+        return f"{fn}.parquet"
+
+    df.to_parquet(
+        path=target_directory,
+        engine="pyarrow",
+        partition_on=["date", "hour"],
+        coerce_timestamps="us",
+        allow_truncated_timestamps=True,
+        name_function=name,
+        write_index=True,
+        append=False,
+    )
+
+
+def process_sp2xr_file(file_path, config_path, target_directory):
     """
     This function reads Pbp or HK files from the SP2XR
 
@@ -50,178 +172,64 @@ def read_csv_files_with_dask(file_path, config_path, target_directory):
         Content of the file as Dask DataFrame.
     """
 
-    if config_path:
-        with open(config_path) as f:
-            schema = yaml.safe_load(f)
+    if not os.path.exists(file_path):
+        raise FileNotFoundError(f"Input file does not exist: {file_path}")
 
-        pbp_schema = schema["pbp_schema"]
-        hk_schema = schema["hk_schema"]
-    else:
+    if not os.path.exists(config_path):
         raise ValueError("No config file found.")
 
-    if file_path:
-        tmp_hk = pd.DataFrame()
+    with open(config_path) as f:
+        schema = yaml.safe_load(f)
 
-        hk_0001 = re.sub(r"PbP", "hk", file_path)
-        hk_0001 = re.sub(r"(_x)\d{4}", r"\g<1>0001", hk_0001)
-        hk_0001 = re.sub(r"\.(csv|zip)$", "", hk_0001)
-        if os.path.exists(f"{hk_0001}.csv"):
-            try:
-                tmp_hk = pd.read_csv(
-                    f"{hk_0001}.csv",
-                    nrows=1,
-                    parse_dates=["Time Stamp"],
-                    usecols=["Time Stamp", "Time (sec)"],
-                )
-            except pd.errors.EmptyDataError:
-                tmp_hk = pd.DataFrame()
-            except zipfile.BadZipFile:
-                print(f"!! Bad file: {file_path}")
Bad file: {file_path}") - tmp_hk = pd.DataFrame() - elif os.path.exists(f"{hk_0001}.zip"): - try: - tmp_hk = pd.read_csv( - f"{hk_0001}.zip", - nrows=1, - parse_dates=["Time Stamp"], - usecols=["Time Stamp", "Time (sec)"], - ) - except pd.errors.EmptyDataError: - tmp_hk = pd.DataFrame() + tmp_hk = load_matching_hk_file(file_path) + if tmp_hk.empty: + warnings.warn("tmp_hk empty or not existing") + df = pd.DataFrame() + # pbp_schema = schema["pbp_schema"] + # hk_schema = schema["hk_schema"] + first_val, t0 = tmp_hk[["Time (sec)", "Time Stamp"]].values[0] - if not tmp_hk.empty: - first_val, t0 = tmp_hk[["Time (sec)", "Time Stamp"]].values[0] - if "PbP" in file_path: - """ - temp = pbp_schema - data_type = { - col: ( - "float64" - if typ == "float" - else "int64" if typ == "int" else "string" - ) # default fallback - for col, typ in pbp_schema.items() - if typ != "datetime" - } - parse_dates = [ - col for col, typ in pbp_schema.items() if typ == "datetime" - ] - """ - try: - df = dd.read_csv( - file_path, - dtype=pbp_schema, # data_type, - # parse_dates=parse_dates, - blocksize=None, - ) - df = df.fillna( - 0 - ) # is this because otherwise we cannot calculate the time_lag? - # df['time_lag'] = df['Incand Peak Time'] - df['Scatter Peak Time'] # 02.09.2024 this line implies that for particles with nan in the scatt transit time time_lag=incand transit time. better to calculate timelag for particles with both scatt and incand and set 0 for particles with only incand - #!!! MISSING CORRECT TIME LAG CALCULATIONS - except zipfile.BadZipFile: - print(f"!! Bad zip file: {file_path}") - df = pd.DataFrame() - return df + def postprocess_hk(df): + return df - elif "hk" in file_path: - datetime_cols = [ - col for col, typ in hk_schema.items() if typ == "datetime" - ] - dtype_cols = { - col: typ for col, typ in hk_schema.items() if typ != "datetime" - } - try: - df = dd.read_csv( - file_path, - dtype=dtype_cols, # filtered_dtype_dict, - parse_dates=datetime_cols, - blocksize=None, - assume_missing=True, - ) - # df = dd.read_csv(file_path, dtype=data_type, parse_dates=['Time Stamp'], blocksize=None)#, assume_missing=True) - """if 'Time Stamp' in df.columns: - datetime_format = '%m/%d/%Y %H:%M:%S.%f' - df['Time Stamp'] = df['Time Stamp'].map_partitions(pd.to_datetime, format=datetime_format, meta=('Time Stamp', 'datetime64[ns]')) - """ - except ValueError as e: - # Handle the error if the 'Time Stamp' column is missing or any other parsing error occurs - if "Missing column provided to 'parse_dates'" in str(e): - print( - f"Error for {file_path}: Missing column provided to 'parse_dates': 'Time Stamp'" - ) - df = pd.DataFrame() - return df - except pd.errors.EmptyDataError: - df = pd.DataFrame() - return df - except zipfile.BadZipFile: - print(f"!! 
Bad zip file: {file_path}") - df = pd.DataFrame() - return df + def postprocess_pbp(df): + return df.fillna(0) - if len(df.columns) > 0: - df = df.loc[~df.isna().all(axis=1)] + if "PbP" in file_path: + file_schema = schema["pbp_schema"] + postprocess = postprocess_pbp - df["path"] = str(file_path) - df["first_val"] = first_val - df["t0"] = t0 - file_name_cut = ( - file_path.split("\\")[-1].split("_")[-2] - + "_" - + file_path.split("\\")[-1].split("_")[-1].split(".")[-2] - ) - df["file"] = file_name_cut - folder_name = file_path.split("\\")[-1].split("_")[-2] - df["folder_name"] = folder_name - - if "Time Stamp" in df.columns: - df["Time Stamp"] = df["Time Stamp"].map_partitions( - pd.to_datetime, meta=("Time Stamp", "datetime64[ns]") - ) - - df["delta_sec"] = df.map_partitions( - calculate_delta_sec, meta=("delta_sec", "float64") - ) - df["calculated_time"] = df["t0"] + dd.to_timedelta( - df["delta_sec"], unit="s" - ) - df["file_datetime"] = df.apply( - extract_datetime, axis=1, meta=("file_datetime", "datetime64[ns]") - ) - df["date_floored"] = df["calculated_time"].dt.floor("h") - df["date"] = df["calculated_time"].dt.date.astype("date64[pyarrow]") - df["hour"] = df["calculated_time"].dt.hour.astype("i8") - df["floor_time"] = df["calculated_time"].dt.floor("s") - df["Secs_2GB"] = df["Time (sec)"].apply( - np.floor, meta=("Secs_2GB", "i8") - ) - - fn = ( - file_path.split("\\")[-1].split("_")[-2] - + "_" - + file_path.split("\\")[-1].split("_")[-1].split(".")[-2] - ) - - def name(part_idx): - return f"{fn}.parquet" - - df = df.set_index("calculated_time", drop=True, sort=False, sorted=True) - - df.to_parquet( - path=target_directory, - engine="pyarrow", - partition_on=["date", "hour"], - coerce_timestamps="us", - allow_truncated_timestamps=True, - name_function=name, - write_index=True, - append=False, - ) - return df - else: - warnings.warn("tmp_hk empty or not existing") - return pd.DataFrame() + elif "hk" in file_path: + file_schema = schema["hk_schema"] + postprocess = postprocess_hk else: - raise ValueError("No CSV files found.") + raise ValueError(f"Unrecognized file type: {file_path}") + + try: + df = read_sp2xr_csv(file_path, file_schema) + df = postprocess(df) + """except zipfile.BadZipFile: + logger.warning(f"[{file_path}] Bad zip file: {e}") + df = pd.DataFrame() + except pd.errors.EmptyDataError: + logger.warning(f"[{file_path}] Empty CSV") + df = pd.DataFrame() + except ValueError as e: + logger.error(f"[{file_path}] ValueError: {e}") + if "Missing column provided to 'parse_dates'" in str(e): + print(f"Error for {file_path}: Missing 'Time Stamp'") + else: + print(f"ValueError for {file_path}: {e}") + df = pd.DataFrame()""" + except Exception as e: + logger.error(f"[{file_path}] Unexpected error: {e}") + df = pd.DataFrame() + + if len(df.columns) > 0: + df = df.loc[~df.isna().all(axis=1)] + df = enrich_sp2xr_dataframe(df, file_path, first_val, t0) + save_sp2xr_parquet(df, file_path, target_directory) + return df + else: + return pd.DataFrame() diff --git a/tests/test_io_real.py b/tests/test_io_real.py index 2f2f613..c36e30e 100644 --- a/tests/test_io_real.py +++ b/tests/test_io_real.py @@ -1,5 +1,7 @@ from pathlib import Path -from sp2xr.io import read_csv_files_with_dask +from sp2xr.io import process_sp2xr_file + +# from sp2xr.io import read_csv_files_with_dask DATA = Path(__file__).parent / "data" @@ -8,7 +10,7 @@ def test_read_real_pbp_zip(tmp_path): mini_zip = DATA / "mini_SP2XR_PbP_20190409110737_x0001.zip" config_file = DATA / "config.yaml" - df = 
+    df = process_sp2xr_file(
         file_path=str(mini_zip),
         config_path=str(config_file),
         target_directory=str(tmp_path / "pq_out"),
@@ -24,7 +26,7 @@ def test_read_real_hk_zip(tmp_path):
     mini_zip = DATA / "mini_SP2XR_hk_20190409110737_x0001.zip"
     config_file = DATA / "config.yaml"
 
-    df = read_csv_files_with_dask(
+    df = process_sp2xr_file(
         file_path=str(mini_zip),
         config_path=str(config_file),
         target_directory=str(tmp_path / "pq_out"),
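Usage sketch (not part of the patch): a minimal example of driving the new public entry point. The file names, the config.yaml layout, and the output directory below are hypothetical placeholders; the config is assumed to contain pbp_schema and hk_schema mappings from column names to dtype strings, with "datetime" marking columns that read_sp2xr_csv should parse as timestamps.

    from sp2xr import process_sp2xr_file

    # Hypothetical paths -- adjust to the actual data layout; the matching
    # *_hk_*_x0001 housekeeping file is expected to sit alongside the PbP file.
    pbp_file = "data/SP2XR_PbP_20190409110737_x0001.zip"
    config_file = "data/config.yaml"  # provides pbp_schema / hk_schema
    out_dir = "parquet_out"

    # Reads the PbP file, recovers t0 from the first row of the matching hk file,
    # adds the derived time columns, and writes date/hour-partitioned parquet.
    df = process_sp2xr_file(
        file_path=pbp_file,
        config_path=config_file,
        target_directory=out_dir,
    )
    print(df.columns)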