From a1595b1e866fd477d15c236f6a989dd7be7297d8 Mon Sep 17 00:00:00 2001
From: Barbara Bertozzi
Date: Mon, 4 Aug 2025 09:40:00 +0200
Subject: [PATCH] feat: add CLI script to convert SP2XR zip/CSV files to parquet using Dask

---
 scripts/sp2xr_csv2parquet.py | 83 ++++++++++++++++++++++++++++++++++++
 1 file changed, 83 insertions(+)
 create mode 100644 scripts/sp2xr_csv2parquet.py

diff --git a/scripts/sp2xr_csv2parquet.py b/scripts/sp2xr_csv2parquet.py
new file mode 100644
index 0000000..a0b8a06
--- /dev/null
+++ b/scripts/sp2xr_csv2parquet.py
@@ -0,0 +1,83 @@
import argparse
import time
import gc
import dask
from dask.distributed import Client
from dask_jobqueue import SLURMCluster
from dask import delayed
from sp2xr.io import process_sp2xr_file
from sp2xr.helpers import find_files, chunks


def main():
    parser = argparse.ArgumentParser(
        description="Batch convert SP2XR zip/CSV files to parquet using Dask + SLURM"
    )
    parser.add_argument(
        "--source", required=True, help="Directory containing input zip/CSV files"
    )
    parser.add_argument(
        "--target", required=True, help="Output directory for parquet files"
    )
    parser.add_argument(
        "--config", required=True, help="Path to YAML schema config file"
    )
    parser.add_argument(
        "--filter", default="PbP", help="Pattern to filter files (default: PbP)"
    )
    parser.add_argument(
        "--chunk",
        type=int,
        default=100,
        help="Number of files per batch (default: 100)",
    )
    parser.add_argument(
        "--cores", type=int, default=64, help="Cores per job (default: 64)"
    )
    parser.add_argument(
        "--memory", default="128GB", help="Memory per job (default: 128GB)"
    )
    parser.add_argument(
        "--walltime", default="05:59:00", help="Walltime (default: 5h59m)"
    )
    parser.add_argument("--partition", default="daily", help="SLURM partition to use")
    args = parser.parse_args()

    # --- Set up SLURM cluster ---
    start_time = time.time()
    cluster = SLURMCluster(
        cores=args.cores,
        processes=args.cores,  # one single-threaded worker process per core
        memory=args.memory,
        walltime=args.walltime,
        job_extra_directives=[f"--partition={args.partition}"],
    )
    cluster.scale(1)  # request a single SLURM job
    client = Client(cluster)
    print(f"Dask dashboard: {client.dashboard_link}")

    # --- Find files ---
    files = find_files(args.source, args.filter)
    print(f"Found {len(files)} files matching pattern '{args.filter}'")

    # --- Process in chunks ---
    n_chunks = -(-len(files) // args.chunk)  # ceiling division
    for i, chunk in enumerate(chunks(files, args.chunk)):
        print(f"Processing chunk {i + 1} / {n_chunks}")

        tasks = [
            delayed(process_sp2xr_file)(f, args.config, args.target) for f in chunk
        ]
        dask.compute(*tasks)

        # Release references between batches before the next round of tasks
        gc.collect()

    # --- Cleanup ---
    client.close()
    cluster.close()
    print(f"✅ Finished in {(time.time() - start_time)/60:.2f} minutes")


if __name__ == "__main__":
    main()
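
Example invocation, using the flags defined above. This is a sketch only: the paths are placeholders, and the script is assumed to be launched from a node where dask_jobqueue can submit SLURM jobs.

    python scripts/sp2xr_csv2parquet.py \
        --source /path/to/raw_sp2xr \
        --target /path/to/parquet_out \
        --config /path/to/schema_config.yaml \
        --filter PbP --chunk 100 \
        --cores 64 --memory 128GB \
        --walltime 05:59:00 --partition daily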