# SP2XR/src/sp2xr/io.py — I/O helpers for SP2XR particle-by-particle (PbP)
# and housekeeping (HK) data: CSV → Parquet conversion utilities.
import pandas as pd
from pathlib import Path
import os
import re
import zipfile
import warnings
import yaml
import numpy as np
import dask.dataframe as dd
from .toolkit_legacy import calculate_delta_sec, extract_datetime
def csv_to_parquet(csv_path: Path, parquet_path: Path, **read_csv_kwargs) -> None:
    """
    Convert one CSV file into an equivalent Parquet file.

    Parameters
    ----------
    csv_path : Path | str
        Location of the source CSV.
    parquet_path : Path | str
        Destination Parquet path; missing parent directories are created.
    read_csv_kwargs : dict
        Extra keyword arguments forwarded verbatim to ``pandas.read_csv``.
    """
    source = Path(csv_path)
    destination = Path(parquet_path)
    # Read first so that an unreadable CSV fails before any directory is made
    # beyond what the write itself needs.
    frame = pd.read_csv(source, **read_csv_kwargs)
    destination.parent.mkdir(parents=True, exist_ok=True)
    frame.to_parquet(destination, index=False)
def read_csv_files_with_dask(file_path, config_path, target_directory):
    """
    Read one SP2XR PbP ("particle-by-particle") or HK ("housekeeping") file,
    tag it with provenance/timing metadata derived from the matching
    ``*_x0001`` HK file, and write it to a partitioned Parquet dataset.

    Parameters
    ----------
    file_path : str
        Complete path of the file to read. The name must contain "PbP" or
        "hk". Path components are split on backslashes below, so
        Windows-style paths are assumed — NOTE(review): confirm intended
        platform; POSIX paths would break the file/folder tagging.
    config_path : str
        YAML file providing the column dtype schemas under the keys
        "pbp_schema" and "hk_schema".
    target_directory : str
        Root directory of the output Parquet dataset (partitioned on the
        derived "date" and "hour" columns).

    Returns
    -------
    dask.dataframe.DataFrame or pandas.DataFrame
        The processed (lazy) Dask DataFrame on success; an empty pandas
        DataFrame when the source file or its reference HK file is empty
        or unreadable. Returns ``None`` if the read succeeds but yields a
        frame with no columns (falls off the end of the function).

    Raises
    ------
    ValueError
        If ``config_path`` or ``file_path`` is falsy.
    """
    # Load the dtype schemas; without them the CSVs cannot be typed.
    if config_path:
        with open(config_path) as f:
            schema = yaml.safe_load(f)
        pbp_schema = schema["pbp_schema"]
        hk_schema = schema["hk_schema"]
    else:
        raise ValueError("No config file found.")
    if file_path:
        tmp_hk = pd.DataFrame()
        # Derive the path of the FIRST HK file of the same acquisition:
        # swap "PbP" -> "hk", force the _xNNNN counter to _x0001, and strip
        # the .csv/.zip extension (re-appended when probing below).
        hk_0001 = re.sub(r"PbP", "hk", file_path)
        hk_0001 = re.sub(r"(_x)\d{4}", r"\g<1>0001", hk_0001)
        hk_0001 = re.sub(r"\.(csv|zip)$", "", hk_0001)
        # Read only the first HK row: it supplies the reference offset
        # ("Time (sec)") and wall-clock start ("Time Stamp") used to
        # reconstruct absolute timestamps for every row of the target file.
        if os.path.exists(f"{hk_0001}.csv"):
            try:
                tmp_hk = pd.read_csv(
                    f"{hk_0001}.csv",
                    nrows=1,
                    parse_dates=["Time Stamp"],
                    usecols=["Time Stamp", "Time (sec)"],
                )
            except pd.errors.EmptyDataError:
                tmp_hk = pd.DataFrame()
            except zipfile.BadZipFile:
                print(f"!! Bad file: {file_path}")
                tmp_hk = pd.DataFrame()
        elif os.path.exists(f"{hk_0001}.zip"):
            try:
                tmp_hk = pd.read_csv(
                    f"{hk_0001}.zip",
                    nrows=1,
                    parse_dates=["Time Stamp"],
                    usecols=["Time Stamp", "Time (sec)"],
                )
            except pd.errors.EmptyDataError:
                tmp_hk = pd.DataFrame()
        if not tmp_hk.empty:
            # Reference offset (seconds) and wall-clock start of acquisition.
            first_val, t0 = tmp_hk[["Time (sec)", "Time Stamp"]].values[0]
            if "PbP" in file_path:
                """
                temp = pbp_schema
                data_type = {
                    col: (
                        "float64"
                        if typ == "float"
                        else "int64" if typ == "int" else "string"
                    )  # default fallback
                    for col, typ in pbp_schema.items()
                    if typ != "datetime"
                }
                parse_dates = [
                    col for col, typ in pbp_schema.items() if typ == "datetime"
                ]
                """
                try:
                    # blocksize=None: one partition per file (files are small
                    # enough; also required for compressed inputs).
                    df = dd.read_csv(
                        file_path,
                        dtype=pbp_schema,  # data_type,
                        # parse_dates=parse_dates,
                        blocksize=None,
                    )
                    df = df.fillna(
                        0
                    )  # is this because otherwise we cannot calculate the time_lag?
                    # df['time_lag'] = df['Incand Peak Time'] - df['Scatter Peak Time'] # 02.09.2024 this line implies that for particles with nan in the scatt transit time time_lag=incand transit time. better to calculate timelag for particles with both scatt and incand and set 0 for particles with only incand
                    #!!! MISSING CORRECT TIME LAG CALCULATIONS
                except zipfile.BadZipFile:
                    print(f"!! Bad zip file: {file_path}")
                    df = pd.DataFrame()
                    return df
            elif "hk" in file_path:
                # Split the HK schema: datetime columns go to parse_dates,
                # everything else to the dtype mapping.
                datetime_cols = [
                    col for col, typ in hk_schema.items() if typ == "datetime"
                ]
                dtype_cols = {
                    col: typ for col, typ in hk_schema.items() if typ != "datetime"
                }
                try:
                    # assume_missing=True: columns typed int in the schema may
                    # contain NaNs, so let dask read them as float.
                    df = dd.read_csv(
                        file_path,
                        dtype=dtype_cols,  # filtered_dtype_dict,
                        parse_dates=datetime_cols,
                        blocksize=None,
                        assume_missing=True,
                    )
                    # df = dd.read_csv(file_path, dtype=data_type, parse_dates=['Time Stamp'], blocksize=None)#, assume_missing=True)
                    """if 'Time Stamp' in df.columns:
                        datetime_format = '%m/%d/%Y %H:%M:%S.%f'
                        df['Time Stamp'] = df['Time Stamp'].map_partitions(pd.to_datetime, format=datetime_format, meta=('Time Stamp', 'datetime64[ns]'))
                    """
                except ValueError as e:
                    # Handle the error if the 'Time Stamp' column is missing or any other parsing error occurs
                    if "Missing column provided to 'parse_dates'" in str(e):
                        print(
                            f"Error for {file_path}: Missing column provided to 'parse_dates': 'Time Stamp'"
                        )
                    df = pd.DataFrame()
                    return df
                except pd.errors.EmptyDataError:
                    df = pd.DataFrame()
                    return df
                except zipfile.BadZipFile:
                    print(f"!! Bad zip file: {file_path}")
                    df = pd.DataFrame()
                    return df
            # NOTE(review): if the file name contains neither "PbP" nor "hk",
            # ``df`` is never assigned and the next line raises NameError.
            if len(df.columns) > 0:
                # Drop fully-empty rows, then tag each row with provenance.
                df = df.loc[~df.isna().all(axis=1)]
                df["path"] = str(file_path)
                df["first_val"] = first_val
                df["t0"] = t0
                # "<stem>_<counter>" from the last two "_"-separated pieces of
                # the file name (backslash split: Windows paths assumed).
                file_name_cut = (
                    file_path.split("\\")[-1].split("_")[-2]
                    + "_"
                    + file_path.split("\\")[-1].split("_")[-1].split(".")[-2]
                )
                df["file"] = file_name_cut
                folder_name = file_path.split("\\")[-1].split("_")[-2]
                df["folder_name"] = folder_name
                if "Time Stamp" in df.columns:
                    df["Time Stamp"] = df["Time Stamp"].map_partitions(
                        pd.to_datetime, meta=("Time Stamp", "datetime64[ns]")
                    )
                # Seconds elapsed relative to the reference HK row — presumably
                # computed from "Time (sec)" and first_val; verify in
                # toolkit_legacy.calculate_delta_sec.
                df["delta_sec"] = df.map_partitions(
                    calculate_delta_sec, meta=("delta_sec", "float64")
                )
                # Reconstruct absolute timestamps: HK start time + elapsed secs.
                df["calculated_time"] = df["t0"] + dd.to_timedelta(
                    df["delta_sec"], unit="s"
                )
                df["file_datetime"] = df.apply(
                    extract_datetime, axis=1, meta=("file_datetime", "datetime64[ns]")
                )
                # Partitioning/grouping helpers derived from calculated_time.
                df["date_floored"] = df["calculated_time"].dt.floor("h")
                df["date"] = df["calculated_time"].dt.date.astype("date64[pyarrow]")
                df["hour"] = df["calculated_time"].dt.hour.astype("i8")
                df["floor_time"] = df["calculated_time"].dt.floor("s")
                df["Secs_2GB"] = df["Time (sec)"].apply(
                    np.floor, meta=("Secs_2GB", "i8")
                )
                fn = (
                    file_path.split("\\")[-1].split("_")[-2]
                    + "_"
                    + file_path.split("\\")[-1].split("_")[-1].split(".")[-2]
                )

                def name(part_idx):
                    # All output parts are named after the source file
                    # (part_idx intentionally ignored: blocksize=None gives
                    # one partition per input file).
                    return f"{fn}.parquet"

                # sorted=True asserts calculated_time is monotonically
                # increasing within the file — NOTE(review): not validated here.
                df = df.set_index("calculated_time", drop=True, sort=False, sorted=True)
                df.to_parquet(
                    path=target_directory,
                    engine="pyarrow",
                    partition_on=["date", "hour"],
                    coerce_timestamps="us",
                    allow_truncated_timestamps=True,
                    name_function=name,
                    write_index=True,
                    append=False,
                )
                return df
        else:
            warnings.warn("tmp_hk empty or not existing")
            return pd.DataFrame()
    else:
        raise ValueError("No CSV files found.")