Add local option for converting csv to parquet
@@ -2,7 +2,9 @@ import argparse
 import time
 import gc
 import dask
-from dask.distributed import Client
+import multiprocessing
+import psutil
+from dask.distributed import Client, LocalCluster
 from dask_jobqueue import SLURMCluster
 from dask import delayed
 from sp2xr.io import process_sp2xr_file
@@ -11,7 +13,7 @@ from sp2xr.helpers import find_files, chunks
 
 def main():
     parser = argparse.ArgumentParser(
-        description="Batch convert SP2XR zip to parquet using Dask + SLURM"
+        description="Batch convert SP2XR zip to parquet using Dask (local or SLURM cluster)"
     )
     parser.add_argument(
         "--source", required=True, help="Directory containing input zip/CSV files"
@@ -32,63 +34,107 @@ def main():
         help="Number of files per batch (default: 100)",
     )
     parser.add_argument(
-        "--cores", type=int, default=64, help="Cores per job (default: 64)"
+        "--local",
+        action="store_true",
+        help="Use local Dask client (auto-detects resources) instead of SLURM cluster",
     )
     parser.add_argument(
-        "--memory", default="128GB", help="Memory per job (default: 128GB)"
+        "--cores",
+        type=int,
+        default=64,
+        help="Cores per SLURM job (ignored for --local, default: 64)",
     )
     parser.add_argument(
-        "--walltime", default=None, help="Walltime (default depends on partition)"
+        "--memory",
+        default="128GB",
+        help="Memory per SLURM job (ignored for --local, default: 128GB)",
     )
     parser.add_argument(
-        "--partition", default="daily", help="SLURM partition (default: daily)"
+        "--walltime",
+        default=None,
+        help="Walltime for SLURM (ignored for --local, default depends on partition)",
+    )
+    parser.add_argument(
+        "--partition",
+        default="daily",
+        help="SLURM partition (ignored for --local, default: daily)",
     )
     args = parser.parse_args()
 
-    if args.walltime is None:
-        if args.partition == "hourly":
-            args.walltime = "00:59:00"
-        elif args.partition == "daily":
-            args.walltime = "23:59:00"
-        elif args.partition == "general":
-            args.walltime = "7-00:00:00"
-        else:
-            args.walltime = "00:59:00"
-
-    # --- Setup SLURM cluster ---
+    # --- Setup cluster (local or SLURM) ---
     start_time = time.time()
-    cluster = SLURMCluster(
-        cores=args.cores,
-        processes=args.cores,
-        memory=args.memory,
-        walltime=args.walltime,
-        job_extra_directives=[f"--partition={args.partition}"],
-    )
-    cluster.scale(1)
-    client = Client(cluster)
-    print(f"Dask dashboard: {client.dashboard_link}")
 
-    # --- Find files ---
-    files = find_files(args.source, args.filter)
-    print(f"Found {len(files)} files matching pattern '{args.filter}'")
+    if args.local:
+        # Local execution: auto-detect resources
+        total_cores = multiprocessing.cpu_count()
+        total_memory = psutil.virtual_memory().total  # in bytes
+        memory_limit_bytes = int(total_memory * 0.8)
+        memory_limit = f"{memory_limit_bytes // (1024**3)}GB"
 
-    # --- Process in chunks ---
-    i = 0
-    for chunk in chunks(files, args.chunk):
-        print(f"Processing chunk {i+1} / {len(files)//args.chunk + 1}")
+        print("Running in LOCAL mode")
+        print(f"Auto-detected resources: {total_cores} cores, {memory_limit} memory")
 
-        tasks = [
-            delayed(process_sp2xr_file)(f, args.config, args.target) for f in chunk
-        ]
-        dask.compute(*tasks)
+        cluster = LocalCluster(
+            n_workers=total_cores,
+            threads_per_worker=1,
+            memory_limit=memory_limit,
+        )
+        client = Client(cluster)
+        print(f"Dask dashboard: {client.dashboard_link}")
+    else:
+        # SLURM execution
+        if args.walltime is None:
+            if args.partition == "hourly":
+                args.walltime = "00:59:00"
+            elif args.partition == "daily":
+                args.walltime = "23:59:00"
+            elif args.partition == "general":
+                args.walltime = "7-00:00:00"
+            else:
+                args.walltime = "00:59:00"
 
-        gc.collect()
-        i += 1
+        print("Running in SLURM mode")
+        print(f"Resources: {args.cores} cores, {args.memory} memory")
+        print(f"Partition: {args.partition}, Walltime: {args.walltime}")
 
-    # --- Cleanup ---
-    client.close()
-    cluster.close()
-    print(f"✅ Finished in {(time.time() - start_time)/60:.2f} minutes")
+        cluster = SLURMCluster(
+            cores=args.cores,
+            processes=args.cores,
+            memory=args.memory,
+            walltime=args.walltime,
+            job_extra_directives=[f"--partition={args.partition}"],
+        )
+        cluster.scale(1)
+        client = Client(cluster)
+        print(f"Dask SLURM dashboard: {client.dashboard_link}")
+
+    try:
+        # --- Find files ---
+        files = find_files(args.source, args.filter)
+        print(f"Found {len(files)} files matching pattern '{args.filter}'")
+        if len(files) == 0:
+            print("No files found matching the filter pattern")
+            return
+
+        # --- Process in chunks ---
+        i = 0
+        total_chunks = (len(files) - 1) // args.chunk + 1
+        for chunk in chunks(files, args.chunk):
+            print(f"Processing chunk {i+1} / {total_chunks}")
+            tasks = [
+                delayed(process_sp2xr_file)(f, args.config, args.target) for f in chunk
+            ]
+            dask.compute(*tasks)
+            gc.collect()
+            i += 1
+
+        print(f"Finished in {(time.time() - start_time)/60:.2f} minutes")
+
+    finally:
+        # --- Cleanup ---
+        print("Shutting down Dask cluster...")
+        client.close()
+        cluster.close()
 
 
 if __name__ == "__main__":
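
Usage sketch of the two modes this change enables. The script name and all paths/patterns below are placeholders, and the --filter, --config, and --target flags are assumed from the args.filter / args.config / args.target references visible in the diff:

    # Local mode: LocalCluster sized from auto-detected cores and ~80% of RAM
    python sp2xr_to_parquet.py --source /path/to/raw --target /path/to/parquet \
        --config /path/to/config --filter "*.zip" --chunk 100 --local

    # SLURM mode (default): one dask-jobqueue SLURMCluster job on the chosen partition
    python sp2xr_to_parquet.py --source /path/to/raw --target /path/to/parquet \
        --config /path/to/config --filter "*.zip" --chunk 100 \
        --cores 64 --memory 128GB --partition daily

In either mode the files are processed in batches of --chunk delayed process_sp2xr_file tasks per dask.compute() call, with a gc.collect() after each batch, and the cluster is shut down in the finally block.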