diff --git a/scripts/sp2xr_pipeline.py b/scripts/sp2xr_pipeline.py
index 9c4e325..f7525c7 100644
--- a/scripts/sp2xr_pipeline.py
+++ b/scripts/sp2xr_pipeline.py
@@ -213,7 +213,7 @@ def main():
                 "Sample Flow Controller Read (vccm)": pd.Series(
                     dtype="float64"
                 ),
-                "date": pd.Series(dtype="datetime64[ns]"),
+                "date": pd.Series(dtype="str"),
                 "hour": pd.Series(dtype="int64"),
             },
             index=pd.DatetimeIndex([]),
@@ -340,7 +340,7 @@ def main():
             delete_partition_if_exists(
                 output_path=f"{run_config['output']}/pbp_calibrated",
                 partition_values={
-                    "date": chunk_start.strftime("%Y-%m-%d 00:00:00"),
+                    "date": chunk_start.strftime("%Y-%m-%d"),
                     "hour": chunk_start.hour,
                 },
             )
@@ -411,9 +411,8 @@ def main():
             )
 
             # 2) cast partition columns *before* Dask strips them off
-            ddf_conc["date"] = dd.to_datetime(ddf_conc["date"]).astype(
-                "datetime64[ns]"
-            )
+            # Keep date as string to avoid Windows path issues with datetime partitions
+            ddf_conc["date"] = ddf_conc["date"].astype("str")
             ddf_conc["hour"] = ddf_conc["hour"].astype("int64")
 
             conc_future = ddf_conc.to_parquet(
diff --git a/src/sp2xr/calibration.py b/src/sp2xr/calibration.py
index 31b18de..7b377c2 100644
--- a/src/sp2xr/calibration.py
+++ b/src/sp2xr/calibration.py
@@ -182,7 +182,9 @@ def calibrate_particle_data(
         curve_type=scatt_conf.get("curve_type"),
         params=scatt_conf.get("parameters", []),
     )
-    df["date"] = df.index.date.astype("datetime64[ns]")
+    # Use string format for date to avoid Windows path issues with datetime partitions
+    # PyArrow creates partition dirs like "date=2019-05-08 00:00:00" which has colons (invalid on Windows)
+    df["date"] = df.index.strftime("%Y-%m-%d")
     df["hour"] = df.index.hour.astype("int8")
 
     return df
@@ -218,8 +220,8 @@ def calibrate_single_particle(
     meta_cal = ddf_raw._meta.copy()
     meta_cal["BC mass"] = pd.Series([], dtype="float64")
     meta_cal["Opt diam"] = pd.Series([], dtype="float64")
     meta_cal["time_lag"] = pd.Series([], dtype="int8")
-    meta_cal["date"] = pd.Series([], dtype="datetime64[ns]")
+    meta_cal["date"] = pd.Series([], dtype="str")
     meta_cal["hour"] = pd.Series([], dtype="int8")
 
     ddf_cal = ddf_raw.map_partitions(
diff --git a/src/sp2xr/distribution.py b/src/sp2xr/distribution.py
index c017fb6..5d5b787 100644
--- a/src/sp2xr/distribution.py
+++ b/src/sp2xr/distribution.py
@@ -427,7 +427,8 @@ def process_histograms(
     # merged_ddf["date"] = dd.to_datetime(merged_ddf.index.to_series()).dt.normalize()
     # merged_ddf["hour"] = merged_ddf["hour"].astype("int64")
     time_index = dd.to_datetime(merged_ddf.index.to_series())
-    merged_ddf["date"] = time_index.dt.normalize()  # works on Series
+    # Use string format to avoid Windows path issues with datetime partitions
+    merged_ddf["date"] = time_index.dt.strftime("%Y-%m-%d")
     merged_ddf["hour"] = time_index.dt.hour.astype("int64")
 
     # --- Save hists to parquet
diff --git a/src/sp2xr/resample_pbp_hk.py b/src/sp2xr/resample_pbp_hk.py
index 62bfba6..c8c71bd 100644
--- a/src/sp2xr/resample_pbp_hk.py
+++ b/src/sp2xr/resample_pbp_hk.py
@@ -92,7 +92,8 @@ def build_dt_summary(pdf: pd.DataFrame, dt_s: int = 1) -> pd.DataFrame:
     out = out[out["original_idx"] != 0].drop(columns="original_idx")
 
     # add date / hour helper cols
-    out["date"] = out.index.normalize()
+    # Use string format to avoid Windows path issues with datetime partitions
+    out["date"] = out.index.strftime("%Y-%m-%d")
     out["hour"] = out.index.hour.astype("int64")
 
     return out
@@ -130,7 +131,8 @@ def resample_hk_partition(pdf: pd.DataFrame, dt="1s") -> pd.DataFrame:
     out = out.asfreq(dt).ffill(limit=1)
 
     # --- add date/hour columns ---
-    out["date"] = out.index.normalize()
+    # Use string format to avoid Windows path issues with datetime partitions
+    out["date"] = out.index.strftime("%Y-%m-%d")
     out["hour"] = out.index.hour.astype("int64")
 
     return out
@@ -176,7 +178,8 @@ def aggregate_dt(ddf_pbp_dt, ddf_hk_dt, run_config):
     )
 
     time_index = dd.to_datetime(ddf_pbp_hk_dt.index.to_series())
-    ddf_pbp_hk_dt["date"] = time_index.dt.normalize()  # works on Series
+    # Use string format to avoid Windows path issues with datetime partitions
+    ddf_pbp_hk_dt["date"] = time_index.dt.strftime("%Y-%m-%d")
     ddf_pbp_hk_dt["hour"] = time_index.dt.hour.astype("int64")
 
     # Optionally drop the old columns
diff --git a/src/sp2xr/schema.py b/src/sp2xr/schema.py
index 48f77d0..3c27640 100644
--- a/src/sp2xr/schema.py
+++ b/src/sp2xr/schema.py
@@ -6,7 +6,8 @@ from typing import Mapping
 # schema.py ── central truth table ───────────────────────────────────────────
 CANONICAL_DTYPES = {
     # identifiers / file bookkeeping
-    "date": "datetime64[ns]",
+    # Use string for date to avoid Windows path issues with datetime partitions
+    "date": "string[pyarrow]",
     "hour": pd.Int64Dtype(),
     "file": "string[pyarrow]",
     "folder_name": "string[pyarrow]",