feat: modernize all type hints

2025-09-11 14:49:48 +02:00
parent a2df98042c
commit 0a71ca614c
6 changed files with 36 additions and 22 deletions


@@ -1,3 +1,5 @@
from __future__ import annotations
import pandas as pd
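This first hunk only adds the `from __future__ import annotations` line. With it, every annotation in the module is kept as a string and never evaluated at import time, which is what lets the PEP 604 unions and PEP 585 builtin generics introduced in the rest of this commit run on Python 3.7-3.9 as well. A minimal sketch of the effect; the `head_rows` function is illustrative, not part of the repository:

from __future__ import annotations  # must come before any statement other than the module docstring

import pandas as pd

# Under postponed evaluation the annotations below are never evaluated at runtime,
# so `int | None` does not raise on interpreters that predate PEP 604.
def head_rows(df: pd.DataFrame, n: int | None = None) -> pd.DataFrame:
    return df.head(n if n is not None else 5)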


@@ -1,8 +1,7 @@
from __future__ import annotations
import numpy as np
import pandas as pd
from typing import Tuple
from typing import Optional
import gc
import dask.dataframe as dd
from dask.dataframe.utils import make_meta
@@ -17,7 +16,7 @@ def make_bin_arrays(
ctrs: np.ndarray | None = None,
n: int = 50,
log: bool = True,
) -> Tuple[np.ndarray, np.ndarray]:
) -> tuple[np.ndarray, np.ndarray]:
if lims is None and ctrs is None:
lims = (
np.logspace(np.log10(0.3), np.log10(400), n)
@@ -97,17 +96,17 @@ def calculate_histogram(series, bin_lims=np.logspace(np.log10(0.3), np.log10(400
def process_hist_and_dist_partition(
df: pd.DataFrame,
col: str,
flag_col: Optional[str],
flag_value: Optional[int],
flag_col: str | None,
flag_value: int | None,
bin_lims: np.ndarray,
bin_ctrs: np.ndarray,
dt,
calculate_conc: bool = True,
flow=None, # kept for API compatibility; not used
rho_eff: Optional[float] = 1800,
BC_type: Optional[str] = "",
rho_eff: float = 1800,
BC_type: str = "",
# t: float = 1.0,
name_prefix: Optional[str] = None, # <-- NEW: force exact output name prefix
name_prefix: str | None = None, # <-- NEW: force exact output name prefix
):
# normalize dt -> "Xs"
dt_str = f"{dt}s" if isinstance(dt, (int, float)) else str(dt)
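The pattern in this file is the one repeated across the commit: `typing.Tuple` and `typing.Optional` are replaced by the builtin `tuple[...]` and `X | None`, and `Optional` is dropped outright where the default is not actually `None` (`rho_eff: float = 1800`, `BC_type: str = ""`). A condensed before/after sketch, assuming log-spaced bins as in `make_bin_arrays`; the helper name `make_bins` and its body are illustrative:

from __future__ import annotations

import numpy as np

# Old style:  def make_bins(n: int, lims: Optional[np.ndarray] = None) -> Tuple[np.ndarray, np.ndarray]
# New style (PEP 585 builtin generics, PEP 604 unions):
def make_bins(n: int = 50, lims: np.ndarray | None = None) -> tuple[np.ndarray, np.ndarray]:
    edges = lims if lims is not None else np.logspace(np.log10(0.3), np.log10(400), n)
    centers = np.sqrt(edges[:-1] * edges[1:])  # geometric centres of log-spaced bins
    return edges, centers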


@@ -1,8 +1,13 @@
from __future__ import annotations
from typing import Any
import numpy as np
import pandas as pd
def define_flags(pdf_pbp, instr_config, run_config):
def define_flags(
pdf_pbp: pd.DataFrame, instr_config: dict[str, Any], run_config: dict[str, Any]
) -> pd.DataFrame:
flag_inc_transit_time = (
pdf_pbp["Incand Transit Time"]
>= instr_config["instrument_parameters"]["IncTransitMin"]
@@ -71,7 +76,7 @@ def define_flags(pdf_pbp, instr_config, run_config):
return pdf_pbp
FLAG_COLS = [
FLAG_COLS: list[str] = [
"flag_valid_scatt_signal",
"flag_scatt_not_sat",
"flag_valid_inc_signal_in_range",

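`define_flags` gains a full signature here: `pd.DataFrame` in and out, `dict[str, Any]` for both config mappings, and the module-level `FLAG_COLS` constant is annotated as `list[str]`. A minimal sketch of that signature style, reusing the `Incand Transit Time` / `IncTransitMin` comparison visible in the hunk; the `_sketch` names and the single-flag body are illustrative:

from __future__ import annotations
from typing import Any

import pandas as pd

FLAG_COLS_SKETCH: list[str] = ["flag_inc_transit_time"]

def define_flags_sketch(pdf_pbp: pd.DataFrame, instr_config: dict[str, Any]) -> pd.DataFrame:
    # Boolean flag column derived from an instrument-parameter threshold.
    threshold = instr_config["instrument_parameters"]["IncTransitMin"]
    pdf_pbp["flag_inc_transit_time"] = pdf_pbp["Incand Transit Time"] >= threshold
    return pdf_pbp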

@@ -10,7 +10,6 @@ from dask_jobqueue import SLURMCluster
import multiprocessing
import psutil
from dask.distributed import Client, LocalCluster
from typing import Optional, List
import shutil
@@ -295,7 +294,7 @@ def choose(cli_val, cfg, dotted, default=None):
return cli_val if cli_val is not None else get(cfg, dotted, default)
def find_matching_hk_file(pbp_path: str) -> Optional[str]:
def find_matching_hk_file(pbp_path: str) -> str | None:
"""
From a PbP filename, return a path to the corresponding hk file (.csv or .zip), if found.
"""
@@ -321,7 +320,7 @@ def extract_base_filename(file_path: str) -> str:
return Path(file_path).stem
def find_files(directory: str, pattern: str) -> List[str]:
def find_files(directory: str, pattern: str) -> list[str]:
"""
Recursively find all files in `directory` containing `pattern`.
"""
@@ -333,7 +332,7 @@ def find_files(directory: str, pattern: str) -> List[str]:
return sorted(matches)
def chunks(lst: List[str], n: int):
def chunks(lst: list[str], n: int):
"""
Yield successive `n`-sized chunks from list `lst`.
"""


@@ -1,3 +1,6 @@
from __future__ import annotations
from typing import Any
import pandas as pd
from pathlib import Path
import os
@@ -15,7 +18,9 @@ logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
def csv_to_parquet(csv_path: Path, parquet_path: Path, **read_csv_kwargs) -> None:
def csv_to_parquet(
csv_path: Path | str, parquet_path: Path | str, **read_csv_kwargs: Any
) -> None:
"""
Read a CSV file and write an identical Parquet file.
@@ -35,7 +40,7 @@ def csv_to_parquet(csv_path: Path, parquet_path: Path, **read_csv_kwargs) -> Non
df.to_parquet(parquet_path, index=False)
def read_sp2xr_csv(file_path: str, schema: dict) -> dd.DataFrame:
def read_sp2xr_csv(file_path: str, schema: dict[str, Any]) -> dd.DataFrame:
"""
Read an SP2XR CSV/ZIP file with the given schema from the config.
@@ -54,7 +59,7 @@ def read_sp2xr_csv(file_path: str, schema: dict) -> dd.DataFrame:
return df
def load_matching_hk_file(file_path):
def load_matching_hk_file(file_path: str | None) -> pd.DataFrame:
expected_columns = ["Time Stamp", "Time (sec)"]
if not file_path:

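`csv_to_parquet` now accepts `Path | str` for both paths and types the passthrough keyword arguments as `Any`, while `load_matching_hk_file` declares that it may receive `None` and always returns a `pd.DataFrame`. A self-contained sketch of the first signature, assuming the pandas round-trip shown in the hunk (`read_csv` followed by `to_parquet(..., index=False)`); the `_sketch` suffix marks it as illustrative:

from __future__ import annotations
from pathlib import Path
from typing import Any

import pandas as pd

def csv_to_parquet_sketch(csv_path: Path | str, parquet_path: Path | str, **read_csv_kwargs: Any) -> None:
    # Accepting Path | str lets callers pass plain strings without wrapping them in Path.
    df = pd.read_csv(csv_path, **read_csv_kwargs)
    df.to_parquet(Path(parquet_path), index=False)  # requires a parquet engine such as pyarrow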

@@ -1,11 +1,13 @@
from typing import List
from __future__ import annotations
from typing import Any
import pandas as pd
import numpy as np
import dask.dataframe as dd
from sp2xr.helpers import floor_index_to_dt
_SUM_COLS: List[str] = [
_SUM_COLS: list[str] = [
"Dropped Records",
"Incand Mass (fg)",
"BC mass",
@@ -24,7 +26,7 @@ _SUM_COLS: List[str] = [
"cnts_unclassified",
]
_COUNT_SRC: List[str] = [
_COUNT_SRC: list[str] = [
"Incand Mass (fg)",
"BC mass",
"BC mass within range",
@@ -34,7 +36,7 @@ _COUNT_SRC: List[str] = [
"temporary_col",
]
_COUNT_DST: List[str] = [
_COUNT_DST: list[str] = [
"BC numb from file",
"BC numb",
"BC numb within range",
@@ -134,7 +136,9 @@ def resample_hk_partition(pdf: pd.DataFrame, dt="1s") -> pd.DataFrame:
return out
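The column lists at the top of this file (`_SUM_COLS`, `_COUNT_SRC`, `_COUNT_DST`) are now annotated as `list[str]`. A minimal sketch of how such a typed constant is typically consumed in a resample-and-sum step; the `_sketch` names, the shortened column list, and the fixed "1s" default are illustrative and assume a DatetimeIndex:

from __future__ import annotations

import pandas as pd

_SUM_COLS_SKETCH: list[str] = ["Dropped Records", "Incand Mass (fg)", "BC mass"]

def resample_sum_sketch(pdf: pd.DataFrame, dt: str = "1s") -> pd.DataFrame:
    # Sum the configured columns within each dt-wide time bin; pdf must carry a DatetimeIndex.
    return pdf[_SUM_COLS_SKETCH].resample(dt).sum()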
def join_pbp_with_flow(ddf_cal, flow_dt, run_config):
def join_pbp_with_flow(
ddf_cal: dd.DataFrame, flow_dt: dd.DataFrame, run_config: dict[str, Any]
) -> dd.DataFrame:
inc_mass_bin_lims = np.logspace(
np.log10(run_config["histo"]["inc"]["min_mass"]),
np.log10(run_config["histo"]["inc"]["max_mass"]),