feat: allow choosing between running locally and via a SLURM cluster

This commit is contained in:
2025-08-14 11:48:52 +02:00
parent 18b8635147
commit a2cc520ff2
2 changed files with 43 additions and 18 deletions

View File

@@ -5,7 +5,9 @@ import argparse
import pandas as pd
from pathlib import Path
from dask_jobqueue import SLURMCluster
from dask.distributed import Client
import multiprocessing
import psutil
from dask.distributed import Client, LocalCluster
from typing import Optional, List
@@ -35,6 +37,7 @@ def load_and_resolve_config(args):
"rho_eff": choose(args.BC_rho, base, "bc.rho_eff", 1.8),
"BC_type": choose(args.BC_type, base, "bc.type", "constant_effective_density"),
"cluster": {
"use_local": choose(args.local, base, "cluster.use_local"),
"cores": choose(args.cores, base, "cluster.cores", 8),
"memory": choose(args.memory, base, "cluster.memory", "64GB"),
"walltime": choose(args.walltime, base, "cluster.walltime", "00:59:00"),
@@ -83,31 +86,51 @@ def load_and_resolve_config(args):
def initialize_cluster(config):
    """Start a Dask cluster and return a connected ``Client``.

    Dispatches on ``config["cluster"]["use_local"]``: a truthy value starts
    a :class:`LocalCluster` on this machine, otherwise a SLURM-backed
    cluster is launched. Defaults to SLURM when the key is absent.
    """
    # BUG FIX: the branches were inverted — use_local=True previously
    # launched the SLURM cluster and use_local=False the local one.
    # NOTE(review): parse_args defines --local with action="store_false",
    # so args.local defaults to True; confirm it should be "store_true"
    # so that SLURM remains the default backend.
    if config["cluster"].get("use_local", False):
        return make_local_cluster(config)
    return make_slurm_cluster(config)
def make_slurm_cluster(config):
    """Launch a SLURM-backed Dask cluster and return a connected client.

    Reads ``cores``, ``memory``, ``walltime``, ``partition`` and ``log_dir``
    from ``config["cluster"]``, submits one SLURM job and blocks until the
    first worker has registered (10-minute timeout).

    Returns the connected ``dask.distributed.Client``.
    """
    cluster_cfg = config["cluster"]
    cluster = SLURMCluster(
        cores=cluster_cfg["cores"],
        # processes == cores: one single-threaded worker process per core.
        processes=cluster_cfg["cores"],
        memory=cluster_cfg["memory"],
        walltime=cluster_cfg["walltime"],
        job_extra_directives=[
            f"--partition={cluster_cfg['partition']}",
            f"--output={cluster_cfg['log_dir']}/slurm-%j.out",
        ],
    )
    client = Client(cluster)
    cluster.scale(1)  # request a single SLURM job's worth of workers
    # Fail fast if the queue cannot provide a worker within 10 minutes.
    client.wait_for_workers(1, timeout=600)
    print(f"Dask SLURM dashboard: {client.dashboard_link}")
    return client
def make_local_cluster(config):
    """Start a Dask ``LocalCluster`` on this machine and return a client.

    Worker count comes from ``config["cluster"]["cores"]`` (falling back to
    every CPU core the machine reports); the memory limit comes from
    ``config["cluster"]["memory"]``, defaulting to 80% of total RAM,
    rounded down to whole GB, when unset.

    Returns the connected ``dask.distributed.Client``.
    """
    cluster_cfg = config["cluster"]
    # Use the configured core count, or auto-detect when absent.
    n_workers = cluster_cfg.get("cores", multiprocessing.cpu_count())
    mem_limit = cluster_cfg.get("memory")
    if mem_limit is None:
        # Leave ~20% of RAM for the OS; express the rest in whole GB.
        usable_bytes = int(psutil.virtual_memory().total * 0.8)
        mem_limit = f"{usable_bytes // (1024**3)}GB"
    client = Client(
        LocalCluster(
            n_workers=n_workers,
            threads_per_worker=1,
            memory_limit=mem_limit,
        )
    )
    print(f"Dask LOCAL dashboard: {client.dashboard_link}")
    return client
@@ -165,6 +188,7 @@ def parse_args():
)
# cluster / resource knobs (all optional)
p.add_argument("--local", action="store_false", help="Run Local Cluster")
p.add_argument(
"--cores", type=int, default=8, help="CPU cores per SLURM job (default: 8)"
)

View File

@@ -2,7 +2,7 @@ paths:
input_pbp: tests/data/pbp_files_test
input_hk: tests/data/hk_files_test
output: tests/data3
instrument_config: tests/data/instrument_config.yaml
instrument_config: tests/instrument_config.yaml
workflow:
conc: true
@@ -12,6 +12,7 @@ workflow:
dt: 1 # seconds
cluster:
use_local: false
cores: 8
memory: 64GB
walltime: "00:59:00"