commit c0d533fc76c555e4a5b1e14cf0c206c3eced3289 Author: Derek Feichtinger Date: Fri May 22 21:53:41 2026 +0200 initial version of slurm-eff-tool script produced by vibe coding of an exact functionality description using chatgpt-5.5 diff --git a/slurm-eff-tool.py b/slurm-eff-tool.py new file mode 100755 index 0000000..fa22a68 --- /dev/null +++ b/slurm-eff-tool.py @@ -0,0 +1,652 @@ +#!/usr/bin/env python3 +""" +slurm-eff-tool.py - Slurm job efficiency reporting and investigation tool. + +Examples: + ./slurm_eff.py --start 2026-05-01 --end 2026-05-22 -u dfeich + ./slurm_eff.py -S 2026-05-01 -E now -u dfeich -O sacct.cache + ./slurm_eff.py -F sacct.cache --aggr-user --sdev -s cpu,-mem,time + ./slurm_eff.py -F sacct.cache -R '^vasp','^gromacs' --json + ./slurm_eff.py -F sacct.cache -o "%.12u %c %N %m %.12l %C %.8e %.8M %.8t %.30j" + +Efficiency definitions: + CPU_Efficiency = TotalCPU_seconds / CPUTimeRAW * 100 + Memory_Efficiency = max(MaxRSS_bytes across non-extern job steps) / requested_memory_bytes * 100 + Time_Efficiency = ElapsedRaw_seconds / (TimelimitRaw_minutes * 60) * 100 + +Memory efficiency intentionally uses the maximum MaxRSS seen across non-extern +Slurm job steps. This conceptually matches the practical "peak RSS" style of +seff, but it is not a perfect sum of all ranks. + +This script intentionally asks sacct for raw parsable fields and caches those rows. +""" +# initial version of script produced by vibe coding of an exact functionality +# description using chatgpt-5.5 +# 2026 D. Feichtinger + +from __future__ import annotations + +import argparse +import csv +import json +import math +import os +import re +import statistics +import subprocess +import sys +from collections import defaultdict +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Iterable + + +SACCT_FIELDS = [ + "JobIDRaw", + "JobID", + "User", + "JobName", + "State", + "AllocCPUS", + "NNodes", + "ReqMem", + "ElapsedRaw", + "TimelimitRaw", + "CPUTimeRAW", + "TotalCPU", + "MaxRSS", +] + +DEFAULT_COLUMNS = [ + "username", + "CPUs", + "Nodes", + "ReqMem", + "ReqWalltime", + "Count", + "CPU_Efficiency", + "Memory_Efficiency", + "Time_Efficiency", + "jobname", +] + +# One-character aliases for sorting and output format specifications. +ALIASES = { + "u": "username", + "c": "CPUs", + "N": "Nodes", + "m": "ReqMem", + "l": "ReqWalltime", + "C": "Count", + "e": "CPU_Efficiency", + "M": "Memory_Efficiency", + "t": "Time_Efficiency", + "j": "jobname", +} + +NUMERIC_COLUMNS = { + "CPUs", + "Nodes", + "Count", + "CPU_Efficiency", + "Memory_Efficiency", + "Time_Efficiency", +} + + +@dataclass +class JobRecord: + raw: dict[str, str] + username: str + jobname: str + cpus: int + nodes: int + reqmem_raw: str + reqwall_raw: str + reqmem_bytes_total: float | None + elapsed_sec: int + timelimit_sec: int + totalcpu_sec: float + maxrss_bytes: float | None + cpu_eff: float | None + mem_eff: float | None + time_eff: float | None + + +@dataclass +class OutputRow: + username: str + CPUs: int + Nodes: int + ReqMem: str + ReqWalltime: str + Count: int + CPU_Efficiency: float | None + Memory_Efficiency: float | None + Time_Efficiency: float | None + jobname: str + _cpu_values: list[float] = field(default_factory=list, repr=False) + _mem_values: list[float] = field(default_factory=list, repr=False) + _time_values: list[float] = field(default_factory=list, repr=False) + + def as_dict(self, sdev: bool = False) -> dict[str, Any]: + d: dict[str, Any] = { + "username": self.username, + "CPUs": self.CPUs, + "Nodes": self.Nodes, + "ReqMem": self.ReqMem, + "ReqWalltime": self.ReqWalltime, + "Count": self.Count, + "CPU_Efficiency": self.CPU_Efficiency, + "Memory_Efficiency": self.Memory_Efficiency, + "Time_Efficiency": self.Time_Efficiency, + "jobname": self.jobname, + } + if sdev: + for col, vals in [ + ("CPU_Efficiency", self._cpu_values), + ("Memory_Efficiency", self._mem_values), + ("Time_Efficiency", self._time_values), + ]: + d[f"{col}_sdev"] = stdev_or_none(vals) + d[f"{col}_max"] = max(vals) if vals else None + d[f"{col}_min"] = min(vals) if vals else None + return d + + +def die(msg: str, code: int = 2) -> None: + print(f"error: {msg}", file=sys.stderr) + raise SystemExit(code) + + +def parse_duration_to_seconds(value: str) -> float: + """Parse Slurm-ish CPU time strings such as 01:02:03, 2-01:02:03, 5:12.345.""" + value = (value or "").strip() + if not value or value in {"Unknown", "UNLIMITED", "Partition_Limit"}: + return 0.0 + + days = 0 + if "-" in value: + d, value = value.split("-", 1) + days = int(d) + + parts = value.split(":") + try: + if len(parts) == 3: + h, m, s = parts + sec = float(s) + return days * 86400 + int(h) * 3600 + int(m) * 60 + sec + if len(parts) == 2: + m, s = parts + return days * 86400 + int(m) * 60 + float(s) + if len(parts) == 1: + return days * 86400 + float(parts[0]) + except ValueError: + return 0.0 + return 0.0 + + +def format_seconds(seconds: int | float | None) -> str: + if seconds is None or seconds <= 0: + return "Unknown" + seconds = int(round(seconds)) + days, rem = divmod(seconds, 86400) + hours, rem = divmod(rem, 3600) + minutes, sec = divmod(rem, 60) + if days: + return f"{days}-{hours:02d}:{minutes:02d}:{sec:02d}" + return f"{hours:02d}:{minutes:02d}:{sec:02d}" + + +def parse_size_to_bytes(value: str) -> float | None: + """Parse Slurm memory size fields such as 1024K, 2000M, 8Gn, 4Gc, 1.5T.""" + s = (value or "").strip() + if not s or s in {"Unknown", "0", "0K", "0M", "0G", "0T"}: + return None + + # Slurm ReqMem may end in c/n for per-CPU or per-node. Strip that here. + if s[-1].lower() in {"c", "n"}: + s = s[:-1] + + m = re.fullmatch(r"([0-9]+(?:\.[0-9]+)?)([KMGTP]?)", s, flags=re.I) + if not m: + return None + + num = float(m.group(1)) + unit = m.group(2).upper() or "K" # Slurm memory fields are normally KiB when unitless. + mult = { + "K": 1024, + "M": 1024**2, + "G": 1024**3, + "T": 1024**4, + "P": 1024**5, + }[unit] + return num * mult + + +def reqmem_total_bytes(reqmem: str, cpus: int, nodes: int) -> float | None: + """Convert ReqMem into total requested bytes for the whole job allocation.""" + raw = (reqmem or "").strip() + base = parse_size_to_bytes(raw) + if base is None: + return None + if raw.lower().endswith("c"): + return base * max(cpus, 1) + # Default Slurm ReqMem suffix is usually n: memory per node. + return base * max(nodes, 1) + + +def common_prefix(names: list[str]) -> str: + if not names: + return "" + prefix = os.path.commonprefix(names) + # Avoid ugly partial-token prefixes when possible. + prefix = re.sub(r"[^A-Za-z0-9_.-]+$", "", prefix) + return prefix or names[0] + + +def stdev_or_none(values: list[float]) -> float | None: + if len(values) < 2: + return 0.0 if len(values) == 1 else None + return statistics.stdev(values) + + +def mean_or_none(values: list[float]) -> float | None: + return statistics.mean(values) if values else None + + +def pct(numerator: float | None, denominator: float | None) -> float | None: + if numerator is None or denominator is None or denominator <= 0: + return None + return 100.0 * numerator / denominator + + +def run_sacct(args: argparse.Namespace) -> list[dict[str, str]]: + cmd = [ + "sacct", + "-P", + "-n", + "--units=K", + "--format=" + ",".join(SACCT_FIELDS), + ] + + if args.start: + cmd += ["-S", args.start] + if args.end: + cmd += ["-E", args.end] + if args.user: + cmd += ["-u", args.user] + if args.state: + cmd += ["--state", args.state] + + # Include job steps because MaxRSS often lives on batch/extern/step rows. + # We later collapse rows back to the base job ID. + try: + proc = subprocess.run(cmd, text=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + except FileNotFoundError: + die("sacct not found in PATH") + except subprocess.CalledProcessError as exc: + die(f"sacct failed with exit code {exc.returncode}:\n{exc.stderr.strip()}") + + rows = parse_pipe_rows(proc.stdout.splitlines()) + return rows + + +def parse_pipe_rows(lines: Iterable[str]) -> list[dict[str, str]]: + rows: list[dict[str, str]] = [] + reader = csv.reader(lines, delimiter="|") + for parts in reader: + if not parts: + continue + parts = [p.strip() for p in parts] + if len(parts) < len(SACCT_FIELDS): + parts += [""] * (len(SACCT_FIELDS) - len(parts)) + rows.append(dict(zip(SACCT_FIELDS, parts[: len(SACCT_FIELDS)]))) + return rows + + +def write_cache(path: str, rows: list[dict[str, str]]) -> None: + with open(path, "w", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=SACCT_FIELDS, delimiter="|", lineterminator="\n") + writer.writeheader() + writer.writerows(rows) + + +def read_cache(path: str) -> list[dict[str, str]]: + with open(path, newline="", encoding="utf-8") as f: + reader = csv.DictReader(f, delimiter="|") + missing = [x for x in SACCT_FIELDS if x not in (reader.fieldnames or [])] + # Older cache files from versions before CPUTimeRAW can still be read. + # CPU efficiency will be NA for those rows unless CPUTimeRAW is present. + missing_required = [x for x in missing if x != "CPUTimeRAW"] + if missing_required: + die(f"cache file is missing expected fields: {', '.join(missing_required)}") + return [{k: (row.get(k) or "").strip() for k in SACCT_FIELDS} for row in reader] + + +def base_job_id(jobid: str) -> str: + return re.split(r"[._]", jobid, maxsplit=1)[0] + + +def is_top_level_row(row: dict[str, str]) -> bool: + jid = row.get("JobID", "") + return "." not in jid and "_" not in jid + + +def build_job_records(rows: list[dict[str, str]], only_user: str | None = None) -> list[JobRecord]: + """Collapse sacct top-level and step rows into one JobRecord per base job.""" + grouped: dict[str, list[dict[str, str]]] = defaultdict(list) + for row in rows: + # Do not filter by User here. On many Slurm installations, job step rows + # are where MaxRSS is populated, but their User field may be empty or may + # not match the parent job's user. Filtering before grouping drops those + # rows and makes Memory_Efficiency become NA. Filter after identifying + # the top-level job row instead. + grouped[base_job_id(row.get("JobIDRaw") or row.get("JobID", ""))].append(row) + + records: list[JobRecord] = [] + for _, group in grouped.items(): + top = next((r for r in group if is_top_level_row(r)), group[0]) + if only_user and top.get("User") != only_user: + continue + + # seff-style practical peak RSS: maximum MaxRSS across non-extern job steps. + # This is intentionally not a sum over all ranks/tasks. + step_rows = [ + r for r in group + if "." in (r.get("JobID") or r.get("JobIDRaw") or "") + and ".extern" not in (r.get("JobID") or r.get("JobIDRaw") or "") + ] + rss_source_rows = step_rows or [ + r for r in group + if ".extern" not in (r.get("JobID") or r.get("JobIDRaw") or "") + ] + maxrss_values = [parse_size_to_bytes(r.get("MaxRSS", "")) for r in rss_source_rows] + maxrss = max([x for x in maxrss_values if x is not None], default=None) + + cpus = int(float(top.get("AllocCPUS") or 0)) + nodes = int(float(top.get("NNodes") or 0)) + elapsed = int(float(top.get("ElapsedRaw") or 0)) + + # sacct TimelimitRaw is minutes, while ElapsedRaw and CPUTimeRAW are seconds. + timelimit_raw_minutes = float(top.get("TimelimitRaw") or 0) + timelimit_seconds = int(round(timelimit_raw_minutes * 60)) + + cputime_raw = float(top.get("CPUTimeRAW") or 0) + totalcpu = parse_duration_to_seconds(top.get("TotalCPU", "")) + + reqmem = top.get("ReqMem", "") + reqmem_total = reqmem_total_bytes(reqmem, cpus, nodes) + + cpu_eff = pct(totalcpu, cputime_raw) + mem_eff = pct(maxrss, reqmem_total) + time_eff = pct(elapsed, timelimit_seconds) + + records.append( + JobRecord( + raw=top, + username=top.get("User", ""), + jobname=top.get("JobName", ""), + cpus=cpus, + nodes=nodes, + reqmem_raw=reqmem, + reqwall_raw=format_seconds(timelimit_seconds), + reqmem_bytes_total=reqmem_total, + elapsed_sec=elapsed, + timelimit_sec=timelimit_seconds, + totalcpu_sec=totalcpu, + maxrss_bytes=maxrss, + cpu_eff=cpu_eff, + mem_eff=mem_eff, + time_eff=time_eff, + ) + ) + + return records + + +def aggregate_records(records: list[JobRecord], args: argparse.Namespace) -> list[OutputRow]: + if args.aggr_regexp: + compiled = [(pat, re.compile(pat)) for pat in args.aggr_regexp] + buckets: dict[tuple[Any, ...], list[JobRecord]] = defaultdict(list) + unmatched: list[JobRecord] = [] + + for rec in records: + matched = False + for pat, rx in compiled: + if rx.search(rec.jobname): + key = (pat, rec.cpus, rec.nodes, rec.reqmem_raw, rec.reqwall_raw) + buckets[key].append(rec) + matched = True + break + if not matched: + unmatched.append(rec) + + out = [make_aggregate_row(v, username="*", jobname=k[0]) for k, v in buckets.items()] + out.extend(make_single_row(r) for r in unmatched) + return out + + if args.aggr_user: + buckets = defaultdict(list) + for rec in records: + key = (rec.username, rec.cpus, rec.nodes, rec.reqmem_raw, rec.reqwall_raw) + buckets[key].append(rec) + return [make_aggregate_row(v, username=k[0], jobname=f"{common_prefix([r.jobname for r in v])}*") for k, v in buckets.items()] + + return [make_single_row(r) for r in records] + + +def make_single_row(rec: JobRecord) -> OutputRow: + return OutputRow( + username=rec.username, + CPUs=rec.cpus, + Nodes=rec.nodes, + ReqMem=rec.reqmem_raw, + ReqWalltime=rec.reqwall_raw, + Count=1, + CPU_Efficiency=rec.cpu_eff, + Memory_Efficiency=rec.mem_eff, + Time_Efficiency=rec.time_eff, + jobname=rec.jobname, + _cpu_values=[rec.cpu_eff] if rec.cpu_eff is not None else [], + _mem_values=[rec.mem_eff] if rec.mem_eff is not None else [], + _time_values=[rec.time_eff] if rec.time_eff is not None else [], + ) + + +def make_aggregate_row(records: list[JobRecord], username: str, jobname: str) -> OutputRow: + first = records[0] + cpu_vals = [r.cpu_eff for r in records if r.cpu_eff is not None] + mem_vals = [r.mem_eff for r in records if r.mem_eff is not None] + time_vals = [r.time_eff for r in records if r.time_eff is not None] + return OutputRow( + username=username, + CPUs=first.cpus, + Nodes=first.nodes, + ReqMem=first.reqmem_raw, + ReqWalltime=first.reqwall_raw, + Count=len(records), + CPU_Efficiency=mean_or_none(cpu_vals), + Memory_Efficiency=mean_or_none(mem_vals), + Time_Efficiency=mean_or_none(time_vals), + jobname=jobname, + _cpu_values=cpu_vals, + _mem_values=mem_vals, + _time_values=time_vals, + ) + + +def resolve_column(name: str) -> str: + n = name.strip() + reverse = {v.lower(): v for v in DEFAULT_COLUMNS} + reverse.update({v.lower(): v for v in NUMERIC_COLUMNS}) + if n in ALIASES: + return ALIASES[n] + if n.lower() in reverse: + return reverse[n.lower()] + die(f"unknown column/alias: {name}") + return "" + + +def sort_rows(rows: list[OutputRow], spec: str | None) -> list[OutputRow]: + if not spec: + return rows + + terms = [x.strip() for x in spec.split(",") if x.strip()] + if len(terms) > 3: + die("--sort supports at most three columns") + + parsed: list[tuple[str, bool]] = [] + for t in terms: + desc = t.startswith("-") + asc = t.startswith("+") + name = t[1:] if (desc or asc) else t + col = resolve_column(name) + if col not in NUMERIC_COLUMNS: + die(f"--sort column must be numeric, got {col}") + parsed.append((col, desc)) + + sorted_rows = rows + # Stable-sort from least significant to most significant. + for col, desc in reversed(parsed): + sorted_rows = sorted( + sorted_rows, + key=lambda r: float("-inf") if getattr(r, col) is None else getattr(r, col), + reverse=desc, + ) + return sorted_rows + + +def columns_for_sdev(base_cols: list[str]) -> list[str]: + out: list[str] = [] + for col in base_cols: + out.append(col) + if col in {"CPU_Efficiency", "Memory_Efficiency", "Time_Efficiency"}: + out += [f"{col}_sdev", f"{col}_max", f"{col}_min"] + return out + + +def format_value(value: Any) -> str: + if value is None: + return "NA" + if isinstance(value, float): + return f"{value:.2f}" + return str(value) + + +def print_table(rows: list[OutputRow], columns: list[str], sdev: bool) -> None: + dicts = [r.as_dict(sdev=sdev) for r in rows] + columns = columns_for_sdev(columns) if sdev else columns + + widths = {col: len(col) for col in columns} + for d in dicts: + for col in columns: + widths[col] = max(widths[col], len(format_value(d.get(col)))) + + print(" ".join(col.ljust(widths[col]) for col in columns)) + print(" ".join("-" * widths[col] for col in columns)) + for d in dicts: + print(" ".join(format_value(d.get(col)).ljust(widths[col]) for col in columns)) + + +FORMAT_RE = re.compile(r"%(?:([-.0-9]*)([A-Za-z]))") + + +def print_custom_format(rows: list[OutputRow], fmt: str, sdev: bool) -> None: + # Slurm-like mini-format: %.12u %.8e %.30j, where the final letter is an alias. + # For efficiency aliases with --sdev, only the averaged value is emitted here. + for row in rows: + d = row.as_dict(sdev=sdev) + + def repl(match: re.Match[str]) -> str: + widthspec, alias = match.groups() + col = resolve_column(alias) + val = d.get(col) + text = format_value(val) + if widthspec: + # Interpret %.12s style as max width; %12s as min width. + m = re.fullmatch(r"\.?(-?\d+)", widthspec) + if m: + w = int(m.group(1)) + if widthspec.startswith("."): + text = text[: abs(w)] + else: + text = text.rjust(w) if w > 0 else text.ljust(abs(w)) + return text + + print(FORMAT_RE.sub(repl, fmt)) + + +def parse_args(argv: list[str]) -> argparse.Namespace: + p = argparse.ArgumentParser( + description="Display seff-style CPU, memory and walltime efficiency values from sacct data." + ) + + p.add_argument("-S", "--start", help="sacct start time, passed to sacct -S", + default="now - 24 hours") + p.add_argument("-E", "--end", help="sacct end time, passed to sacct -E", + default="now") + p.add_argument("-u", "--user", help="restrict to one user; passed as sacct -u unless reading from cache") + p.add_argument("--state", "--job-state", dest="state", + default="COMPLETED", + help="sacct state filter, e.g. COMPLETED,FAILED,TIMEOUT") + + p.add_argument("-O", "--output-cache", help="write raw sacct output cache to this file") + p.add_argument("-F", "--from-cache", help="read raw sacct output cache from this file instead of running sacct") + + p.add_argument("-U", "--aggr-user", action="store_true", help="aggregate jobs by user, CPUs, nodes, ReqMem, and timelimit") + p.add_argument( + "-R", + "--aggr-regexp", + action="append", + help="aggregate jobs matching regexp by regexp, CPUs, nodes, ReqMem, and timelimit; may be repeated", + ) + + p.add_argument("--sdev", action="store_true", help="after each efficiency average, add sdev, max, and min columns") + p.add_argument("--json", action="store_true", help="emit JSON instead of an ASCII table") + p.add_argument("-s", "--sort", help="comma-separated numeric sort columns or aliases; prefix with - for descending") + p.add_argument( + "-o", + "--format", + help=( + "Slurm-like output format using aliases: " + "u=username,c=CPUs,N=Nodes,m=ReqMem,l=ReqWalltime,C=Count," + "e=CPU_Efficiency,M=Memory_Efficiency,t=Time_Efficiency,j=jobname" + ), + ) + + args = p.parse_args(argv) + + if args.from_cache and any([args.start, args.end, args.state]): + print("warning: -S/-E/--state are ignored when using -F/--from-cache", file=sys.stderr) + if args.aggr_user and args.aggr_regexp: + die("choose only one aggregation mode: --aggr-user or --aggr-regexp") + return args + + +def main(argv: list[str] | None = None) -> int: + args = parse_args(argv or sys.argv[1:]) + + if args.from_cache: + rows = read_cache(args.from_cache) + else: + rows = run_sacct(args) + if args.output_cache: + write_cache(args.output_cache, rows) + + records = build_job_records(rows, only_user=args.user) + out_rows = aggregate_records(records, args) + out_rows = sort_rows(out_rows, args.sort) + + if args.json: + print(json.dumps([r.as_dict(sdev=args.sdev) for r in out_rows], indent=2)) + elif args.format: + print_custom_format(out_rows, args.format, args.sdev) + else: + print_table(out_rows, DEFAULT_COLUMNS, args.sdev) + + return 0 + + +if __name__ == "__main__": + raise SystemExit(main())