From 5a52ca2fdd12c1cc4fce1b467a2f1c1bd48aee4e Mon Sep 17 00:00:00 2001
From: Barbara Bertozzi
Date: Mon, 17 Nov 2025 16:07:36 +0100
Subject: [PATCH] Add local option for converting csv to parquet

---
 scripts/sp2xr_csv2parquet.py | 134 +++++++++++++++++++++++------------
 1 file changed, 90 insertions(+), 44 deletions(-)

diff --git a/scripts/sp2xr_csv2parquet.py b/scripts/sp2xr_csv2parquet.py
index cf6ec7d..df593eb 100644
--- a/scripts/sp2xr_csv2parquet.py
+++ b/scripts/sp2xr_csv2parquet.py
@@ -2,7 +2,9 @@ import argparse
 import time
 import gc
 import dask
-from dask.distributed import Client
+import multiprocessing
+import psutil
+from dask.distributed import Client, LocalCluster
 from dask_jobqueue import SLURMCluster
 from dask import delayed
 from sp2xr.io import process_sp2xr_file
@@ -11,7 +13,7 @@ from sp2xr.helpers import find_files, chunks
 
 def main():
     parser = argparse.ArgumentParser(
-        description="Batch convert SP2XR zip to parquet using Dask + SLURM"
+        description="Batch convert SP2XR zip to parquet using Dask (local or SLURM cluster)"
     )
     parser.add_argument(
         "--source", required=True, help="Directory containing input zip/CSV files"
@@ -32,63 +34,107 @@ def main():
         help="Number of files per batch (default: 100)",
     )
     parser.add_argument(
-        "--cores", type=int, default=64, help="Cores per job (default: 64)"
+        "--local",
+        action="store_true",
+        help="Use local Dask client (auto-detects resources) instead of SLURM cluster",
     )
     parser.add_argument(
-        "--memory", default="128GB", help="Memory per job (default: 128GB)"
+        "--cores",
+        type=int,
+        default=64,
+        help="Cores per SLURM job (ignored for --local, default: 64)",
     )
     parser.add_argument(
-        "--walltime", default=None, help="Walltime (default depends on partition)"
+        "--memory",
+        default="128GB",
+        help="Memory per SLURM job (ignored for --local, default: 128GB)",
     )
     parser.add_argument(
-        "--partition", default="daily", help="SLURM partition (default: daily)"
+        "--walltime",
+        default=None,
+        help="Walltime for SLURM (ignored for --local, default depends on partition)",
+    )
+    parser.add_argument(
+        "--partition",
+        default="daily",
+        help="SLURM partition (ignored for --local, default: daily)",
     )
     args = parser.parse_args()
 
-    if args.walltime is None:
-        if args.partition == "hourly":
-            args.walltime = "00:59:00"
-        elif args.partition == "daily":
-            args.walltime = "23:59:00"
-        elif args.partition == "general":
-            args.walltime = "7-00:00:00"
-        else:
-            args.walltime = "00:59:00"
-
-    # --- Setup SLURM cluster ---
+    # --- Setup cluster (local or SLURM) ---
     start_time = time.time()
-    cluster = SLURMCluster(
-        cores=args.cores,
-        processes=args.cores,
-        memory=args.memory,
-        walltime=args.walltime,
-        job_extra_directives=[f"--partition={args.partition}"],
-    )
-    cluster.scale(1)
-    client = Client(cluster)
-    print(f"Dask dashboard: {client.dashboard_link}")
 
-    # --- Find files ---
-    files = find_files(args.source, args.filter)
-    print(f"Found {len(files)} files matching pattern '{args.filter}'")
+    if args.local:
+        # Local execution: auto-detect resources
+        total_cores = multiprocessing.cpu_count()
+        total_memory = psutil.virtual_memory().total  # in bytes
+        memory_limit_bytes = int(total_memory * 0.8)
+        memory_limit = f"{memory_limit_bytes // (1024**3)}GB"
 
-    # --- Process in chunks ---
-    i = 0
-    for chunk in chunks(files, args.chunk):
-        print(f"Processing chunk {i+1} / {len(files)//args.chunk + 1}")
+        print("Running in LOCAL mode")
+        print(f"Auto-detected resources: {total_cores} cores, {memory_limit} memory")
 
-        tasks = [
-            delayed(process_sp2xr_file)(f, args.config, args.target) for f in chunk
-        ]
-        dask.compute(*tasks)
+        cluster = LocalCluster(
+            n_workers=total_cores,
+            threads_per_worker=1,
+            memory_limit=memory_limit,
+        )
+        client = Client(cluster)
+        print(f"Dask dashboard: {client.dashboard_link}")
+    else:
+        # SLURM execution
+        if args.walltime is None:
+            if args.partition == "hourly":
+                args.walltime = "00:59:00"
+            elif args.partition == "daily":
+                args.walltime = "23:59:00"
+            elif args.partition == "general":
+                args.walltime = "7-00:00:00"
+            else:
+                args.walltime = "00:59:00"
 
-        gc.collect()
-        i += 1
+        print("Running in SLURM mode")
+        print(f"Resources: {args.cores} cores, {args.memory} memory")
+        print(f"Partition: {args.partition}, Walltime: {args.walltime}")
 
-    # --- Cleanup ---
-    client.close()
-    cluster.close()
-    print(f"✅ Finished in {(time.time() - start_time)/60:.2f} minutes")
+        cluster = SLURMCluster(
+            cores=args.cores,
+            processes=args.cores,
+            memory=args.memory,
+            walltime=args.walltime,
+            job_extra_directives=[f"--partition={args.partition}"],
+        )
+        cluster.scale(1)
+        client = Client(cluster)
+        print(f"Dask SLURM dashboard: {client.dashboard_link}")
+
+    try:
+        # --- Find files ---
+        files = find_files(args.source, args.filter)
+        print(f"Found {len(files)} files matching pattern '{args.filter}'")
+        if len(files) == 0:
+            print("No files found matching the filter pattern")
+            return
+
+        # --- Process in chunks ---
+        i = 0
+        total_chunks = (len(files) - 1) // args.chunk + 1
+        for chunk in chunks(files, args.chunk):
+            print(f"Processing chunk {i+1} / {total_chunks}")
+            tasks = [
+                delayed(process_sp2xr_file)(f, args.config, args.target) for f in chunk
+            ]
+            dask.compute(*tasks)
+            gc.collect()
+            i += 1
+
+        print(f"Finished in {(time.time() - start_time)/60:.2f} minutes")
+
+    finally:
+        # --- Cleanup ---
+        print("Shutting down Dask cluster...")
+        client.close()
+        cluster.close()
 
 
 if __name__ == "__main__":
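
Example invocations (illustrative sketch only: the paths below are placeholders, and the --target, --config, and --filter flags are inferred from the args.target/args.config/args.filter references in the script; the full argument parser is not shown in this diff):

    # Local mode: LocalCluster sized from the machine's core count and ~80% of its RAM
    python scripts/sp2xr_csv2parquet.py --source /data/sp2xr/raw --target /data/sp2xr/parquet --config config.yaml --local

    # SLURM mode (default): one SLURMCluster job on the requested partition
    python scripts/sp2xr_csv2parquet.py --source /data/sp2xr/raw --target /data/sp2xr/parquet --config config.yaml --partition daily --cores 64 --memory 128GB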