149 lines
4.6 KiB
Python
149 lines
4.6 KiB
Python
import argparse
import gc
import multiprocessing
import sys
import time
from pathlib import Path

import dask
import psutil
import yaml
from dask import delayed
from dask.distributed import Client, LocalCluster
from dask_jobqueue import SLURMCluster

from sp2xr.helpers import chunks, find_files
from sp2xr.io import process_sp2xr_file
|
|
|
|
|
|
# Config keys that must be present (and non-empty) for a conversion run.
_REQUIRED_KEYS = ("source_directory", "target_directory", "schema_config")

# Fraction of the machine's physical RAM handed to the local Dask workers.
_LOCAL_MEMORY_FRACTION = 0.8

# Default SLURM walltime per partition when the config sets none. The values
# presumably sit just under each partition's time limit -- confirm against
# the cluster's SLURM configuration.
_DEFAULT_WALLTIMES = {
    "hourly": "00:59:00",
    "daily": "23:59:00",
    "general": "7-00:00:00",
}
_FALLBACK_WALLTIME = "00:59:00"


def _load_config(config_path):
    """Read and parse the YAML conversion config at *config_path*.

    An empty file is treated as an empty mapping (so the caller reports the
    missing required keys instead of crashing).

    Returns:
        The parsed config dict, or ``None`` after printing an error to stderr
        when the file does not exist, is not valid YAML, or does not contain
        a mapping at the top level.
    """
    if not config_path.exists():
        print(f"Error: Config file not found: {config_path}", file=sys.stderr)
        return None

    try:
        with open(config_path, "r", encoding="utf-8") as f:
            config = yaml.safe_load(f)
    except yaml.YAMLError as exc:
        print(f"Error: Could not parse config file {config_path}: {exc}", file=sys.stderr)
        return None

    if config is None:  # safe_load returns None for an empty document
        config = {}
    if not isinstance(config, dict):
        print(
            f"Error: Config file must contain a YAML mapping: {config_path}",
            file=sys.stderr,
        )
        return None

    print(f"Loaded conversion config from: {config_path}")
    return config


def _default_walltime(partition):
    """Return the default walltime for *partition* (hourly value if unknown)."""
    return _DEFAULT_WALLTIMES.get(partition, _FALLBACK_WALLTIME)


def _start_local_cluster():
    """Start a LocalCluster with one single-threaded worker per CPU core.

    ``LocalCluster(memory_limit=...)`` is a *per-worker* limit, so the usable
    share of system RAM is divided evenly between the workers instead of being
    granted in full to each of them. The limit is passed as an integer number
    of bytes, which avoids GiB/GB unit confusion and a "0GB" (= unlimited)
    limit on small machines.
    """
    total_cores = multiprocessing.cpu_count()
    total_memory = psutil.virtual_memory().total  # in bytes
    usable_memory = int(total_memory * _LOCAL_MEMORY_FRACTION)
    memory_per_worker = usable_memory // total_cores

    print("Running in LOCAL mode")
    print(
        f"Auto-detected resources: {total_cores} cores, "
        f"{usable_memory / 1024**3:.1f} GiB usable memory "
        f"({memory_per_worker / 1024**3:.2f} GiB per worker)"
    )

    return LocalCluster(
        n_workers=total_cores,
        threads_per_worker=1,
        memory_limit=memory_per_worker,
    )


def _start_slurm_cluster(config):
    """Start a SLURMCluster from the ``slurm_*`` config keys and submit one job.

    ``processes`` equals ``cores``, so the job runs one single-threaded worker
    process per requested core.
    """
    slurm_cores = config.get("slurm_cores", 64)
    slurm_memory = config.get("slurm_memory", "128GB")
    slurm_partition = config.get("slurm_partition", "daily")
    slurm_walltime = config.get("slurm_walltime")
    if slurm_walltime is None:
        slurm_walltime = _default_walltime(slurm_partition)

    print("Running in SLURM mode")
    print(f"Resources: {slurm_cores} cores, {slurm_memory} memory")
    print(f"Partition: {slurm_partition}, Walltime: {slurm_walltime}")

    cluster = SLURMCluster(
        cores=slurm_cores,
        processes=slurm_cores,
        memory=slurm_memory,
        walltime=slurm_walltime,
        job_extra_directives=[f"--partition={slurm_partition}"],
    )
    try:
        # Ask for exactly one SLURM job (scale(n) would count *workers*).
        cluster.scale(jobs=1)
    except Exception:
        cluster.close()  # don't leave a half-configured cluster behind
        raise
    return cluster


def _start_cluster(config):
    """Create the Dask cluster selected by ``execution_mode``.

    Any value other than ``"local"`` (the default) selects SLURM.

    Returns:
        ``(cluster, dashboard_label)``; the label prefixes the dashboard line
        printed once a client is connected.
    """
    if config.get("execution_mode", "local") == "local":
        return _start_local_cluster(), "Dask dashboard"
    return _start_slurm_cluster(config), "Dask SLURM dashboard"


def _convert_files(files, schema_config, target, chunk_size):
    """Run ``process_sp2xr_file`` over *files*, *chunk_size* files per batch.

    Each batch is submitted as its own ``dask.compute`` call, so at most one
    batch of delayed tasks is in flight at a time. Results are discarded; an
    explicit ``gc.collect()`` runs after each batch, presumably to keep the
    driver's memory footprint low between batches.
    """
    total_chunks = (len(files) - 1) // chunk_size + 1  # ceil(len / chunk_size)
    for chunk_number, chunk in enumerate(chunks(files, chunk_size), start=1):
        print(f"Processing chunk {chunk_number} / {total_chunks}")
        tasks = [
            delayed(process_sp2xr_file)(f, schema_config, target) for f in chunk
        ]
        dask.compute(*tasks)
        del tasks
        gc.collect()


def main():
    """Batch-convert SP2XR files to parquet on a local or SLURM Dask cluster.

    All settings come from the YAML file passed via ``--config``; see
    ``config/conversion_config_template.yaml`` for the available keys.

    Returns:
        0 on success (including when no input file matches the filter),
        1 when the config file is missing, unparsable, or invalid.
    """
    parser = argparse.ArgumentParser(
        description="Batch convert SP2XR zip to parquet using Dask (local or SLURM cluster)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Example:
  python sp2xr_csv2parquet.py --config my_conversion_config.yaml

See config/conversion_config_template.yaml for configuration options.
""",
    )
    parser.add_argument(
        "--config",
        required=True,
        help="Path to conversion config YAML file",
    )
    args = parser.parse_args()

    config = _load_config(Path(args.config))
    if config is None:
        return 1

    missing = [key for key in _REQUIRED_KEYS if not config.get(key)]
    if missing:
        print("Error: Missing required parameters in config file:", file=sys.stderr)
        for key in missing:
            print(f" - {key}", file=sys.stderr)
        return 1

    chunk_size = config.get("chunk_size", 100)
    # bool is a subclass of int; reject it explicitly along with 0/negatives.
    if isinstance(chunk_size, bool) or not isinstance(chunk_size, int) or chunk_size < 1:
        print(
            f"Error: chunk_size must be a positive integer, got {chunk_size!r}",
            file=sys.stderr,
        )
        return 1

    source = config["source_directory"]
    target = config["target_directory"]
    schema_config = config["schema_config"]
    file_filter = config.get("file_filter", "PbP")

    # Timing includes cluster start-up, as in the original script.
    start_time = time.perf_counter()
    cluster, dashboard_label = _start_cluster(config)
    client = None
    try:
        # Created inside the try so the cluster is closed even if this fails.
        client = Client(cluster)
        print(f"{dashboard_label}: {client.dashboard_link}")

        files = find_files(source, file_filter)
        print(f"Found {len(files)} files matching pattern '{file_filter}'")
        if len(files) == 0:
            print("No files found matching the filter pattern")
            return 0

        _convert_files(files, schema_config, target, chunk_size)
        print(f"Finished in {(time.perf_counter() - start_time) / 60:.2f} minutes")
        return 0
    finally:
        print("Shutting down Dask cluster...")
        if client is not None:
            client.close()
        cluster.close()
|
|
|
|
|
|
if __name__ == "__main__":
    # Propagate main()'s status (1 on configuration errors) as the exit code.
    sys.exit(main())
|