From a2cc520ff23d7bb9efbba65085b71317ef41aec5 Mon Sep 17 00:00:00 2001
From: Barbara Bertozzi
Date: Thu, 14 Aug 2025 11:48:52 +0200
Subject: [PATCH] feat: allow choosing between running locally and via a SLURM
 cluster

---
 src/sp2xr/helpers.py  | 58 ++++++++++++++++++++++++++++++-------------
 tests/run_config.yaml |  3 ++-
 2 files changed, 43 insertions(+), 18 deletions(-)

diff --git a/src/sp2xr/helpers.py b/src/sp2xr/helpers.py
index e7a92d1..397edd0 100644
--- a/src/sp2xr/helpers.py
+++ b/src/sp2xr/helpers.py
@@ -5,7 +5,9 @@ import argparse
 import pandas as pd
 from pathlib import Path
 from dask_jobqueue import SLURMCluster
-from dask.distributed import Client
+import multiprocessing
+import psutil
+from dask.distributed import Client, LocalCluster
 from typing import Optional, List
 
 
@@ -35,6 +37,7 @@ def load_and_resolve_config(args):
         "rho_eff": choose(args.BC_rho, base, "bc.rho_eff", 1.8),
         "BC_type": choose(args.BC_type, base, "bc.type", "constant_effective_density"),
         "cluster": {
+            "use_local": choose(args.local, base, "cluster.use_local", False),
             "cores": choose(args.cores, base, "cluster.cores", 8),
             "memory": choose(args.memory, base, "cluster.memory", "64GB"),
             "walltime": choose(args.walltime, base, "cluster.walltime", "00:59:00"),
@@ -83,31 +86,51 @@ def load_and_resolve_config(args):
 
 
 def initialize_cluster(config):
-    return make_cluster(
-        cores=config["cluster"]["cores"],
-        mem=config["cluster"]["memory"],
-        wall=config["cluster"]["walltime"],
-        partition=config["cluster"]["partition"],
-        out_dir=config["cluster"]["log_dir"],
-    )
+    if config["cluster"].get("use_local", False):
+        return make_local_cluster(config)
+    else:
+        return make_slurm_cluster(config)
 
 
-def make_cluster(cores=32, mem="64GB", wall="00:59:00", partition="hourly", out_dir=""):
+def make_slurm_cluster(config):
     cluster = SLURMCluster(
-        cores=cores,
-        processes=cores,
-        memory=mem,
-        walltime=wall,
+        cores=config["cluster"]["cores"],
+        processes=config["cluster"]["cores"],
+        memory=config["cluster"]["memory"],
+        walltime=config["cluster"]["walltime"],
         job_extra_directives=[
-            f"--partition={partition}",
-            f"--output={out_dir}/slurm-%j.out",
+            f"--partition={config['cluster']['partition']}",
+            f"--output={config['cluster']['log_dir']}/slurm-%j.out",
         ],
     )
     client = Client(cluster)
     cluster.scale(1)
     client.wait_for_workers(1, timeout=600)
-    print(f"Dask dashboard: {client.dashboard_link}")
+    print(f"Dask SLURM dashboard: {client.dashboard_link}")
+    return client
+
+
+def make_local_cluster(config):
+    # Detect the machine's available CPU cores and memory
+    total_cores = multiprocessing.cpu_count()
+    total_memory = psutil.virtual_memory().total  # in bytes
+
+    # Use every core unless the config overrides the count
+    cores = config["cluster"].get("cores", total_cores)
+    memory_limit = config["cluster"].get("memory")
+
+    # LocalCluster applies memory_limit per worker; if no value is
+    # configured, split 80% of the machine's memory evenly across workers
+    if memory_limit is None:
+        memory_limit = int(total_memory * 0.8) // cores  # bytes per worker
+
+    cluster = LocalCluster(
+        n_workers=cores,
+        threads_per_worker=1,
+        memory_limit=memory_limit,
+    )
+    client = Client(cluster)
+    print(f"Dask LOCAL dashboard: {client.dashboard_link}")
     return client
@@ -165,6 +188,7 @@ def parse_args():
     )
 
     # cluster / resource knobs (all optional)
+    p.add_argument("--local", action="store_true", default=None, help="Run on a local Dask cluster instead of SLURM")
     p.add_argument(
         "--cores", type=int, default=8, help="CPU cores per SLURM job (default: 8)"
     )
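For review convenience, here is a minimal, self-contained sketch of how the new
dispatch is meant to behave. It is illustrative only, not part of the patch:
the config dict is hand-built to mirror the cluster: block of run_config.yaml,
and the values in it are placeholders.

    # Illustrative stand-in for load_and_resolve_config() + initialize_cluster()
    from dask.distributed import Client, LocalCluster

    config = {"cluster": {"use_local": True, "cores": 4, "memory": "2GB"}}

    if config["cluster"].get("use_local", False):
        # Local path: one single-threaded worker per requested core,
        # with the configured memory cap applied per worker
        cluster = LocalCluster(
            n_workers=config["cluster"]["cores"],
            threads_per_worker=1,
            memory_limit=config["cluster"]["memory"],
        )
        client = Client(cluster)
        print(f"Dask LOCAL dashboard: {client.dashboard_link}")
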
diff --git a/tests/run_config.yaml b/tests/run_config.yaml
index 33b7e3f..065089c 100644
--- a/tests/run_config.yaml
+++ b/tests/run_config.yaml
@@ -2,7 +2,7 @@ paths:
   input_pbp: tests/data/pbp_files_test
   input_hk: tests/data/hk_files_test
   output: tests/data3
-  instrument_config: tests/data/instrument_config.yaml
+  instrument_config: tests/instrument_config.yaml
 
 workflow:
   conc: true
@@ -12,6 +12,7 @@ workflow:
   dt: 1 # seconds
 
 cluster:
+  use_local: false
   cores: 8
   memory: 64GB
   walltime: "00:59:00"
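
Assuming the package's entry point wires these helpers together in the usual
way (the driver script is not part of this patch, so the wiring below is an
assumption), the new option is exercised either via use_local: true in the
YAML or via the --local flag on the command line:

    # Hedged sketch of the expected call flow; parse_args,
    # load_and_resolve_config and initialize_cluster come from
    # src/sp2xr/helpers.py, but how they are chained is assumed
    from sp2xr.helpers import parse_args, load_and_resolve_config, initialize_cluster

    args = parse_args()                     # --local sets args.local; otherwise the YAML decides
    config = load_and_resolve_config(args)  # precedence: CLI flag > YAML key > default
    client = initialize_cluster(config)     # LocalCluster if use_local, else SLURMCluster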