SP2XR/scripts/sp2xr_csv2parquet.py

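"""Batch convert SP2XR zip/CSV files to parquet using Dask on a SLURM cluster."""
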
import argparse
import gc
import time

import dask
from dask import delayed
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

from sp2xr.io import process_sp2xr_file
from sp2xr.helpers import find_files, chunks


def main():
    parser = argparse.ArgumentParser(
        description="Batch convert SP2XR zip to parquet using Dask + SLURM"
    )
    parser.add_argument(
        "--source", required=True, help="Directory containing input zip/CSV files"
    )
    parser.add_argument(
        "--target", required=True, help="Output directory for parquet files"
    )
    parser.add_argument(
        "--config", required=True, help="Path to YAML schema config file"
    )
    parser.add_argument(
        "--filter", default="PbP", help="Pattern to filter files (default: PbP)"
    )
    parser.add_argument(
        "--chunk",
        type=int,
        default=100,
        help="Number of files per batch (default: 100)",
    )
    parser.add_argument(
        "--cores", type=int, default=64, help="Cores per job (default: 64)"
    )
    parser.add_argument(
        "--memory", default="128GB", help="Memory per job (default: 128GB)"
    )
    parser.add_argument(
        "--walltime", default=None, help="Walltime (default depends on partition)"
    )
    parser.add_argument(
        "--partition", default="daily", help="SLURM partition (default: daily)"
    )
    args = parser.parse_args()
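
    # No walltime given: fall back to a partition-appropriate default.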
    if args.walltime is None:
        if args.partition == "hourly":
            args.walltime = "00:59:00"
        elif args.partition == "daily":
            args.walltime = "23:59:00"
        elif args.partition == "general":
            args.walltime = "7-00:00:00"
        else:
            args.walltime = "00:59:00"

    # --- Set up SLURM cluster ---
    start_time = time.time()
    cluster = SLURMCluster(
        cores=args.cores,
        processes=args.cores,  # one single-threaded worker process per core
        memory=args.memory,
        walltime=args.walltime,
        job_extra_directives=[f"--partition={args.partition}"],
    )
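    # scale(1) asks for one worker; dask-jobqueue rounds this up to a whole
    # SLURM job, so a single job with args.cores worker processes is launched.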
    cluster.scale(1)
    client = Client(cluster)
    print(f"Dask dashboard: {client.dashboard_link}")

    # --- Find files ---
    files = find_files(args.source, args.filter)
    print(f"Found {len(files)} files matching pattern '{args.filter}'")

    # --- Process in chunks ---
    n_chunks = (len(files) + args.chunk - 1) // args.chunk  # ceiling division
    for i, chunk in enumerate(chunks(files, args.chunk), start=1):
        print(f"Processing chunk {i} / {n_chunks}")
        tasks = [
            delayed(process_sp2xr_file)(f, args.config, args.target) for f in chunk
        ]
        dask.compute(*tasks)
        gc.collect()  # free client-side references before the next batch

    # --- Cleanup ---
    client.close()
    cluster.close()
    print(f"✅ Finished in {(time.time() - start_time)/60:.2f} minutes")


if __name__ == "__main__":
    main()
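

# Example invocation (paths and filenames below are hypothetical):
#   python sp2xr_csv2parquet.py --source /data/sp2xr/raw --target /data/sp2xr/parquet \
#       --config schema.yaml --partition hourly --chunk 50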