Files
2026-06-02 16:27:50 +02:00

725 lines
24 KiB
Python
Executable File

#!/usr/bin/env python3
"""
slurm-eff-tool.py - Slurm job efficiency reporting and investigation tool.
Examples:
./slurm_eff.py --start 2026-05-01 --end 2026-05-22 -u dfeich
./slurm_eff.py -S 2026-05-01 -E now -u dfeich -O sacct.cache
./slurm_eff.py -F sacct.cache --aggr-user --sdev -s e,-M,t
./slurm_eff.py -F sacct.cache -R '^vasp' -R '^gromacs' --json
./slurm_eff.py -F sacct.cache -o "%.12u %c %N %m %.12l %C %.8e %.8M %.8t %.30j"
Efficiency definitions:
CPU_Efficiency = TotalCPU_seconds / CPUTimeRAW * 100
Memory_Efficiency = max(MaxRSS_bytes across non-extern job steps) / requested_memory_bytes * 100
Time_Efficiency = ElapsedRaw_seconds / (TimelimitRaw_minutes * 60) * 100
Memory efficiency intentionally uses the maximum MaxRSS seen across non-extern
Slurm job steps. This conceptually matches the practical "peak RSS" style of
seff, but it is not a perfect sum of all ranks.
This script intentionally asks sacct for raw parsable fields and caches those rows.
"""
# initial version of script produced by vibe coding of an exact functionality
# description using chatgpt-5.5
# 2026 D. Feichtinger <derek.feichtinger@psi.ch>
from __future__ import annotations
import argparse
import csv
import json
import math
import os
import re
import statistics
import subprocess
import sys
import signal
from collections import defaultdict
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Iterable
# Fields requested from sacct, in the order they appear in each pipe-separated
# row. The same list is used as the header and column order of the cache file.
SACCT_FIELDS = [
    "JobIDRaw", "JobID", "User", "JobName", "State",
    "AllocCPUS", "NNodes", "ReqMem",
    "ElapsedRaw", "TimelimitRaw", "CPUTimeRAW", "TotalCPU",
    "MaxRSS",
]

# Columns of the default table output, in display order.
DEFAULT_COLUMNS = [
    "username", "JobID",
    "CPUs", "Nodes", "ReqMem", "MemPerCPU", "ReqWalltime",
    "Count",
    "CPU_Efficiency", "Memory_Efficiency", "Time_Efficiency",
    "jobname",
]

# One-character aliases for sorting and output format specifications.
ALIASES = dict(
    u="username",
    i="JobID",
    c="CPUs",
    N="Nodes",
    m="ReqMem",
    p="MemPerCPU",
    l="ReqWalltime",
    C="Count",
    e="CPU_Efficiency",
    M="Memory_Efficiency",
    t="Time_Efficiency",
    j="jobname",
)

# Columns that are compared by numeric value (rather than as text) when sorting.
NUMERIC_COLUMNS = {
    "CPUs", "Nodes", "ReqMem", "MemPerCPU", "ReqWalltime", "Count",
    "CPU_Efficiency", "Memory_Efficiency", "Time_Efficiency",
}
@dataclass
class JobRecord:
    """One Slurm job, with requested resources, usage and derived efficiencies.

    Optional values are ``None`` when they could not be determined.
    """

    # sacct row (field name -> text) this record was built from.
    raw: dict[str, str]
    username: str
    jobname: str

    # Allocation size.
    cpus: int
    nodes: int

    # Requested resources in display units: GiB for memory, hours for walltime.
    reqmem_gb: float | None
    mem_per_cpu_gb: float | None
    reqwall_hours: float | None
    # Requested memory for the whole allocation, in bytes.
    reqmem_bytes_total: float | None

    # Measured usage.
    elapsed_sec: int
    timelimit_sec: int
    totalcpu_sec: float
    maxrss_bytes: float | None

    # Efficiencies in percent.
    cpu_eff: float | None
    mem_eff: float | None
    time_eff: float | None
@dataclass
class OutputRow:
    """One line of the report: either a single job or an aggregate of jobs."""

    username: str
    JobID: str
    CPUs: int
    Nodes: int
    ReqMem: float | None
    MemPerCPU: float | None
    ReqWalltime: float | None
    Count: int
    CPU_Efficiency: float | None
    Memory_Efficiency: float | None
    Time_Efficiency: float | None
    jobname: str
    # Individual efficiency samples behind the values above; they feed the
    # sdev/max/min entries produced by as_dict(sdev=True).
    _cpu_eff_values: list[float] = field(default_factory=list, repr=False)
    _mem_eff_values: list[float] = field(default_factory=list, repr=False)
    _time_eff_values: list[float] = field(default_factory=list, repr=False)

    def as_dict(self, sdev: bool = False) -> dict[str, Any]:
        """Return the row as a dict keyed by column name, in display order.

        With ``sdev`` set, every efficiency column additionally gets
        ``<column>_sdev``, ``<column>_max`` and ``<column>_min`` entries; max
        and min are ``None`` when there are no samples.
        """
        public_columns = (
            "username", "JobID", "CPUs", "Nodes", "ReqMem", "MemPerCPU",
            "ReqWalltime", "Count", "CPU_Efficiency", "Memory_Efficiency",
            "Time_Efficiency", "jobname",
        )
        result: dict[str, Any] = {name: getattr(self, name) for name in public_columns}
        if not sdev:
            return result

        samples_by_column = {
            "CPU_Efficiency": self._cpu_eff_values,
            "Memory_Efficiency": self._mem_eff_values,
            "Time_Efficiency": self._time_eff_values,
        }
        for column, samples in samples_by_column.items():
            result[f"{column}_sdev"] = stdev_or_none(samples)
            result[f"{column}_max"] = max(samples) if samples else None
            result[f"{column}_min"] = min(samples) if samples else None
        return result
def die(msg: str, code: int = 2) -> None:
    """Write ``error: <msg>`` to stderr and terminate with exit status ``code``.

    This function never returns normally; it always raises ``SystemExit``.
    """
    sys.stderr.write(f"error: {msg}\n")
    sys.exit(code)
def slurm_duration_to_seconds(value: str) -> float:
    """Parse Slurm-ish CPU time strings such as 01:02:03, 2-01:02:03, 5:12.345.

    Accepted layouts are ``[days-]HH:MM:SS[.fff]``, ``[days-]MM:SS[.fff]`` and
    ``[days-]SS[.fff]``.

    Returns the duration in seconds. Empty input, the placeholders
    ``Unknown``, ``UNLIMITED`` and ``Partition_Limit``, and any text that
    cannot be parsed (including a malformed day prefix) yield ``0.0``.
    """
    text = (value or "").strip()
    if not text or text in {"Unknown", "UNLIMITED", "Partition_Limit"}:
        return 0.0
    try:
        days = 0
        if "-" in text:
            # The day prefix is parsed inside the try block so that garbage
            # such as "x-01:02:03" is reported as 0.0 instead of raising.
            day_part, text = text.split("-", 1)
            days = int(day_part)
        parts = text.split(":")
        if len(parts) == 3:
            hours, minutes, seconds = parts
            return days * 86400 + int(hours) * 3600 + int(minutes) * 60 + float(seconds)
        if len(parts) == 2:
            minutes, seconds = parts
            return days * 86400 + int(minutes) * 60 + float(seconds)
        if len(parts) == 1:
            return days * 86400 + float(parts[0])
    except ValueError:
        return 0.0
    # More than three colon-separated fields: not a duration we understand.
    return 0.0
def format_seconds(seconds: int | float | None) -> str:
    """Render a duration in seconds as ``[D-]HH:MM:SS``.

    ``None`` and values <= 0 are rendered as ``Unknown``. Fractional input is
    rounded to whole seconds first.
    """
    if seconds is None or seconds <= 0:
        return "Unknown"
    total = int(round(seconds))
    # Peel off units from the small end: seconds, then minutes, then hours.
    total_minutes, sec = divmod(total, 60)
    total_hours, minutes = divmod(total_minutes, 60)
    days, hours = divmod(total_hours, 24)
    clock = f"{hours:02d}:{minutes:02d}:{sec:02d}"
    return f"{days}-{clock}" if days else clock
def parse_size_to_bytes(value: str) -> float | None:
    """Parse Slurm memory size fields such as 1024K, 2000M, 8Gn, 4Gc, 1.5T.

    Returns the size in bytes (binary multiples of 1024), or ``None`` for
    empty, ``Unknown``, plain zero (``0``, ``0K``, ``0M``, ``0G``, ``0T``) or
    unparsable input.
    """
    text = (value or "").strip()
    if text in {"", "Unknown", "0", "0K", "0M", "0G", "0T"}:
        return None

    # Slurm ReqMem may end in c/n for per-CPU or per-node. Strip that here.
    if text[-1].lower() in {"c", "n"}:
        text = text[:-1]

    parsed = re.fullmatch(r"([0-9]+(?:\.[0-9]+)?)([KMGTP]?)", text, flags=re.I)
    if parsed is None:
        return None

    number, suffix = parsed.groups()
    # Slurm memory fields are normally KiB when unitless.
    power = "KMGTP".index(suffix.upper() or "K") + 1
    return float(number) * 1024**power
def reqmem_total_bytes(reqmem: str, cpus: int, nodes: int) -> float | None:
    """Convert ReqMem into total requested bytes for the whole job allocation.

    A trailing ``c`` marks a per-CPU request and is scaled by ``cpus``; any
    other value is treated as per-node and scaled by ``nodes``. Counts below 1
    are treated as 1. Returns ``None`` when the size cannot be parsed.
    """
    text = (reqmem or "").strip()
    per_unit = parse_size_to_bytes(text)
    if per_unit is None:
        return None
    # Default Slurm ReqMem suffix is usually n: memory per node.
    unit_count = cpus if text.lower().endswith("c") else nodes
    return per_unit * max(unit_count, 1)
def common_prefix(names: list[str]) -> str:
    """Summarise a list of job names as one display label.

    Returns ``""`` for an empty list and the name itself when all names are
    identical (this covers a single-element list). Otherwise returns the
    shared prefix followed by ``*``, or just ``*`` when nothing is shared.
    """
    if not names:
        return ""
    first = names[0]
    # Identical names need no wildcard: "relax*" would wrongly suggest that
    # the aggregated jobs had differing suffixes.
    if all(name == first for name in names):
        return first
    prefix = os.path.commonprefix(names)
    # Avoid ugly partial-token prefixes when possible.
    prefix = re.sub(r"[^A-Za-z0-9_.-]+$", "", prefix)
    return f"{prefix}*"
def stdev_or_none(values: list[float]) -> float | None:
    """Return the sample standard deviation of ``values``.

    An empty list gives ``None`` and a single value gives ``0.0``.
    """
    count = len(values)
    if count == 0:
        return None
    if count == 1:
        return 0.0
    return statistics.stdev(values)
def mean_or_none(values: list[float]) -> float | None:
    """Return the arithmetic mean of ``values``, or ``None`` for an empty list."""
    if not values:
        return None
    return statistics.mean(values)
def pct(numerator: float | None, denominator: float | None) -> float | None:
    """Return ``numerator / denominator`` as a percentage.

    Gives ``None`` when either operand is missing or the denominator is not
    strictly positive.
    """
    if numerator is None or denominator is None:
        return None
    if denominator <= 0:
        return None
    return 100.0 * numerator / denominator
def run_sacct(args: argparse.Namespace) -> list[dict[str, str]]:
    """Run sacct with the filters from ``args`` and return its parsed rows.

    Reads ``args.start``, ``args.end``, ``args.user`` and ``args.state``; each
    is passed to sacct only when non-empty. Exits through ``die`` when sacct is
    not installed or returns a non-zero status.
    """
    cmd = ["sacct", "-P", "-n", "--units=K", "--format=" + ",".join(SACCT_FIELDS)]
    optional_filters = (
        ("-S", args.start),
        ("-E", args.end),
        ("-u", args.user),
        ("--state", args.state),
    )
    for flag, setting in optional_filters:
        if setting:
            cmd += [flag, setting]

    # Job step rows are kept in the output because MaxRSS often lives on
    # batch/extern/step rows. Rows are collapsed back to the base job ID later.
    try:
        proc = subprocess.run(cmd, text=True, check=True, capture_output=True)
    except FileNotFoundError:
        die("sacct not found in PATH")
    except subprocess.CalledProcessError as exc:
        die(f"sacct failed with exit code {exc.returncode}:\n{exc.stderr.strip()}")
    return parse_pipe_rows(proc.stdout.splitlines())
def parse_pipe_rows(lines: Iterable[str]) -> list[dict[str, str]]:
    """Turn raw ``sacct -P`` output lines into dicts keyed by SACCT_FIELDS.

    Empty lines are skipped. Every value is stripped of surrounding
    whitespace; rows with too few columns are padded with empty strings and
    surplus columns are dropped.
    """
    width = len(SACCT_FIELDS)
    rows: list[dict[str, str]] = []
    # sacct does not quote its parsable output, so a '"' in a job name is
    # ordinary data. With csv's default quoting it would open a quoted field
    # and swallow the following delimiters (and even subsequent lines).
    reader = csv.reader(lines, delimiter="|", quoting=csv.QUOTE_NONE)
    for parts in reader:
        if not parts:
            continue
        cells = [cell.strip() for cell in parts[:width]]
        cells += [""] * (width - len(cells))
        rows.append(dict(zip(SACCT_FIELDS, cells)))
    return rows
def write_cache(path: str, rows: list[dict[str, str]]) -> None:
    """Write ``rows`` to ``path`` as a pipe-separated file with a header line.

    The columns are SACCT_FIELDS, in that order; the file is UTF-8 with ``\\n``
    line endings.
    """
    with open(path, "w", newline="", encoding="utf-8") as handle:
        out = csv.DictWriter(
            handle,
            fieldnames=SACCT_FIELDS,
            delimiter="|",
            lineterminator="\n",
        )
        out.writeheader()
        for row in rows:
            out.writerow(row)
def read_cache(path: str) -> list[dict[str, str]]:
    """Read a pipe-separated cache file and return its rows keyed by SACCT_FIELDS.

    Values are stripped, and fields absent from a row become empty strings.
    Exits through ``die`` when the header lacks any expected field other than
    ``CPUTimeRAW``.
    """
    with open(path, newline="", encoding="utf-8") as handle:
        reader = csv.DictReader(handle, delimiter="|")
        header = reader.fieldnames or []
        # Older cache files from versions before CPUTimeRAW can still be read.
        # CPU efficiency will be NA for those rows unless CPUTimeRAW is present.
        absent = [
            name
            for name in SACCT_FIELDS
            if name not in header and name != "CPUTimeRAW"
        ]
        if absent:
            die(f"cache file is missing expected fields: {', '.join(absent)}")
        cached: list[dict[str, str]] = []
        for row in reader:
            cached.append({k: (row.get(k) or "").strip() for k in SACCT_FIELDS})
        return cached
def base_job_id(jobid: str) -> str:
    """Return the part of ``jobid`` before its first ``.`` or ``_``."""
    head, *_rest = re.split(r"[._]", jobid, maxsplit=1)
    return head
def is_top_level_row(row: dict[str, str]) -> bool:
    """Return True when ``row`` describes a job itself rather than a job step.

    Step rows carry a ``.`` in their JobID (``123.batch``, ``123_4.0``).
    An ``_`` only separates array job ID and task ID (``123_4``); such a row
    is still the job's own row, so it counts as top-level. A missing JobID
    is treated as top-level.
    """
    return "." not in row.get("JobID", "")
def _to_float(value: str | None) -> float:
    """Parse a numeric sacct field; empty, non-numeric or non-finite text gives 0.0."""
    try:
        number = float(value or 0)
    except ValueError:
        # e.g. TimelimitRaw of "UNLIMITED" or "Partition_Limit"
        return 0.0
    return number if math.isfinite(number) else 0.0


def _peak_rss_bytes(group: list[dict[str, str]]) -> float | None:
    """Return the largest MaxRSS in bytes over the group's non-extern rows, or None."""

    def row_id(row: dict[str, str]) -> str:
        return row.get("JobID") or row.get("JobIDRaw") or ""

    non_extern = [row for row in group if ".extern" not in row_id(row)]
    # Prefer real step rows; fall back to every non-extern row when there are none.
    step_rows = [row for row in non_extern if "." in row_id(row)]
    sizes = (parse_size_to_bytes(row.get("MaxRSS", "")) for row in step_rows or non_extern)
    return max((size for size in sizes if size is not None), default=None)


def build_job_records(rows: list[dict[str, str]], only_user: str | None = None) -> list[JobRecord]:
    """Collapse sacct top-level and step rows into one JobRecord per base job.

    Args:
        rows: sacct rows keyed by field name, top-level and step rows mixed.
        only_user: when given, keep only jobs whose top-level row belongs to
            this user. A comma-separated list of user names is accepted.

    Returns:
        One JobRecord per base job ID, in order of first appearance. Numeric
        fields that are empty or non-numeric (for example a TimelimitRaw of
        ``UNLIMITED``) are treated as 0, which leaves the dependent
        efficiencies as ``None``.
    """
    grouped: dict[str, list[dict[str, str]]] = defaultdict(list)
    for row in rows:
        # Do not filter by User here. On many Slurm installations, job step rows
        # are where MaxRSS is populated, but their User field may be empty or may
        # not match the parent job's user. Filtering before grouping drops those
        # rows and makes Memory_Efficiency become NA. Filter after identifying
        # the top-level job row instead.
        grouped[base_job_id(row.get("JobIDRaw") or row.get("JobID", ""))].append(row)

    wanted_users = (
        {name.strip() for name in only_user.split(",") if name.strip()}
        if only_user
        else set()
    )

    records: list[JobRecord] = []
    for group in grouped.values():
        top = next((r for r in group if is_top_level_row(r)), group[0])
        if wanted_users and top.get("User") not in wanted_users:
            continue

        # seff-style practical peak RSS: maximum MaxRSS across non-extern job steps.
        # This is intentionally not a sum over all ranks/tasks.
        maxrss = _peak_rss_bytes(group)

        cpus = int(_to_float(top.get("AllocCPUS")))
        nodes = int(_to_float(top.get("NNodes")))
        elapsed = int(_to_float(top.get("ElapsedRaw")))
        # sacct TimelimitRaw is minutes, while ElapsedRaw and CPUTimeRAW are seconds.
        timelimit_seconds = int(round(_to_float(top.get("TimelimitRaw")) * 60))
        cputime_raw = _to_float(top.get("CPUTimeRAW"))
        totalcpu = slurm_duration_to_seconds(top.get("TotalCPU", ""))

        reqmem_total = reqmem_total_bytes(top.get("ReqMem", ""), cpus, nodes)
        reqmem_gb = None if reqmem_total is None else reqmem_total / (1024**3)
        reqwall_hours = timelimit_seconds / 3600.0 if timelimit_seconds > 0 else None
        mem_per_cpu_gb = reqmem_gb / cpus if reqmem_gb is not None and cpus > 0 else None

        records.append(
            JobRecord(
                raw=top,
                username=top.get("User", ""),
                jobname=top.get("JobName", ""),
                cpus=cpus,
                nodes=nodes,
                reqmem_gb=reqmem_gb,
                mem_per_cpu_gb=mem_per_cpu_gb,
                reqwall_hours=reqwall_hours,
                reqmem_bytes_total=reqmem_total,
                elapsed_sec=elapsed,
                timelimit_sec=timelimit_seconds,
                totalcpu_sec=totalcpu,
                maxrss_bytes=maxrss,
                cpu_eff=pct(totalcpu, cputime_raw),
                mem_eff=pct(maxrss, reqmem_total),
                time_eff=pct(elapsed, timelimit_seconds),
            )
        )
    return records
def aggregate_records(records: list[JobRecord], args: argparse.Namespace) -> list[OutputRow]:
    """Aggregate records according to given grouping instructions.

    With ``args.aggr_regexp``, jobs whose name matches a pattern (the first
    matching pattern wins) are grouped by pattern, CPUs, nodes, ReqMem,
    walltime and user; jobs matching no pattern follow as single rows.
    With ``args.aggr_user``, jobs are grouped by user, CPUs, nodes, ReqMem and
    walltime. Otherwise every job becomes its own row.
    """
    if args.aggr_regexp:
        patterns = [(text, re.compile(text)) for text in args.aggr_regexp]
        by_pattern: dict[tuple[Any, ...], list[JobRecord]] = defaultdict(list)
        leftovers: list[JobRecord] = []
        for rec in records:
            hit = next((text for text, rx in patterns if rx.search(rec.jobname)), None)
            if hit is None:
                leftovers.append(rec)
                continue
            group_key = (hit, rec.cpus, rec.nodes, rec.reqmem_gb, rec.reqwall_hours,
                         rec.username)
            by_pattern[group_key].append(rec)
        result = [
            make_aggregate_row(members, username=group_key[5], jobname=group_key[0])
            for group_key, members in by_pattern.items()
        ]
        result += [make_single_row(rec) for rec in leftovers]
        return result

    if args.aggr_user:
        by_user = defaultdict(list)
        for rec in records:
            by_user[(rec.username, rec.cpus, rec.nodes, rec.reqmem_gb, rec.reqwall_hours)].append(rec)
        return [
            make_aggregate_row(
                members,
                username=group_key[0],
                jobname=f"{common_prefix([r.jobname for r in members])}",
            )
            for group_key, members in by_user.items()
        ]

    return [make_single_row(rec) for rec in records]
def make_single_row(rec: JobRecord) -> OutputRow:
    """Returns an OutputRow based on a single slurm job record.

    Count is 1, and each sample list holds the record's own efficiency, or is
    empty when that efficiency is ``None``.
    """

    def as_samples(value: float | None) -> list[float]:
        return [] if value is None else [value]

    raw = rec.raw
    return OutputRow(
        username=rec.username,
        JobID=raw.get("JobIDRaw", raw.get("JobID", "")),
        CPUs=rec.cpus,
        Nodes=rec.nodes,
        ReqMem=rec.reqmem_gb,
        MemPerCPU=rec.mem_per_cpu_gb,
        ReqWalltime=rec.reqwall_hours,
        Count=1,
        CPU_Efficiency=rec.cpu_eff,
        Memory_Efficiency=rec.mem_eff,
        Time_Efficiency=rec.time_eff,
        jobname=rec.jobname,
        _cpu_eff_values=as_samples(rec.cpu_eff),
        _mem_eff_values=as_samples(rec.mem_eff),
        _time_eff_values=as_samples(rec.time_eff),
    )
def make_aggregate_row(records: list[JobRecord], username: str, jobname: str) -> OutputRow:
    """Returns an OutputRow based on the given list of job records.

    Args:
        records: non-empty list of jobs forming one group. CPUs, nodes and
            the requested memory/walltime are taken from the first record.
        username: value for the row's username column.
        jobname: value for the row's jobname column.

    Returns:
        A row with an empty JobID, ``Count`` set to the group size, and each
        efficiency set to the mean of the records that have a value (``None``
        when no record has one).
    """
    first = records[0]
    cpu_eff_vals = [r.cpu_eff for r in records if r.cpu_eff is not None]
    mem_eff_vals = [r.mem_eff for r in records if r.mem_eff is not None]
    time_eff_vals = [r.time_eff for r in records if r.time_eff is not None]
    # Only fields declared on OutputRow may be passed here; an unknown keyword
    # makes the dataclass constructor raise TypeError.
    return OutputRow(
        username=username,
        JobID="",
        CPUs=first.cpus,
        Nodes=first.nodes,
        ReqMem=first.reqmem_gb,
        MemPerCPU=first.mem_per_cpu_gb,
        ReqWalltime=first.reqwall_hours,
        Count=len(records),
        CPU_Efficiency=mean_or_none(cpu_eff_vals),
        Memory_Efficiency=mean_or_none(mem_eff_vals),
        Time_Efficiency=mean_or_none(time_eff_vals),
        jobname=jobname,
        _cpu_eff_values=cpu_eff_vals,
        _mem_eff_values=mem_eff_vals,
        _time_eff_values=time_eff_vals,
    )
def resolve_column_name(name: str) -> str:
    """Returns canonicalized full column name, accepts one letter column codes.

    One-letter aliases are matched case-sensitively; full column names are
    matched case-insensitively. Exits through ``die`` for anything else.
    """
    key = name.strip()
    from_alias = ALIASES.get(key)
    if from_alias is not None:
        return from_alias

    by_lowercase = {col.lower(): col for col in (*DEFAULT_COLUMNS, *NUMERIC_COLUMNS)}
    canonical = by_lowercase.get(key.lower())
    if canonical is not None:
        return canonical

    die(f"unknown column/alias: {name}")
    return ""
def sort_rows(rows: list[OutputRow], spec: str | None) -> list[OutputRow]:
    """Sort ``rows`` by up to three comma-separated columns or aliases.

    A leading ``-`` on a term sorts that column descending; a leading ``+`` or
    no prefix sorts ascending. Missing values sort as minus infinity for
    numeric columns and as the empty string otherwise. An empty ``spec``
    returns ``rows`` unchanged.
    """
    if not spec:
        return rows

    terms = [piece.strip() for piece in spec.split(",") if piece.strip()]
    if len(terms) > 3:
        die("--sort supports at most three columns")

    sort_keys: list[tuple[str, bool]] = []
    for term in terms:
        descending = term.startswith("-")
        bare = term[1:] if term[0] in "+-" else term
        sort_keys.append((resolve_column_name(bare), descending))

    def numeric_key(column: str):
        def key(row: OutputRow):
            cell = getattr(row, column)
            return float("-inf") if cell is None else cell
        return key

    def text_key(column: str):
        def key(row: OutputRow):
            cell = getattr(row, column)
            return "" if cell is None else str(cell)
        return key

    ordered = rows
    # Stable-sort from least significant to most significant.
    for column, descending in reversed(sort_keys):
        make_key = numeric_key if column in NUMERIC_COLUMNS else text_key
        ordered = sorted(ordered, key=make_key(column), reverse=descending)
    return ordered
def columns_for_sdev(base_cols: list[str]) -> list[str]:
    """Return ``base_cols`` with sdev, max and min columns after each efficiency column.

    The three extra names are ``<column>_sdev``, ``<column>_max`` and
    ``<column>_min``; all other columns are passed through unchanged.
    """
    efficiency_columns = ("CPU_Efficiency", "Memory_Efficiency", "Time_Efficiency")
    expanded: list[str] = []
    for name in base_cols:
        expanded.append(name)
        if name in efficiency_columns:
            expanded.extend([f"{name}_sdev", f"{name}_max", f"{name}_min"])
    return expanded
def format_reqmem_gb(value: float | None) -> str:
    """Format a memory amount with two decimals and a ``G`` suffix; ``NA`` if missing."""
    return "NA" if value is None else f"{value:.2f}G"
def format_reqwall_hours(value: float | None) -> str:
    """Format a walltime with two decimals and an ``h`` suffix; ``NA`` if missing."""
    return "NA" if value is None else f"{value:.2f}h"
def format_value(value: Any, column: str | None = None) -> str:
    """Render one cell as text.

    ``ReqMem``/``MemPerCPU`` and ``ReqWalltime`` use their dedicated
    formatters. For any other column ``None`` becomes ``NA``, floats get two
    decimals, and everything else goes through ``str``.
    """
    if column in ("ReqMem", "MemPerCPU"):
        return format_reqmem_gb(value)
    if column == "ReqWalltime":
        return format_reqwall_hours(value)
    if value is None:
        return "NA"
    return f"{value:.2f}" if isinstance(value, float) else str(value)
def print_table(rows: list[OutputRow], columns: list[str], sdev: bool) -> None:
    """Print ``rows`` as a left-aligned text table with a dashed header rule.

    Each column is as wide as its longest cell or its header, whichever is
    longer. With ``sdev`` set, the sdev/max/min columns are added after each
    efficiency column.
    """
    shown = columns_for_sdev(columns) if sdev else columns
    table = [
        [format_value(values.get(col), col) for col in shown]
        for values in (row.as_dict(sdev=sdev) for row in rows)
    ]
    widths = [
        max([len(col)] + [len(line[idx]) for line in table])
        for idx, col in enumerate(shown)
    ]

    def emit(cells: list[str]) -> None:
        print(" ".join(cell.ljust(width) for cell, width in zip(cells, widths)))

    emit(shown)
    print(" ".join("-" * width for width in widths))
    for line in table:
        emit(line)
# A format directive: "%", an optional width spec made of digits, "." and "-",
# then a single letter that is looked up as a column alias.
FORMAT_RE = re.compile(r"%(?:([-.0-9]*)([A-Za-z]))")


def print_custom_format(rows: list[OutputRow], fmt: str, sdev: bool) -> None:
    """Print one line per row, expanding the ``%<width><alias>`` directives in ``fmt``.

    Slurm-like mini-format: %.12u %.8e %.30j, where the final letter is an alias.
    For efficiency aliases with --sdev, only the averaged value is emitted here.
    Text outside the directives is printed unchanged.
    """

    def render(values: dict[str, Any], match: re.Match[str]) -> str:
        widthspec, alias = match.groups()
        column = resolve_column_name(alias)
        text = format_value(values.get(column), column)
        # Interpret %.12s style as max width; %12s as min width.
        spec = re.fullmatch(r"\.?(-?\d+)", widthspec) if widthspec else None
        if spec is None:
            return text
        width = int(spec.group(1))
        if widthspec.startswith("."):
            return text[: abs(width)]
        # Positive width right-justifies; zero or negative left-justifies.
        return text.rjust(width) if width > 0 else text.ljust(abs(width))

    for row in rows:
        values = row.as_dict(sdev=sdev)
        print(FORMAT_RE.sub(lambda match: render(values, match), fmt))
def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse the command line and return the populated namespace.

    ``start``, ``end`` and ``state`` always carry a value on return: the one
    given by the user, or the default (``now - 24 hours``, ``now``,
    ``COMPLETED``). A warning is written to stderr when any of the three is
    given explicitly together with ``-F/--from-cache``, because they only
    affect a live sacct query. Exits through ``die`` when both aggregation
    modes are requested.
    """
    # These defaults are applied after parsing instead of through argparse, so
    # that "explicitly given" can be told apart from "defaulted" for the
    # --from-cache warning below.
    query_defaults = {"start": "now - 24 hours", "end": "now", "state": "COMPLETED"}

    p = argparse.ArgumentParser(
        description="Display seff-style CPU, memory and walltime efficiency values from sacct data."
    )
    p.add_argument("-S", "--start",
                   help=f"sacct start time, passed to sacct -S (default: {query_defaults['start']})")
    p.add_argument("-E", "--end",
                   help=f"sacct end time, passed to sacct -E (default: {query_defaults['end']})")
    p.add_argument("-u", "--user", help="restrict to one user; passed as sacct -u unless reading from cache")
    p.add_argument("--state", "--job-state", dest="state",
                   help=f"sacct state filter, e.g. COMPLETED,FAILED,TIMEOUT (default: {query_defaults['state']})")
    p.add_argument("-O", "--output-cache", help="write raw sacct output cache to this file")
    p.add_argument("-F", "--from-cache", help="read raw sacct output cache from this file instead of running sacct")
    p.add_argument("-U", "--aggr-user", action="store_true", help="aggregate jobs by user, CPUs, nodes, ReqMem, and timelimit")
    p.add_argument(
        "-R",
        "--aggr-regexp",
        action="append",
        help="aggregate jobs matching regexp by regexp, CPUs, nodes, ReqMem, and timelimit; may be repeated",
    )
    p.add_argument("--sdev", action="store_true", help="after each efficiency average, add sdev, max, and min columns")
    p.add_argument("--json", action="store_true", help="emit JSON instead of an ASCII table")
    p.add_argument("-s", "--sort", help="comma-separated numeric sort columns or aliases; prefix with - for descending")
    p.add_argument(
        "-o",
        "--format",
        help=(
            "Slurm-like output format using aliases: "
            "u=username,i=JobID,c=CPUs,N=Nodes,m=ReqMem,p=MemPerCPU,l=ReqWalltime,C=Count,"
            "e=CPU_Efficiency,M=Memory_Efficiency,t=Time_Efficiency,j=jobname"
        ),
    )
    args = p.parse_args(argv)

    given_explicitly = [name for name in query_defaults if getattr(args, name) is not None]
    if args.from_cache and given_explicitly:
        print("warning: -S/-E/--state are ignored when using -F/--from-cache", file=sys.stderr)
    for name, default in query_defaults.items():
        if getattr(args, name) is None:
            setattr(args, name, default)

    if args.aggr_user and args.aggr_regexp:
        die("choose only one aggregation mode: --aggr-user or --aggr-regexp")
    return args
def main(argv: list[str] | None = None) -> int:
    """Run the tool: load rows, build and aggregate job records, print the report.

    Args:
        argv: command-line arguments without the program name. ``None`` means
            "use ``sys.argv[1:]``"; an explicit empty list means "no arguments".

    Returns:
        0 on success. Errors terminate the process through ``die``.
    """
    # die silently if pipe process dies before us
    if hasattr(signal, "SIGPIPE"):
        signal.signal(signal.SIGPIPE, signal.SIG_DFL)

    # Test for None explicitly: "argv or sys.argv[1:]" would silently replace
    # an intentionally empty argument list with the process arguments.
    args = parse_args(sys.argv[1:] if argv is None else argv)

    if args.from_cache:
        rows = read_cache(args.from_cache)
    else:
        rows = run_sacct(args)
        if args.output_cache:
            write_cache(args.output_cache, rows)

    records = build_job_records(rows, only_user=args.user)
    out_rows = aggregate_records(records, args)
    out_rows = sort_rows(out_rows, args.sort)

    # Aggregated rows have no single job ID, so that column is left out of the table.
    if args.aggr_user or args.aggr_regexp:
        output_columns = [c for c in DEFAULT_COLUMNS if c != "JobID"]
    else:
        output_columns = DEFAULT_COLUMNS

    if args.json:
        print(json.dumps([r.as_dict(sdev=args.sdev) for r in out_rows], indent=2))
    elif args.format:
        print_custom_format(out_rows, args.format, args.sdev)
    else:
        print_table(out_rows, output_columns, args.sdev)
    return 0
# Script entry point: the return value of main() becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())