feat: add CLI script to convert SP2XR zip/CSV files to parquet using Dask
scripts/sp2xr_csv2parquet.py (new file, 83 lines)
@@ -0,0 +1,83 @@
import argparse
import gc
import time

import dask
from dask import delayed
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

from sp2xr.helpers import find_files, chunks
from sp2xr.io import process_sp2xr_file


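# Example invocation (paths and file names are illustrative):
#   python scripts/sp2xr_csv2parquet.py --source /data/raw --target /data/parquet \
#       --config sp2xr_schema.yaml --partition daily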
def main():
    parser = argparse.ArgumentParser(
        description="Batch convert SP2XR zip/CSV files to parquet using Dask + SLURM"
    )
    parser.add_argument(
        "--source", required=True, help="Directory containing input zip/CSV files"
    )
    parser.add_argument(
        "--target", required=True, help="Output directory for parquet files"
    )
    parser.add_argument(
        "--config", required=True, help="Path to YAML schema config file"
    )
    parser.add_argument(
        "--filter", default="PbP", help="Pattern to filter files (default: PbP)"
    )
    parser.add_argument(
        "--chunk",
        type=int,
        default=100,
        help="Number of files per batch (default: 100)",
    )
    parser.add_argument(
        "--cores", type=int, default=64, help="Cores per job (default: 64)"
    )
    parser.add_argument(
        "--memory", default="128GB", help="Memory per job (default: 128GB)"
    )
    parser.add_argument(
        "--walltime", default="05:59:00", help="Walltime per job (default: 05:59:00)"
    )
    parser.add_argument("--partition", default="daily", help="SLURM partition to use")
    args = parser.parse_args()

    # --- Set up the SLURM cluster ---
    start_time = time.time()
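    # processes=cores gives one single-threaded worker per core, so each file
    # is parsed in its own process rather than by threads sharing one.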
    cluster = SLURMCluster(
        cores=args.cores,
        processes=args.cores,
        memory=args.memory,
        walltime=args.walltime,
        job_extra_directives=[f"--partition={args.partition}"],
    )
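    # Request a single SLURM job; scale(n) would request n jobs to fan out further.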
    cluster.scale(1)
    client = Client(cluster)
    print(f"Dask dashboard: {client.dashboard_link}")

    # --- Find input files ---
    files = find_files(args.source, args.filter)
    print(f"Found {len(files)} files matching pattern '{args.filter}'")

    # --- Process in batches ---
    n_batches = -(-len(files) // args.chunk)  # ceiling division
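    # Submitting every file at once can flood the scheduler; batching keeps the
    # task graph small and lets memory be reclaimed between rounds.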
    for i, batch in enumerate(chunks(files, args.chunk), start=1):
        print(f"Processing batch {i} / {n_batches}")
        tasks = [
            delayed(process_sp2xr_file)(f, args.config, args.target) for f in batch
        ]
        dask.compute(*tasks)
        gc.collect()

    # --- Clean up ---
    client.close()
    cluster.close()
    print(f"✅ Finished in {(time.time() - start_time)/60:.2f} minutes")


if __name__ == "__main__":
    main()
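
For context, the `chunks` helper imported from `sp2xr.helpers` is assumed to be a plain batching generator; a minimal sketch under that assumption (the package's actual implementation may differ):

def chunks(items, size):
    """Yield successive lists of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]

With that shape, `chunks(files, args.chunk)` yields full batches of `args.chunk` files plus one final partial batch, which is what the ceiling-division batch count in the loop accounts for.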