diff --git a/src/sp2xr/io.py b/src/sp2xr/io.py
index 3b108c6..bb8e100 100644
--- a/src/sp2xr/io.py
+++ b/src/sp2xr/io.py
@@ -4,6 +4,7 @@
 import os
 import re
 import zipfile
 import warnings
+import yaml
 import numpy as np
 import dask.dataframe as dd
@@ -30,7 +31,7 @@ def csv_to_parquet(csv_path: Path, parquet_path: Path, **read_csv_kwargs) -> None:
 
     df.to_parquet(parquet_path, index=False)
 
 
-def read_csv_files_with_dask(file_path, meta_pbp, meta_hk, target_directory):
+def read_csv_files_with_dask(file_path, config_path, target_directory):
     """
     This function reads Pbp or HK files from the SP2XR
@@ -49,6 +50,15 @@ def read_csv_files_with_dask(file_path, meta_pbp, meta_hk, target_directory):
         Content of the file as Dask DataFrame.
     """
 
+    if config_path:
+        with open(config_path) as f:
+            schema = yaml.safe_load(f)
+
+        pbp_schema = schema["pbp_schema"]
+        hk_schema = schema["hk_schema"]
+    else:
+        raise ValueError("No config file provided.")
+
     if file_path:
 
         tmp_hk = pd.DataFrame()
@@ -82,12 +92,12 @@ def read_csv_files_with_dask(file_path, meta_pbp, meta_hk, target_directory):
         if not tmp_hk.empty:
             first_val, t0 = tmp_hk[["Time (sec)", "Time Stamp"]].values[0]
         if "PbP" in file_path:
-            temp = meta_pbp
-            data_type = pd.Series(temp.dtypes.values, index=temp.columns).to_dict()
             try:
                 df = dd.read_csv(
-                    file_path, dtype=data_type, blocksize=None
-                )  # , include_path_column=True)
+                    file_path,
+                    dtype=pbp_schema,
+                    blocksize=None,
+                )
                 df = df.fillna(
                     0
                 )  # is this because otherwise we cannot calculate the time_lag?
@@ -99,18 +109,17 @@ def read_csv_files_with_dask(file_path, meta_pbp, meta_hk, target_directory):
                 return df
 
         elif "hk" in file_path:
-            temp = meta_hk
-            data_type = pd.Series(temp.dtypes.values, index=temp.columns).to_dict()
-            filtered_dtype_dict = {
-                key: value
-                for key, value in data_type.items()
-                if value != "datetime64[ns]"
+            datetime_cols = [
+                col for col, typ in hk_schema.items() if typ == "datetime"
+            ]
+            dtype_cols = {
+                col: typ for col, typ in hk_schema.items() if typ != "datetime"
             }
             try:
                 df = dd.read_csv(
                     file_path,
-                    dtype=filtered_dtype_dict,
-                    parse_dates=["Time Stamp"],
+                    dtype=dtype_cols,
+                    parse_dates=datetime_cols,
                     blocksize=None,
                     assume_missing=True,
                 )
@@ -164,10 +173,10 @@ def read_csv_files_with_dask(file_path, meta_pbp, meta_hk, target_directory):
             df["file_datetime"] = df.apply(
                 extract_datetime, axis=1, meta=("file_datetime", "datetime64[ns]")
             )
-            df["date_floored"] = df["calculated_time"].dt.floor("H")
+            df["date_floored"] = df["calculated_time"].dt.floor("h")
             df["date"] = df["calculated_time"].dt.date.astype("date64[pyarrow]")
             df["hour"] = df["calculated_time"].dt.hour.astype("i8")
-            df["floor_time"] = df["calculated_time"].dt.floor("S")
+            df["floor_time"] = df["calculated_time"].dt.floor("s")
             df["Secs_2GB"] = df["Time (sec)"].apply(
                 np.floor, meta=("Secs_2GB", "i8")
             )
@@ -200,6 +209,3 @@ def read_csv_files_with_dask(file_path, meta_pbp, meta_hk, target_directory):
 
     else:
         raise ValueError("No CSV files found.")
-
-
-# TEMP:test commit separation
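A note on the schema handling above: the YAML type names ("float", "datetime") are passed straight to dd.read_csv, which pandas accepts as dtype strings. If they ever need to be normalized to explicit pandas dtypes, a small helper could serve both the PbP and HK branches. This is a minimal sketch only; `split_schema` and its fallback to "string" are assumptions, not project API.

```python
# Sketch only, not part of the patch. Normalizes YAML type names into
# dd.read_csv arguments; the "string" fallback for unknown names is assumed.
def split_schema(schema: dict) -> tuple[dict, list]:
    """Map YAML type names to pandas dtypes and collect datetime columns."""
    dtype_map = {"float": "float64", "int": "int64"}
    dtypes = {
        col: dtype_map.get(typ, "string")  # assumed fallback for unknown names
        for col, typ in schema.items()
        if typ != "datetime"
    }
    parse_dates = [col for col, typ in schema.items() if typ == "datetime"]
    return dtypes, parse_dates
```

Calling `split_schema(hk_schema)` would return exactly the `dtype_cols` / `datetime_cols` pair that the "hk" branch now builds inline.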
diff --git a/tests/data/config.yaml b/tests/data/config.yaml
new file mode 100644
index 0000000..701e851
--- /dev/null
+++ b/tests/data/config.yaml
@@ -0,0 +1,159 @@
+pbp_schema:
+  Time (sec): float
+  Packet Time Stamp: float
+  Flag: float
+  Dropped Records: float
+  Record Count: float
+  Record Size: float
+  Particle Time Stamp: float
+  Particle Flags: float
+  Scatter relPeak: float
+  Scatter Transit Time: float
+  Scatter Peak Time: float
+  Scatter FWHM: float
+  Scatter Size (nm): float
+  Incand relPeak: float
+  Incand Transit Time: float
+  Incand Peak Time: float
+  Incand FWHM: float
+  Incand Delay: float
+  Incand Mass (fg): float
+  Reserved: float
+hk_schema:
+  Time Stamp: datetime
+  Time (sec): float
+  Time Stamp (UTC sec): float
+  Elapsed Time: float
+  Error Code: float
+  Packet Time Stamp: float
+  Laser TEC Temp (C): float
+  Crystal TEC Temp (C): float
+  Inlet Air Temp (C): float
+  Computer Heatsink Temp (C): float
+  Laser Heatsink Temp (C): float
+  Outlet Air Temp (C): float
+  YAG Output Monitor (V): float
+  Cavity Pressure (hPa): float
+  Laser Driver Power Monitor (uA): float
+  Laser Driver Current Limit Monitor (A): float
+  Laser Driver Current Monitor (A): float
+  Laser TEC Sense: float
+  Laser Over Temp (On/Off): float
+  +5V Laser Rail (V): float
+  ' +5V Rail (V)': float
+  +12V Rail (V): float
+  High Voltage (V): float
+  Battery Temp (C): float
+  UPS Output (V): float
+  12V Iso Rail (V): float
+  5V Iso Rail (V): float
+  3.3V Iso Rail (V): float
+  Spare 22: float
+  Spare 23: float
+  408 Board Spare 0: float
+  408 Board Spare 1: float
+  408 Board Spare 2: float
+  408 Board Spare 3: float
+  408 Board Spare 4: float
+  Purge Flow Monitor (sccm): float
+  System Input Voltage (V): float
+  Board Temperature (C): float
+  408 Board Spare 8: float
+  408 Board Spare 9: float
+  408 Board Spare 10: float
+  408 Board Spare 11: float
+  408 Board Spare 12: float
+  408 Board Spare 13: float
+  408 Board Spare 14: float
+  408 Board Spare 15: float
+  Sheath Flow Controller Read (vccm): float
+  Sheath Flow Controller Read (sccm): float
+  Sheath Flow Controller Pressure (psia): float
+  Sheath Flow Controller Temperature (C): float
+  Sample Flow Controller Read (vccm): float
+  Sample Flow Controller Read (sccm): float
+  Sample Flow Controller Pressure (psia): float
+  Sample Flow Controller Temperature (C): float
+  Fan 1 (RPM): float
+  Fan 2 (RPM): float
+  Laser Fan (RPM): float
+  Spare tach: float
+  Threshold Crossing Events: float
+  Dual Qualified Scatter and Incand Particles: float
+  Qualified Scatter Only Particles: float
+  Qualified Incand Only Particles: float
+  Disqualified Due to Scatter Saturation: float
+  Disqualified Due to Scatter Transit Time Min: float
+  Disqualified Due to Scatter Transit Time Max: float
+  Disqualified Due to Scatter FWHM Min: float
+  Disqualified Due to Scatter FWHM Max: float
+  Scatter Inter Part Period Min Violation: float
+  Disqualified Due to Incand Saturation: float
+  Disqualified Due to Incand Transit Time Min: float
+  Disqualified Due to Incand Transit Time Max: float
+  Disqualified Due to Incand FWHM Min: float
+  Disqualified Due to Incand FWHM Max: float
+  Incand Inter Part Period Min Violation: float
+  Baseline Sizer Lo: float
+  Baseline Sizer Hi: float
+  Baseline Incand Lo: float
+  Baseline Incand Hi: float
+  Bandwidth Sizer Hi: float
+  Bandwidth Sizer Lo: float
+  Bandwidth Incand Lo: float
+  Bandwidth Incand Hi: float
+  ABD-0408 HK ADCs min: float
+  ABD-0436 HK ADCs min: float
+  ABD-0408 HK ADCs max: float
+  ABD-0436 HK ADCs max: float
+  Incand Particle Conc (cts/ccm): float
+  Scattering Particle Conc (cts/ccm): float
+  Incand Mass Conc (fg/sccm): float
+  Scattering Mass Conc (fg/sccm): float
+  Sheath Flow Set Point: float
+  Sample Flow Set Point: float
+  Laser Temp Set Point: float
+  Laser Current Set Point: float
+  Spare 4 Set Point: float
+  Spare 5 Set Point: float
+  PMT HV Set Point: float
+  Particle Density (g/ccm): float
+  PbP Packet Time: float
+  Scatter Bin 1: float
+  Scatter Bin 2: float
+  Scatter Bin 3: float
+  Scatter Bin 4: float
+  Scatter Bin 5: float
+  Scatter Bin 6: float
+  Scatter Bin 7: float
+  Scatter Bin 8: float
+  Scatter Bin 9: float
+  Scatter Bin 10: float
+  Scatter Bin 11: float
+  Scatter Bin 12: float
+  Scatter Bin 13: float
+  Scatter Bin 14: float
+  Scatter Bin 15: float
+  Scatter Bin 16: float
+  Scatter Bin 17: float
+  Scatter Bin 18: float
+  Scatter Bin 19: float
+  Incand Bin 1: float
+  Incand Bin 2: float
+  Incand Bin 3: float
+  Incand Bin 4: float
+  Incand Bin 5: float
+  Incand Bin 6: float
+  Incand Bin 7: float
+  Incand Bin 8: float
+  Incand Bin 9: float
+  Incand Bin 10: float
+  Incand Bin 11: float
+  Incand Bin 12: float
+  Incand Bin 13: float
+  Incand Bin 14: float
+  Incand Bin 15: float
+  Incand Bin 16: float
+  Incand Bin 17: float
+  Incand Bin 18: float
+  Incand Bin 19: float
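The new schema file can be sanity-checked on its own before the tests depend on it. A minimal sketch, assuming it is run from the repository root:

```python
import yaml

# Standalone sanity check for the new schema file (sketch, not part of the patch).
with open("tests/data/config.yaml") as f:
    schema = yaml.safe_load(f)

# Exactly the two top-level sections that read_csv_files_with_dask expects.
assert set(schema) == {"pbp_schema", "hk_schema"}
# The HK timestamp must be tagged for parse_dates; everything else is numeric.
assert schema["hk_schema"]["Time Stamp"] == "datetime"
allowed = {"float", "int", "datetime"}
assert all(t in allowed for t in schema["pbp_schema"].values())
assert all(t in allowed for t in schema["hk_schema"].values())
```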
diff --git a/tests/data/mini_SP2XR_PbP_20190409110737_x0001.zip b/tests/data/mini_SP2XR_PbP_20190409110737_x0001.zip
new file mode 100644
index 0000000..7634d40
Binary files /dev/null and b/tests/data/mini_SP2XR_PbP_20190409110737_x0001.zip differ
diff --git a/tests/data/mini_SP2XR_hk_20190409110737_x0001.zip b/tests/data/mini_SP2XR_hk_20190409110737_x0001.zip
new file mode 100644
index 0000000..608d7c0
Binary files /dev/null and b/tests/data/mini_SP2XR_hk_20190409110737_x0001.zip differ
diff --git a/tests/test_io_real.py b/tests/test_io_real.py
new file mode 100644
index 0000000..2f2f613
--- /dev/null
+++ b/tests/test_io_real.py
@@ -0,0 +1,34 @@
+from pathlib import Path
+from sp2xr.io import read_csv_files_with_dask
+
+DATA = Path(__file__).parent / "data"
+
+
+def test_read_real_pbp_zip(tmp_path):
+    mini_zip = DATA / "mini_SP2XR_PbP_20190409110737_x0001.zip"
+    config_file = DATA / "config.yaml"
+
+    df = read_csv_files_with_dask(
+        file_path=str(mini_zip),
+        config_path=str(config_file),
+        target_directory=str(tmp_path / "pq_out"),
+    )
+
+    # 50 data lines in the fixture = 50 rows returned
+    assert len(df) == 50
+    # parquet really written
+    assert list((tmp_path / "pq_out").rglob("*.parquet")), "No parquet output"
+
+
+def test_read_real_hk_zip(tmp_path):
+    mini_zip = DATA / "mini_SP2XR_hk_20190409110737_x0001.zip"
+    config_file = DATA / "config.yaml"
+
+    df = read_csv_files_with_dask(
+        file_path=str(mini_zip),
+        config_path=str(config_file),
+        target_directory=str(tmp_path / "pq_out"),
+    )
+
+    assert len(df) == 50  # hk fixture uses the same 50-row layout as the PbP one
+    assert list((tmp_path / "pq_out").rglob("*.parquet")), "No parquet output"
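The mini fixtures referenced above are committed as binaries. For reference, one way such a fixture could be cut from a full-size raw file; a sketch only: `make_mini_zip` is a hypothetical helper, the source path is invented, and the header-plus-50-data-rows layout is inferred from the 50-row assertions in the tests.

```python
import zipfile
from pathlib import Path


def make_mini_zip(full_csv: Path, out_zip: Path, n_rows: int = 50) -> None:
    """Cut the header plus the first n_rows data lines into a zip fixture."""
    lines = full_csv.read_text().splitlines(keepends=True)
    mini = "".join(lines[: n_rows + 1])  # assumes a single header line
    with zipfile.ZipFile(out_zip, "w", zipfile.ZIP_DEFLATED) as zf:
        zf.writestr(full_csv.name, mini)


# Hypothetical source path; only the output name matches the committed fixture.
make_mini_zip(
    Path("raw/SP2XR_PbP_20190409110737_x0001.csv"),
    Path("tests/data/mini_SP2XR_PbP_20190409110737_x0001.zip"),
)
```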
diff --git a/tests/test_roundtrip.py b/tests/test_roundtrip.py
new file mode 100644
index 0000000..4ef8818
--- /dev/null
+++ b/tests/test_roundtrip.py
@@ -0,0 +1,23 @@
+import pandas as pd
+from sp2xr import csv_to_parquet
+
+
+def test_csv_to_parquet_roundtrip(tmp_path):
+    # --- create synthetic mini-dataset ---
+    original = pd.DataFrame(
+        {
+            "particle_id": [1, 2, 3],
+            "incand": [123.4, 234.5, 345.6],
+            "scat": [10.1, 11.2, 12.3],
+        }
+    )
+    csv_file = tmp_path / "sample.csv"
+    pq_file = tmp_path / "sample.parquet"
+    original.to_csv(csv_file, index=False)
+
+    # --- run the code under test ---
+    csv_to_parquet(csv_file, pq_file)
+
+    # --- validate ---
+    roundtrip = pd.read_parquet(pq_file)
+    pd.testing.assert_frame_equal(roundtrip, original)
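Since csv_to_parquet accepts **read_csv_kwargs (see its signature in the io.py hunk above), the roundtrip could also pin dtypes explicitly. A sketch, assuming those kwargs are forwarded unchanged to pd.read_csv; the test name is hypothetical:

```python
import pandas as pd
from sp2xr import csv_to_parquet


# Sketch: pin a dtype through the **read_csv_kwargs passthrough.
def test_csv_to_parquet_respects_dtype(tmp_path):
    original = pd.DataFrame({"particle_id": [1, 2, 3]})
    csv_file = tmp_path / "sample.csv"
    pq_file = tmp_path / "sample.parquet"
    original.to_csv(csv_file, index=False)

    # dtype should reach pd.read_csv and survive the parquet roundtrip.
    csv_to_parquet(csv_file, pq_file, dtype={"particle_id": "int32"})

    assert pd.read_parquet(pq_file)["particle_id"].dtype == "int32"
```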