From 6ab90b1a564d60bc84e2c934b5ca7832814ceccb Mon Sep 17 00:00:00 2001
From: Barbara Bertozzi
Date: Wed, 19 Nov 2025 17:00:07 +0100
Subject: [PATCH] New handling of conversion from CSV/ZIP to Parquet via config file

---
 config/conversion_config_template.yaml |  62 ++++++++++++
 scripts/sp2xr_csv2parquet.py           | 135 +++++++++++++------------
 2 files changed, 133 insertions(+), 64 deletions(-)
 create mode 100644 config/conversion_config_template.yaml

diff --git a/config/conversion_config_template.yaml b/config/conversion_config_template.yaml
new file mode 100644
index 0000000..9bf7b0f
--- /dev/null
+++ b/config/conversion_config_template.yaml
@@ -0,0 +1,62 @@
+# SP2XR CSV/ZIP to Parquet Conversion Configuration Template
+#
+# This file contains all parameters for converting raw SP2XR data files
+# to time-indexed Parquet format.
+#
+# USAGE:
+# 1. Update the paths and parameters below for your dataset
+# 2. Run: python scripts/sp2xr_csv2parquet.py --config my_conversion_config.yaml
+#
+# NOTES:
+# - For local execution, the script auto-detects available CPU cores and memory
+# - Output files are organized by date and hour: target_directory/date=YYYY-MM-DD/hour=HH/
+# - Processing is parallelized using Dask for efficient handling of large datasets
+# - You can monitor progress via the Dask dashboard (URL printed when script starts)
+
+
+# Directory containing your raw SP2XR files (CSV or ZIP format)
+source_directory: data/SP2XR_orig_files
+
+# Output directory for converted Parquet files
+target_directory: data/pbp_files_parquet_2
+
+# Path to your data schema config file (generated by sp2xr_generate_config.py)
+schema_config: config/config_schema_with_mapping.yaml
+
+
+# Pattern to filter which files to process
+# "PbP" for particle-by-particle data, "hk" for housekeeping data
+file_filter: PbP
+
+
+# Number of files to process in each batch
+# Larger values = more memory usage but potentially faster
+# Smaller values = less memory usage but more overhead
+chunk_size: 100
+
+
+# Execution mode: "local" or "slurm"
+# - local: Use your local machine (laptop/desktop)
+# - slurm: Use a SLURM cluster (HPC environment)
+execution_mode: local
+
+
+
+# --- SLURM-specific parameters (ignored if execution_mode: local) ---
+
+# Number of CPU cores per SLURM job
+slurm_cores: 64
+
+# Memory per SLURM job (e.g., "128GB", "256GB")
+slurm_memory: 128GB
+
+# SLURM partition to use
+# Common values: "hourly", "daily", "general"
+slurm_partition: daily
+
+# Walltime for SLURM job (e.g., "01:00:00" for 1 hour, "23:59:00" for 1 day)
+# If not specified, defaults based on partition:
+# - hourly: 00:59:00
+# - daily: 23:59:00
+# - general: 7-00:00:00
+slurm_walltime: null
\ No newline at end of file
diff --git a/scripts/sp2xr_csv2parquet.py b/scripts/sp2xr_csv2parquet.py
index df593eb..8302e35 100644
--- a/scripts/sp2xr_csv2parquet.py
+++ b/scripts/sp2xr_csv2parquet.py
@@ -4,6 +4,8 @@ import gc
 import dask
 import multiprocessing
 import psutil
+import yaml
+from pathlib import Path
 from dask.distributed import Client, LocalCluster
 from dask_jobqueue import SLURMCluster
 from dask import delayed
@@ -13,58 +15,63 @@ from sp2xr.helpers import find_files, chunks

 def main():
     parser = argparse.ArgumentParser(
-        description="Batch convert SP2XR zip to parquet using Dask (local or SLURM cluster)"
+        description="Batch convert SP2XR zip to parquet using Dask (local or SLURM cluster)",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog="""
+Example:
+    python sp2xr_csv2parquet.py --config my_conversion_config.yaml
+
+See config/conversion_config_template.yaml for configuration options.
+    """,
     )
+
     parser.add_argument(
-        "--source", required=True, help="Directory containing input zip/CSV files"
-    )
-    parser.add_argument(
-        "--target", required=True, help="Output directory for parquet files"
-    )
-    parser.add_argument(
-        "--config", required=True, help="Path to YAML schema config file"
-    )
-    parser.add_argument(
-        "--filter", default="PbP", help="Pattern to filter files (default: PbP)"
-    )
-    parser.add_argument(
-        "--chunk",
-        type=int,
-        default=100,
-        help="Number of files per batch (default: 100)",
-    )
-    parser.add_argument(
-        "--local",
-        action="store_true",
-        help="Use local Dask client (auto-detects resources) instead of SLURM cluster",
-    )
-    parser.add_argument(
-        "--cores",
-        type=int,
-        default=64,
-        help="Cores per SLURM job (ignored for --local, default: 64)",
-    )
-    parser.add_argument(
-        "--memory",
-        default="128GB",
-        help="Memory per SLURM job (ignored for --local, default: 128GB)",
-    )
-    parser.add_argument(
-        "--walltime",
-        default=None,
-        help="Walltime for SLURM (ignored for --local, default depends on partition)",
-    )
-    parser.add_argument(
-        "--partition",
-        default="daily",
-        help="SLURM partition (ignored for --local, default: daily)",
+        "--config",
+        required=True,
+        help="Path to conversion config YAML file",
     )
+
     args = parser.parse_args()

+    # Load config from file
+    config_path = Path(args.config)
+    if not config_path.exists():
+        print(f"Error: Config file not found: {config_path}")
+        return 1
+
+    with open(config_path, "r") as f:
+        config = yaml.safe_load(f)
+
+    print(f"Loaded conversion config from: {config_path}")
+
+    # Extract parameters from config
+    source = config.get("source_directory")
+    target = config.get("target_directory")
+    schema_config = config.get("schema_config")
+    file_filter = config.get("file_filter", "PbP")
+    chunk_size = config.get("chunk_size", 100)
+    execution_mode = config.get("execution_mode", "local")
+
+    # SLURM parameters
+    slurm_cores = config.get("slurm_cores", 64)
+    slurm_memory = config.get("slurm_memory", "128GB")
+    slurm_partition = config.get("slurm_partition", "daily")
+    slurm_walltime = config.get("slurm_walltime")
+
+    # Validate required parameters
+    if not source or not target or not schema_config:
+        print("Error: Missing required parameters in config file:")
+        print(" - source_directory")
+        print(" - target_directory")
+        print(" - schema_config")
+        return 1
+
+    use_local = execution_mode == "local"
+
     # --- Setup cluster (local or SLURM) ---
     start_time = time.time()

-    if args.local:
+    if use_local:
         # Local execution: auto-detect resources
         total_cores = multiprocessing.cpu_count()
         total_memory = psutil.virtual_memory().total  # in bytes
@@ -83,26 +90,26 @@ def main():
         print(f"Dask dashboard: {client.dashboard_link}")
     else:
         # SLURM execution
-        if args.walltime is None:
-            if args.partition == "hourly":
-                args.walltime = "00:59:00"
-            elif args.partition == "daily":
-                args.walltime = "23:59:00"
-            elif args.partition == "general":
-                args.walltime = "7-00:00:00"
+        if slurm_walltime is None:
+            if slurm_partition == "hourly":
+                slurm_walltime = "00:59:00"
+            elif slurm_partition == "daily":
+                slurm_walltime = "23:59:00"
+            elif slurm_partition == "general":
+                slurm_walltime = "7-00:00:00"
             else:
-                args.walltime = "00:59:00"
+                slurm_walltime = "00:59:00"

         print("Running in SLURM mode")
-        print(f"Resources: {args.cores} cores, {args.memory} memory")
-        print(f"Partition: {args.partition}, Walltime: {args.walltime}")
+        print(f"Resources: {slurm_cores} cores, {slurm_memory} memory")
+        print(f"Partition: {slurm_partition}, Walltime: {slurm_walltime}")

         cluster = SLURMCluster(
-            cores=args.cores,
-            processes=args.cores,
-            memory=args.memory,
-            walltime=args.walltime,
-            job_extra_directives=[f"--partition={args.partition}"],
+            cores=slurm_cores,
+            processes=slurm_cores,
+            memory=slurm_memory,
+            walltime=slurm_walltime,
+            job_extra_directives=[f"--partition={slurm_partition}"],
         )
         cluster.scale(1)
         client = Client(cluster)
@@ -110,19 +117,19 @@ def main():

     try:
         # --- Find files ---
-        files = find_files(args.source, args.filter)
-        print(f"Found {len(files)} files matching pattern '{args.filter}'")
+        files = find_files(source, file_filter)
+        print(f"Found {len(files)} files matching pattern '{file_filter}'")
         if len(files) == 0:
             print("No files found matching the filter pattern")
             return

         # --- Process in chunks ---
         i = 0
-        total_chunks = (len(files) - 1) // args.chunk + 1
-        for chunk in chunks(files, args.chunk):
+        total_chunks = (len(files) - 1) // chunk_size + 1
+        for chunk in chunks(files, chunk_size):
             print(f"Processing chunk {i+1} / {total_chunks}")
             tasks = [
-                delayed(process_sp2xr_file)(f, args.config, args.target) for f in chunk
+                delayed(process_sp2xr_file)(f, schema_config, target) for f in chunk
             ]
             dask.compute(*tasks)
             gc.collect()
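
Usage note: a minimal filled-in config for a SLURM run, sketched from the template above, might look like the example below. The two data paths are placeholders for illustration (not paths from this patch); all keys and defaults are taken from the template and the script.

    # my_conversion_config.yaml -- hypothetical example
    source_directory: /data/my_campaign/SP2XR_orig_files   # placeholder path
    target_directory: /data/my_campaign/pbp_files_parquet  # placeholder path
    schema_config: config/config_schema_with_mapping.yaml
    file_filter: PbP
    chunk_size: 100
    execution_mode: slurm
    slurm_cores: 64
    slurm_memory: 128GB
    slurm_partition: daily
    slurm_walltime: null   # script falls back to 23:59:00 for the daily partition

Run it with:

    python scripts/sp2xr_csv2parquet.py --config my_conversion_config.yaml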