Files
slurm-efficiency-tools/slurm-eff-tool.py
feichtinger 362e4d46c9 TRES parsing for getting Req, Alloc, Used TRES
Introduced elegant dataclasses, which may have come at the cost
of speed.
2026-06-05 17:18:29 +02:00

921 lines
30 KiB
Python
Executable File

#!/usr/bin/env python3
"""
slurm-eff-tool.py - Slurm job efficiency reporting and investigation tool.
Efficiency definitions:
CPU_Eff = TotalCPU_seconds / CPUTimeRAW * 100
Time_Eff = ElapsedRaw_seconds / (TimelimitRaw_minutes * 60) * 100
Mem_Eff = 100 * (peak per-step TRESUsageInTot mem) / (AllocTRES mem, i.e. memory allocated by Slurm)
This script intentionally asks sacct for raw parsable fields and caches those rows.
TODO: allow caching an already-parsed binary format for faster loading; keep the sacct
text format as a debug option.
"""
# initial version of script produced by vibe coding of an exact functionality
# description using chatgpt-5.5
# 2026 D. Feichtinger <derek.feichtinger@psi.ch>
from __future__ import annotations
import argparse
import csv
import json
import math
import os
import re
import statistics
import subprocess
import sys
import signal
from collections import defaultdict
from dataclasses import dataclass, field, fields
from pathlib import Path
from typing import Any, Iterable
# Raw sacct fields requested via ``sacct --format``. This is also the column
# set of the cache file written by write_cache() and read by read_cache().
SACCT_FIELDS = [
    "JobIDRaw",
    "JobID",
    "User",
    "JobName",
    "State",
    "AllocCPUS",
    "NNodes",
    "ReqMem",
    "ElapsedRaw",
    "TimelimitRaw",
    "CPUTimeRAW",
    "TotalCPU",
    "MaxRSS",
    "ReqTRes",
    "AllocTRES",
    "TRESUsageInTot",
]
# Default columns to print in output, and their order
DEFAULT_COLUMNS = [
    "username",
    "JobID",
    "Count",
    "NTasks",
    "CPUs",
    "Nodes",
    "CPU_Eff",
    "waste_CPU",
    "ReqMem",
    "AllocMem",
    "UsedMem",
    "Mem_Eff",
    "waste_Mem",
    "MaxRSS_max",
    "MemPerCPU",
    "ReqWalltime",
    "Walltime",
    "Walltime_max",
    "Time_Eff",
    "jobname",
]
# One-character aliases for sorting and output format specifications.
ALIASES = {
    "u": "username",
    "i": "JobID",
    "c": "CPUs",
    "N": "Nodes",
    "m": "ReqMem",
    "n": "UsedMem",
    "p": "MemPerCPU",
    "l": "ReqWalltime",
    "C": "Count",
    "e": "CPU_Eff",
    "M": "Mem_Eff",
    "t": "Time_Eff",
    "j": "jobname",
    "X": "waste_CPU",
    "Y": "waste_Mem",
}
# Columns whose values are numbers (or None). sort_rows() orders these
# numerically instead of as text. Every DEFAULT_COLUMNS entry except the
# text columns username, JobID and jobname belongs here.
NUMERIC_COLUMNS = {
    "CPUs",
    "NTasks",
    "Nodes",
    "ReqMem",
    "AllocMem",
    "UsedMem",
    "MemPerCPU",
    "ReqWalltime",
    "Count",
    "CPU_Eff",
    "MaxRSS_max",
    "Mem_Eff",
    "Time_Eff",
    "Walltime",
    "Walltime_max",
    "waste_CPU",
    "waste_Mem",
}
@dataclass
class JobRecord:
    """Efficiency data for one base Slurm job, built by build_job_records().

    Combines the job's top-level sacct row with its step rows. Despite their
    names, ``mem_alloc_tres`` and ``mem_used_tres`` hold GiB values
    (bytes / 1024**3), like the ``*_gb`` fields. Efficiencies are percentages,
    or None when the denominator is unknown or zero.
    """

    raw: dict[str, str]  # the job's top-level sacct row (field name -> raw text)
    username: str
    jobname: str
    # NOTE(review): filled from the ``cpu`` entry of ReqTRES, i.e. presumably
    # the requested CPU count rather than a task count -- confirm intent.
    reqtasks: int
    cpus: int  # AllocCPUS
    nodes: int  # NNodes
    reqmem_gb: float | None  # requested memory (ReqTRES mem) in GiB
    mem_per_cpu_gb: float | None  # allocated memory per allocated CPU, GiB
    mem_alloc_tres: float | None  # allocated memory (AllocTRES mem), GiB
    mem_used_tres: float | None  # largest TRESUsageInTot mem over the job's steps, GiB
    reqwall_hours: float | None  # time limit in hours; None if no limit recorded
    reqmem_bytes_total: float | None  # requested memory (ReqTRES mem) in bytes
    elapsed_sec: int  # ElapsedRaw
    timelimit_sec: int  # TimelimitRaw (minutes) converted to seconds
    totalcpu_sec: float  # TotalCPU converted to seconds
    # Largest MaxRSS in bytes over the non-extern step rows (all non-extern
    # rows when the job has no step rows).
    maxrss_bytes: float | None
    cpu_eff: float | None  # 100 * TotalCPU / CPUTimeRAW
    mem_eff: float | None  # 100 * used TRES memory / AllocTRES memory
    time_eff: float | None  # 100 * elapsed / time limit
@dataclass
class AllocTresStruct:
    """The subset of a Slurm TRES string (ReqTRES / AllocTRES) used by this tool.

    ``mem`` is the memory amount in bytes (None when absent, zero or
    unparsable). ``cpu`` is the CPU count. String values given to the
    constructor are converted in ``__post_init__``.
    """

    mem: int | None = None
    cpu: int = 0

    def __post_init__(self):
        if isinstance(self.mem, str):
            self.mem = parse_size_to_bytes(self.mem)
        if isinstance(self.cpu, str):
            self.cpu = int(self.cpu)

    @classmethod
    def from_tres_string(cls, tres_str: str) -> AllocTresStruct:
        """Parse a comma-separated ``key=value`` TRES string.

        Keys that are not fields of this dataclass (e.g. ``billing``, ``node``,
        ``gres/gpu``) are ignored. Empty or malformed items without ``=`` are
        skipped rather than raising. An empty string yields the defaults. If a
        key repeats, the last value wins.
        """
        supported = {f.name for f in fields(cls)}
        values: dict[str, str] = {}
        for item in (tres_str or "").split(","):
            key, sep, val = item.strip().partition("=")
            if sep and key in supported:
                values[key] = val
        return cls(**values)
@dataclass
class UsedTresStruct:
    """The subset of a TRESUsageInTot string used by this tool.

    ``mem`` is the memory usage in bytes (None when absent, zero or
    unparsable). ``cpu`` keeps the raw CPU-time text unparsed.
    """

    mem: int | None = None
    cpu: str = ""

    def __post_init__(self):
        if isinstance(self.mem, str):
            self.mem = parse_size_to_bytes(self.mem)

    @classmethod
    def from_tres_string(cls, tres_str: str) -> UsedTresStruct:
        """Parse a comma-separated ``key=value`` TRESUsageInTot string.

        Keys that are not fields of this dataclass are ignored. Empty or
        malformed items without ``=`` are skipped rather than raising. An empty
        string yields the defaults.
        """
        supported = {f.name for f in fields(cls)}
        values: dict[str, str] = {}
        for item in (tres_str or "").split(","):
            key, sep, val = item.strip().partition("=")
            if sep and key in supported:
                values[key] = val
        return cls(**values)
@dataclass
class OutputRow:
    """One line of output: either a single job or an aggregate of several jobs.

    Memory columns are GiB, walltime columns are hours and efficiencies are
    percentages. The private ``_*_eff_values`` lists keep the per-job
    efficiencies so that ``as_dict(sdev=True)`` can report their spread.
    """

    username: str
    JobID: str
    NTasks: int
    CPUs: int
    Nodes: int
    ReqMem: float | None
    AllocMem: float | None
    UsedMem: float | None
    MemPerCPU: float | None
    ReqWalltime: float | None
    Count: int
    CPU_Eff: float | None
    Mem_Eff: float | None
    Time_Eff: float | None
    jobname: str
    maxrss_max: float | None
    walltime: float | None  # in h
    walltime_max: float | None  # in h
    waste_CPU: float | None  # CPU h
    waste_Mem: float | None
    _cpu_eff_values: list[float] = field(default_factory=list, repr=False)
    _mem_eff_values: list[float] = field(default_factory=list, repr=False)
    _time_eff_values: list[float] = field(default_factory=list, repr=False)

    def as_dict(self, sdev: bool = False) -> dict[str, Any]:
        """Return the row keyed by output column name, in output order.

        With ``sdev=True``, ``<col>_sdev``, ``<col>_max`` and ``<col>_min``
        entries follow for CPU_Eff, Mem_Eff and Time_Eff.
        """
        # (output column, attribute) pairs; some columns are stored under a
        # differently spelled attribute.
        column_to_attr = (
            ("username", "username"),
            ("JobID", "JobID"),
            ("NTasks", "NTasks"),
            ("CPUs", "CPUs"),
            ("Nodes", "Nodes"),
            ("ReqMem", "ReqMem"),
            ("AllocMem", "AllocMem"),
            ("UsedMem", "UsedMem"),
            ("MaxRSS_max", "maxrss_max"),
            ("MemPerCPU", "MemPerCPU"),
            ("ReqWalltime", "ReqWalltime"),
            ("Walltime", "walltime"),
            ("Walltime_max", "walltime_max"),
            ("Count", "Count"),
            ("CPU_Eff", "CPU_Eff"),
            ("Mem_Eff", "Mem_Eff"),
            ("Time_Eff", "Time_Eff"),
            ("jobname", "jobname"),
            ("waste_CPU", "waste_CPU"),
            ("waste_Mem", "waste_Mem"),
        )
        row: dict[str, Any] = {col: getattr(self, attr) for col, attr in column_to_attr}
        if not sdev:
            return row
        spreads = {
            "CPU_Eff": self._cpu_eff_values,
            "Mem_Eff": self._mem_eff_values,
            "Time_Eff": self._time_eff_values,
        }
        for col, samples in spreads.items():
            row[f"{col}_sdev"] = stdev_or_none(samples)
            row[f"{col}_max"] = max(samples, default=None)
            row[f"{col}_min"] = min(samples, default=None)
        return row
def die(msg: str, code: int = 2) -> None:
    """Print ``error: <msg>`` on stderr and exit with *code*.

    Never returns normally: it always raises SystemExit(code).
    """
    sys.stderr.write(f"error: {msg}\n")
    raise SystemExit(code)
def slurm_duration_to_seconds(value: str) -> float:
    """Convert a Slurm duration string to seconds.

    Accepts ``[D-]HH:MM:SS[.fff]``, ``[D-]MM:SS[.fff]`` and plain seconds,
    e.g. ``01:02:03``, ``2-01:02:03`` or ``5:12.345``.

    Returns:
        The duration in seconds. Empty input, ``Unknown``, ``UNLIMITED``,
        ``Partition_Limit`` and malformed values (including a non-numeric day
        prefix) give 0.0.
    """
    value = (value or "").strip()
    if not value or value in {"Unknown", "UNLIMITED", "Partition_Limit"}:
        return 0.0
    try:
        days = 0
        if "-" in value:
            day_text, value = value.split("-", 1)
            days = int(day_text)
        parts = value.split(":")
        if len(parts) == 3:
            hours, minutes, seconds = parts
            sec = float(seconds)
            return days * 86400 + int(hours) * 3600 + int(minutes) * 60 + sec
        if len(parts) == 2:
            minutes, seconds = parts
            return days * 86400 + int(minutes) * 60 + float(seconds)
        if len(parts) == 1:
            return days * 86400 + float(parts[0])
    except ValueError:
        # A malformed field is treated like an unknown duration instead of
        # aborting the whole report.
        return 0.0
    return 0.0
def format_seconds(seconds: int | float | None) -> str:
if seconds is None or seconds <= 0:
return "Unknown"
seconds = int(round(seconds))
days, rem = divmod(seconds, 86400)
hours, rem = divmod(rem, 3600)
minutes, sec = divmod(rem, 60)
if days:
return f"{days}-{hours:02d}:{minutes:02d}:{sec:02d}"
return f"{hours:02d}:{minutes:02d}:{sec:02d}"
def parse_size_to_bytes(value: str) -> int | None:
"""Parse Slurm memory size fields such as 1024K, 2000M, 8Gn, 4Gc, 1.5T."""
s = (value or "").strip()
if not s or s in {"Unknown", "0", "0K", "0M", "0G", "0T"}:
return None
# Slurm ReqMem may end in c/n for per-CPU or per-node. Strip that here.
if s[-1].lower() in {"c", "n"}:
s = s[:-1]
m = re.fullmatch(r"([0-9]+(?:\.[0-9]+)?)([KMGTP]?)", s, flags=re.I)
if not m:
return None
num = int(m.group(1))
unit = m.group(2).upper() or "K" # Slurm memory fields are normally KiB when unitless.
mult = {
"K": 1024,
"M": 1024**2,
"G": 1024**3,
"T": 1024**4,
"P": 1024**5,
}[unit]
return num * mult
def reqmem_total_bytes(reqmem: str, cpus: int, nodes: int) -> float | None:
    """Return the total memory requested by a job, in bytes, from ReqMem.

    A ``c`` suffix means per CPU (scaled by *cpus*) and an ``n`` suffix means
    per node (scaled by *nodes*); counts below 1 are treated as 1. Without a
    suffix the parsed value is returned unscaled. Returns None if ReqMem
    cannot be parsed.
    """
    text = (reqmem or "").strip()
    per_unit = parse_size_to_bytes(text)
    if per_unit is None:
        return None
    suffix = text[-1:].lower()
    if suffix == "c":
        return per_unit * max(cpus, 1)
    if suffix == "n":
        return per_unit * max(nodes, 1)
    # Unsuffixed values are returned as-is. NOTE(review): Slurm's default
    # ReqMem suffix is usually "n" (per node); confirm whether scaling by
    # nodes is wanted here.
    return per_unit
def common_prefix(names: list[str]) -> str:
    """Return a glob-like label summarising *names*.

    A single name is returned unchanged. For several names the result is
    their common leading text followed by ``*``, with trailing characters
    outside ``[A-Za-z0-9_.-]`` stripped first so that partial-token prefixes
    are avoided. The result is just ``*`` when nothing is shared, and an empty
    list gives "".
    """
    if not names:
        return ""
    if len(names) == 1:
        return names[0]
    shared = re.sub(r"[^A-Za-z0-9_.-]+$", "", os.path.commonprefix(names))
    return f"{shared}*" if shared else "*"
def stdev_or_none(values: list[float]) -> float | None:
if len(values) < 2:
return 0.0 if len(values) == 1 else None
return statistics.stdev(values)
def mean_or_none(values: list[float]) -> float | None:
return statistics.mean(values) if values else None
def pct(numerator: float | None, denominator: float | None) -> float | None:
if numerator is None or denominator is None or denominator <= 0:
return None
return 100.0 * numerator / denominator
def run_sacct(args: argparse.Namespace) -> list[dict[str, str]]:
    """Run sacct for the time window and filters in *args*; return parsed rows.

    Output is requested pipe-separated without a header (``-P -n``) and with
    memory in KiB (``--units=K``). Options whose value in *args* is empty are
    not passed. Exits via die() if sacct is missing or returns non-zero.
    """
    optional_flags = (
        ("-S", args.start),
        ("-E", args.end),
        ("-u", args.user),
        ("--state", args.state),
    )
    cmd = ["sacct", "-P", "-n", "--units=K", "--format=" + ",".join(SACCT_FIELDS)]
    for flag, setting in optional_flags:
        if setting:
            cmd.extend([flag, setting])
    # Job steps are included on purpose: MaxRSS often lives on batch/extern/step
    # rows. build_job_records() later collapses rows back to the base job ID.
    try:
        completed = subprocess.run(
            cmd, text=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
    except FileNotFoundError:
        die("sacct not found in PATH")
    except subprocess.CalledProcessError as exc:
        die(f"sacct failed with exit code {exc.returncode}:\n{exc.stderr.strip()}")
    return parse_pipe_rows(completed.stdout.splitlines())
def parse_pipe_rows(lines: Iterable[str]) -> list[dict[str, str]]:
    """Parse ``sacct -P`` output lines into dicts keyed by SACCT_FIELDS.

    Each field is whitespace-stripped. Short rows are padded with "" and extra
    trailing fields are dropped. Empty lines are skipped.
    """
    width = len(SACCT_FIELDS)
    rows: list[dict[str, str]] = []
    # sacct -P never quotes fields, so '"' must be taken literally. With csv's
    # default quoting, a field starting with '"' (e.g. a JobName) would be
    # read as a quoted field and swallow the following '|' separators.
    reader = csv.reader(lines, delimiter="|", quoting=csv.QUOTE_NONE)
    for parts in reader:
        if not parts:
            continue
        cells = [p.strip() for p in parts[:width]]
        cells += [""] * (width - len(cells))
        rows.append(dict(zip(SACCT_FIELDS, cells)))
    return rows
def write_cache(path: str, rows: list[dict[str, str]]) -> None:
    """Save raw sacct *rows* to *path* as '|'-separated UTF-8 text.

    The first line is a header of SACCT_FIELDS names, the format that
    read_cache() expects.
    """
    with open(path, "w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(
            handle, fieldnames=SACCT_FIELDS, delimiter="|", lineterminator="\n"
        )
        writer.writeheader()
        for row in rows:
            writer.writerow(row)
def read_cache(path: str) -> list[dict[str, str]]:
    """Load rows written by write_cache(), keyed by SACCT_FIELDS.

    Every value is stripped, and fields absent from the file read as "".
    Only CPUTimeRAW may be missing from the header; any other missing field
    aborts via die().
    """
    # Older cache files from versions before CPUTimeRAW can still be read.
    # CPU efficiency will be NA for those rows unless CPUTimeRAW is present.
    optional = {"CPUTimeRAW"}
    with open(path, newline="", encoding="utf-8") as handle:
        reader = csv.DictReader(handle, delimiter="|")
        header = set(reader.fieldnames or [])
        absent = [
            name for name in SACCT_FIELDS
            if name not in header and name not in optional
        ]
        if absent:
            die(f"cache file is missing expected fields: {', '.join(absent)}")
        return [
            {name: (record.get(name) or "").strip() for name in SACCT_FIELDS}
            for record in reader
        ]
def base_job_id(jobid: str) -> str:
    """Return *jobid* up to, but not including, its first '.' or '_'.

    For example ``"123.batch"`` and ``"123_4"`` both give ``"123"``.
    """
    cut_points = [pos for pos in (jobid.find("."), jobid.find("_")) if pos >= 0]
    return jobid[: min(cut_points)] if cut_points else jobid
def is_top_level_row(row: dict[str, str]) -> bool:
    """Return True if *row* is a job's own allocation row rather than a step row.

    Step rows carry a ``.<step>`` suffix (``123.batch``, ``123.0``,
    ``123.extern``). JobIDRaw is checked when it is present, because array
    tasks appear as ``<array_id>_<index>`` in JobID but have a plain raw job
    ID. Only when JobIDRaw is empty does this fall back to JobID, where an
    ``_`` also counts as "not top level".
    """
    raw_id = row.get("JobIDRaw") or ""
    if raw_id:
        return "." not in raw_id
    jid = row.get("JobID", "")
    return "." not in jid and "_" not in jid
def _step_job_id(row: dict[str, str]) -> str:
    """Return the ID used to classify step rows: JobID, else JobIDRaw, else ""."""
    return row.get("JobID") or row.get("JobIDRaw") or ""


def _float_field(row: dict[str, str], key: str) -> float:
    """Return ``row[key]`` as a float; empty or non-numeric text (e.g. "UNLIMITED") gives 0.0."""
    try:
        return float(row.get(key) or 0)
    except ValueError:
        return 0.0


def _bytes_to_gib(value: float | None) -> float | None:
    """Convert a byte count to GiB (1024**3 bytes), passing None through."""
    return None if value is None else value / (1024**3)


def build_job_records(rows: list[dict[str, str]], only_user: str | None = None) -> list[JobRecord]:
    """Collapse sacct top-level and step rows into one JobRecord per base job.

    Args:
        rows: raw sacct rows keyed by SACCT_FIELDS names. Top-level job rows
            and step rows (``<id>.batch``, ``<id>.0``, ``<id>.extern``, ...)
            may be mixed.
        only_user: if given, keep only jobs whose top-level row has this User.

    Returns:
        One JobRecord per kept job, ordered by first appearance of its base job
        ID in *rows*.
    """
    grouped: dict[str, list[dict[str, str]]] = defaultdict(list)
    for row in rows:
        # Do not filter by User here. On many Slurm installations, job step rows
        # are where MaxRSS is populated, but their User field may be empty or may
        # not match the parent job's user. Filtering before grouping drops those
        # rows and makes Mem_Eff become NA. Filter after identifying
        # the top-level job row instead.
        grouped[base_job_id(row.get("JobIDRaw") or row.get("JobID", ""))].append(row)

    records: list[JobRecord] = []
    # Each group consists of the sacct rows belonging to one job.
    for group in grouped.values():
        top = next((r for r in group if is_top_level_row(r)), group[0])
        if only_user and top.get("User") != only_user:
            continue

        # Rows that carry the job's memory usage: the non-extern job steps, or
        # every non-extern row when the job has no step rows.
        step_rows = [
            r for r in group
            if "." in _step_job_id(r) and ".extern" not in _step_job_id(r)
        ]
        usage_rows = step_rows or [r for r in group if ".extern" not in _step_job_id(r)]

        # seff-style practical peak RSS: the largest MaxRSS over those rows.
        # This is intentionally not a sum over all ranks/tasks.
        rss_values = [parse_size_to_bytes(r.get("MaxRSS", "")) for r in usage_rows]
        maxrss = max((v for v in rss_values if v is not None), default=None)

        # Largest TRESUsageInTot memory over the same rows; it may sit in the
        # *.interactive or *.0 step. default=None: a job without any recorded
        # usage gets Mem_Eff NA instead of aborting the whole report.
        used_tres = [UsedTresStruct.from_tres_string(r.get("TRESUsageInTot", "")) for r in usage_rows]
        mem_used_tres = max((u.mem for u in used_tres if u.mem is not None), default=None)

        req_tres = AllocTresStruct.from_tres_string(top.get("ReqTRes") or "")
        alloc_tres = AllocTresStruct.from_tres_string(top.get("AllocTRES") or "")
        # NOTE(review): the ReqTRES "cpu" count is reported as NTasks; it is
        # presumably the requested CPU count, which equals the task count only
        # with one CPU per task.
        req_ntasks = req_tres.cpu
        alloc_cpus = int(_float_field(top, "AllocCPUS"))
        nodes = int(_float_field(top, "NNodes"))
        elapsed = int(_float_field(top, "ElapsedRaw"))
        # sacct TimelimitRaw is minutes, while ElapsedRaw and CPUTimeRAW are seconds.
        timelimit_seconds = int(round(_float_field(top, "TimelimitRaw") * 60))
        cputime_raw = _float_field(top, "CPUTimeRAW")
        totalcpu = slurm_duration_to_seconds(top.get("TotalCPU", ""))

        # Requested memory comes from ReqTRES mem, which is safer than
        # reconstructing it from ReqMem and its per-CPU/per-node suffix.
        reqmem_total = req_tres.mem
        reqmem_gb = _bytes_to_gib(reqmem_total)
        # Memory allocated by the scheduler, as recorded in AllocTRES.
        mem_alloc_tres_gb = _bytes_to_gib(alloc_tres.mem)
        mem_used_tres_gb = _bytes_to_gib(mem_used_tres)
        reqwall_hours = timelimit_seconds / 3600.0 if timelimit_seconds > 0 else None
        # Share of the *allocated* memory per allocated CPU; does not depend on ReqTRES.
        mem_per_cpu_gb = (
            mem_alloc_tres_gb / alloc_cpus
            if mem_alloc_tres_gb is not None and alloc_cpus > 0
            else None
        )

        # EFFICIENCY CALCULATIONS (percent; None when the denominator is unknown/zero)
        #
        # TODO: find out why cpu_eff for some jobs is > 100%. Probably this is
        # related to 2 hyperthreads, and cputime_raw should have been doubled
        # for these cases
        cpu_eff = pct(totalcpu, cputime_raw)
        time_eff = pct(elapsed, timelimit_seconds)
        # mem_eff = (largest per-step TRESUsageInTot mem) / (memory allocated by
        # the scheduler). Earlier versions used peak MaxRSS / requested memory.
        mem_eff = pct(mem_used_tres, alloc_tres.mem)
        records.append(
            JobRecord(
                raw=top,
                username=top.get("User", ""),
                jobname=top.get("JobName", ""),
                reqtasks=req_ntasks,
                cpus=alloc_cpus,
                nodes=nodes,
                reqmem_gb=reqmem_gb,
                mem_used_tres=mem_used_tres_gb,
                mem_per_cpu_gb=mem_per_cpu_gb,
                mem_alloc_tres=mem_alloc_tres_gb,
                reqwall_hours=reqwall_hours,
                reqmem_bytes_total=reqmem_total,
                elapsed_sec=elapsed,
                timelimit_sec=timelimit_seconds,
                totalcpu_sec=totalcpu,
                maxrss_bytes=maxrss,
                cpu_eff=cpu_eff,
                mem_eff=mem_eff,
                time_eff=time_eff,
            )
        )
    return records
def aggregate_records(records: list[JobRecord], args: argparse.Namespace) -> list[OutputRow]:
    """Turn job records into output rows, grouped as requested in *args*.

    * ``args.aggr_regexp``: each job goes to the first pattern that matches
      its name (``re.search``). Jobs are grouped by (pattern, user, NTasks,
      CPUs, nodes, requested memory, requested walltime), and the pattern
      becomes the row's jobname. Jobs matching no pattern follow as single
      rows.
    * ``args.aggr_user``: jobs are grouped by (user, NTasks, CPUs, nodes,
      requested memory, requested walltime), and the jobname is a
      common-prefix glob.
    * otherwise: one row per job.

    An invalid regular expression aborts via die().
    """
    if args.aggr_regexp:
        compiled: list[tuple[str, re.Pattern[str]]] = []
        for pattern in args.aggr_regexp:
            try:
                compiled.append((pattern, re.compile(pattern)))
            except re.error as exc:
                # A bad -R pattern is a user error: report it, not a traceback.
                die(f"invalid --aggr-regexp pattern {pattern!r}: {exc}")
        buckets: dict[tuple[Any, ...], list[JobRecord]] = defaultdict(list)
        unmatched: list[JobRecord] = []
        for rec in records:
            # The first pattern (in command-line order) that matches wins.
            hit = next((pat for pat, rx in compiled if rx.search(rec.jobname)), None)
            if hit is None:
                unmatched.append(rec)
                continue
            key = (hit, rec.username, rec.reqtasks, rec.cpus, rec.nodes,
                   rec.reqmem_gb, rec.reqwall_hours)
            buckets[key].append(rec)
        out = [make_aggregate_row(v, username=k[1], jobname=k[0]) for k, v in buckets.items()]
        out.extend(make_single_row(r) for r in unmatched)
        return out
    if args.aggr_user:
        by_user: dict[tuple[Any, ...], list[JobRecord]] = defaultdict(list)
        for rec in records:
            key = (rec.username, rec.reqtasks, rec.cpus, rec.nodes,
                   rec.reqmem_gb, rec.reqwall_hours)
            by_user[key].append(rec)
        return [
            make_aggregate_row(v, username=k[0], jobname=common_prefix([r.jobname for r in v]))
            for k, v in by_user.items()
        ]
    return [make_single_row(r) for r in records]
def make_single_row(rec: JobRecord) -> OutputRow:
    """Return the OutputRow describing a single job (Count is always 1).

    Waste columns (None when the needed efficiency or memory value is unknown):

      * waste_CPU = walltime[h] * (100 - CPU_Eff) / 100 * CPUs
        -> idle CPU-hours
      * waste_Mem = walltime[h] * (100 - Mem_Eff) / 100 * AllocMem[GiB]
        -> allocated but unused GiB-hours. AllocMem is used because Mem_Eff
        is measured against the allocated (not the requested) memory.
    """
    walltime = rec.elapsed_sec / 3600
    waste_mem = None
    if rec.mem_eff is not None and rec.mem_alloc_tres is not None:
        waste_mem = walltime * (100 - rec.mem_eff) / 100 * rec.mem_alloc_tres
    waste_cpu = None
    if rec.cpu_eff is not None:
        # cpu_eff is a percentage, so the idle fraction is (100 - eff) / 100.
        waste_cpu = walltime * (100 - rec.cpu_eff) / 100 * rec.cpus
    maxrss_gb = rec.maxrss_bytes / (1024**3) if rec.maxrss_bytes is not None else None
    return OutputRow(
        username=rec.username,
        JobID=rec.raw.get("JobIDRaw", rec.raw.get("JobID", "")),
        NTasks=rec.reqtasks,
        CPUs=rec.cpus,
        Nodes=rec.nodes,
        ReqMem=rec.reqmem_gb,
        AllocMem=rec.mem_alloc_tres,
        UsedMem=rec.mem_used_tres,
        MemPerCPU=rec.mem_per_cpu_gb,
        ReqWalltime=rec.reqwall_hours,
        Count=1,
        CPU_Eff=rec.cpu_eff,
        Mem_Eff=rec.mem_eff,
        Time_Eff=rec.time_eff,
        jobname=rec.jobname,
        maxrss_max=maxrss_gb,
        walltime=walltime,
        walltime_max=walltime,
        waste_Mem=waste_mem,
        waste_CPU=waste_cpu,
        _cpu_eff_values=[rec.cpu_eff] if rec.cpu_eff is not None else [],
        _mem_eff_values=[rec.mem_eff] if rec.mem_eff is not None else [],
        _time_eff_values=[rec.time_eff] if rec.time_eff is not None else [],
    )
def make_aggregate_row(records: list[JobRecord], username: str, jobname: str) -> OutputRow:
    """Return one OutputRow summarising a non-empty group of job records.

    NTasks, CPUs, Nodes, ReqMem, MemPerCPU and ReqWalltime are taken from the
    first record. Walltime, AllocMem, UsedMem and the efficiencies are means
    over the records that have a value. MaxRSS_max and Walltime_max are
    maxima; MaxRSS_max is None if no job reported MaxRSS. The waste columns
    are totals for the whole group, computed from those means:

      * waste_CPU = Count * Walltime[h] * (100 - CPU_Eff) / 100 * CPUs
      * waste_Mem = Count * Walltime[h] * (100 - Mem_Eff) / 100 * AllocMem[GiB]

    AllocMem is used for waste_Mem because Mem_Eff is measured against the
    allocated memory.
    """
    first = records[0]
    count = len(records)
    cpu_eff_vals = [r.cpu_eff for r in records if r.cpu_eff is not None]
    mem_eff_vals = [r.mem_eff for r in records if r.mem_eff is not None]
    time_eff_vals = [r.time_eff for r in records if r.time_eff is not None]
    elapsed = [r.elapsed_sec for r in records if r.elapsed_sec is not None]
    walltime = mean_or_none(elapsed)
    if walltime is not None:
        walltime /= 3600
    # We average over all jobs' allocated memory. Some jobs could have received
    # different allocations, even though all of them had the same user required
    # Memory. Maybe should generate a warning. Most of the time, what the user
    # requested should match what the scheduler gave
    alloc_mem = mean_or_none([r.mem_alloc_tres for r in records if r.mem_alloc_tres is not None])
    used_mem = mean_or_none([r.mem_used_tres for r in records if r.mem_used_tres is not None])
    memory_efficiency = mean_or_none(mem_eff_vals)
    cpu_efficiency = mean_or_none(cpu_eff_vals)
    waste_mem = None
    if memory_efficiency is not None and walltime is not None and alloc_mem is not None:
        waste_mem = count * walltime * (100 - memory_efficiency) / 100 * alloc_mem
    waste_cpu = None
    if cpu_efficiency is not None and walltime is not None:
        # Percent -> fraction, and summed over all `count` jobs like waste_mem.
        waste_cpu = count * walltime * (100 - cpu_efficiency) / 100 * first.cpus
    # default=None: groups in which no job reported MaxRSS must not crash.
    peak_rss = max((r.maxrss_bytes for r in records if r.maxrss_bytes is not None), default=None)
    return OutputRow(
        username=username,
        JobID="",
        NTasks=first.reqtasks,
        CPUs=first.cpus,
        Nodes=first.nodes,
        ReqMem=first.reqmem_gb,
        AllocMem=alloc_mem,
        UsedMem=used_mem,
        MemPerCPU=first.mem_per_cpu_gb,
        ReqWalltime=first.reqwall_hours,
        Count=count,
        CPU_Eff=cpu_efficiency,
        Mem_Eff=memory_efficiency,
        Time_Eff=mean_or_none(time_eff_vals),
        jobname=jobname,
        maxrss_max=peak_rss / (1024**3) if peak_rss is not None else None,
        walltime=walltime,
        walltime_max=max(elapsed) / 3600 if elapsed else None,
        waste_Mem=waste_mem,
        waste_CPU=waste_cpu,
        _cpu_eff_values=cpu_eff_vals,
        _mem_eff_values=mem_eff_vals,
        _time_eff_values=time_eff_vals,
    )
def resolve_column_name(name: str) -> str:
    """Map a column alias or a case-insensitive column name to its canonical name.

    One-letter ALIASES are matched first and case-sensitively (``C`` is Count,
    ``c`` is CPUs). Unknown names abort via die().
    """
    key = name.strip()
    alias_target = ALIASES.get(key)
    if alias_target is not None:
        return alias_target
    canonical = {col.lower(): col for col in (*DEFAULT_COLUMNS, *NUMERIC_COLUMNS)}
    match = canonical.get(key.lower())
    if match is None:
        die(f"unknown column/alias: {name}")
        return ""
    return match
def sort_rows(rows: list[OutputRow], spec: str | None) -> list[OutputRow]:
    """Return *rows* sorted by a comma-separated list of up to three columns.

    Each term is a column name or alias, optionally prefixed with ``-``
    (descending) or ``+`` (ascending, the default). The first term is the
    primary key. Numeric columns sort numerically, with None first when
    ascending and last when descending. Other columns sort as text. An empty
    or None *spec* returns *rows* unchanged; more than three terms or an
    unknown column abort via die().
    """
    if not spec:
        return rows
    terms = [t.strip() for t in spec.split(",") if t.strip()]
    if len(terms) > 3:
        die("--sort supports at most three columns")
    keys: list[tuple[str, bool]] = []
    for term in terms:
        descending = term.startswith("-")
        name = term[1:] if term[:1] in ("-", "+") else term
        keys.append((resolve_column_name(name), descending))

    # Look values up by output column name: MaxRSS_max, Walltime and
    # Walltime_max are stored under differently named attributes, so
    # getattr(row, column) would fail for them.
    decorated = [(row.as_dict(), row) for row in rows]
    # Stable-sort from least significant to most significant.
    for col, descending in reversed(keys):
        values = [d.get(col) for d, _ in decorated]
        numeric = col in NUMERIC_COLUMNS or all(
            isinstance(v, (int, float)) for v in values if v is not None
        )
        if numeric:
            decorated.sort(
                key=lambda item, c=col: float("-inf") if item[0].get(c) is None else item[0].get(c),
                reverse=descending,
            )
        else:
            decorated.sort(
                key=lambda item, c=col: "" if item[0].get(c) is None else str(item[0].get(c)),
                reverse=descending,
            )
    return [row for _, row in decorated]
def columns_for_sdev(base_cols: list[str]) -> list[str]:
    """Return *base_cols* with spread columns inserted after each efficiency column.

    ``<col>_sdev``, ``<col>_max`` and ``<col>_min`` directly follow CPU_Eff,
    Mem_Eff and Time_Eff; all other columns are copied unchanged.
    """
    expanded: list[str] = []
    for col in base_cols:
        expanded.append(col)
        if col in ("CPU_Eff", "Mem_Eff", "Time_Eff"):
            expanded.extend(f"{col}{suffix}" for suffix in ("_sdev", "_max", "_min"))
    return expanded
def format_gb_value(value: float | None) -> str:
if value is None:
return "NA"
return f"{value:.2f}G"
def format_secs_to_hours(value: float | None) -> str:
if value is None:
return "NA"
return f"{value:.2f}h"
def format_value(value: Any, column: str | None = None) -> str:
if column in ["ReqMem", "UsedMem","MemPerCPU", "MaxRSS_max", "AllocMem"]:
return format_gb_value(value)
if column in ["ReqWalltime", "Walltime", "Walltime_max"]:
return format_secs_to_hours(value)
if value is None:
return "NA"
if isinstance(value, float):
return f"{value:.2f}"
return str(value)
def print_table(rows: list[OutputRow], columns: list[str], sdev: bool) -> None:
    """Print *rows* as a left-aligned, space-separated ASCII table.

    With *sdev*, spread columns are inserted after each efficiency column.
    Each column is as wide as its widest header or cell, and a dashed rule
    separates the header from the data.
    """
    row_dicts = [row.as_dict(sdev=sdev) for row in rows]
    if sdev:
        columns = columns_for_sdev(columns)
    table = [[format_value(d.get(col), col) for col in columns] for d in row_dicts]
    widths = [
        max([len(col)] + [len(line[idx]) for line in table])
        for idx, col in enumerate(columns)
    ]
    print(" ".join(col.ljust(w) for col, w in zip(columns, widths)))
    print(" ".join("-" * w for w in widths))
    for line in table:
        print(" ".join(cell.ljust(w) for cell, w in zip(line, widths)))
# One format directive: '%', an optional width spec, then a one-letter column alias.
FORMAT_RE = re.compile(r"%(?:([-.0-9]*)([A-Za-z]))")


def print_custom_format(rows: list[OutputRow], fmt: str, sdev: bool) -> None:
    """Print one line per row by expanding ``%[width]<letter>`` directives in *fmt*.

    This is a Slurm-like mini-format, e.g. ``"%.12u %.8e %.30j"``. Each letter
    is resolved with resolve_column_name() (normally one of ALIASES).
    Width spec: ``%.N`` truncates to at most N characters, ``%N``
    right-justifies to N and ``%-N`` left-justifies to N; other width specs
    are ignored. With *sdev*, efficiency aliases still emit only the averaged
    value.
    """
    def render(values: dict[str, Any], match: re.Match[str]) -> str:
        widthspec, alias = match.groups()
        col = resolve_column_name(alias)
        text = format_value(values.get(col), col)
        width_match = re.fullmatch(r"\.?(-?\d+)", widthspec) if widthspec else None
        if width_match is None:
            return text
        width = int(width_match.group(1))
        if widthspec.startswith("."):
            return text[: abs(width)]
        if width > 0:
            return text.rjust(width)
        return text.ljust(abs(width))

    for row in rows:
        values = row.as_dict(sdev=sdev)
        print(FORMAT_RE.sub(lambda m: render(values, m), fmt))
def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse command-line options (without the program name).

    -S/-E/--state default to "now - 24 hours", "now" and "COMPLETED". Inside
    argparse they default to None, so that explicitly passing them together
    with -F/--from-cache (where they have no effect) can be detected and
    warned about. The real defaults are filled in afterwards. Choosing both
    aggregation modes aborts via die().
    """
    sacct_defaults = {"start": "now - 24 hours", "end": "now", "state": "COMPLETED"}
    p = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="Display seff-style CPU, memory and walltime efficiency values from sacct data.",
        epilog="""Examples:
  # first get an overview (-U/--aggr-user) and write a cachefile
  slurm-eff-tool -O sacct.cache -U
  # now you can read the cachefile for later runs and e.g. sort by waste_Mem (descending)
  slurm-eff-tool -F sacct.cache -U -s=-Y
  # query a specific time window (runs sacct; -S/-E have no effect together with -F)
  slurm-eff-tool.py --start 2026-05-01 --end 2026-05-22 -u dfeich
  slurm-eff-tool.py -S 2026-05-01 -E now -u dfeich
  # supports multiple sort keys (CPU_Eff ascending, Mem_Eff descending, Time_Eff ascending)
  slurm-eff-tool.py -F sacct.cache --aggr-user --sdev -s e,-M,t
  # cluster jobs by regexps (repeat -R for each pattern)
  slurm-eff-tool.py -F sacct.cache -u dfeich -R '^vasp' -R '^gromacs' --json
  # supports slurm style formatting (will be further improved)
  slurm-eff-tool.py -F sacct.cache -o "%.12u %c %N %m %.12l %C %.8e %.8M %.8t %.30j"
""",
    )
    p.add_argument("-S", "--start",
                   help=f"sacct start time, passed to sacct -S (default: {sacct_defaults['start']})")
    p.add_argument("-E", "--end",
                   help=f"sacct end time, passed to sacct -E (default: {sacct_defaults['end']})")
    p.add_argument("-u", "--user", help="restrict to one user; passed as sacct -u unless reading from cache")
    p.add_argument("--state", "--job-state", dest="state",
                   help=f"sacct state filter, e.g. COMPLETED,FAILED,TIMEOUT (default: {sacct_defaults['state']})")
    p.add_argument("-O", "--output-cache", help="write raw sacct output cache to this file")
    p.add_argument("-F", "--from-cache", help="read raw sacct output cache from this file instead of running sacct")
    p.add_argument("-U", "--aggr-user", action="store_true", help="aggregate jobs by user, CPUs, nodes, ReqMem, and timelimit")
    p.add_argument(
        "-R",
        "--aggr-regexp",
        action="append",
        help="aggregate jobs matching regexp by regexp, CPUs, nodes, ReqMem, and timelimit; may be repeated",
    )
    p.add_argument("--sdev", action="store_true", help="after each efficiency average, add sdev, max, and min columns")
    p.add_argument("--json", action="store_true", help="emit JSON instead of an ASCII table")
    p.add_argument("-s", "--sort",
                   help="comma-separated sort columns or aliases (at most three); prefix with - for descending")
    p.add_argument(
        "-o",
        "--format",
        help=(
            "Slurm-like output format using aliases: "
            "u=username,i=JobID,c=CPUs,N=Nodes,m=ReqMem,n=UsedMem,p=MemPerCPU,l=ReqWalltime,C=Count,"
            "e=CPU_Eff,M=Mem_Eff,t=Time_Eff,j=jobname,X=waste_CPU,Y=waste_Mem"
        ),
    )
    args = p.parse_args(argv)
    explicitly_given = [name for name in sacct_defaults if getattr(args, name) is not None]
    if args.from_cache and explicitly_given:
        print("warning: -S/-E/--state are ignored when using -F/--from-cache", file=sys.stderr)
    for name, value in sacct_defaults.items():
        if getattr(args, name) is None:
            setattr(args, name, value)
    if args.aggr_user and args.aggr_regexp:
        die("choose only one aggregation mode: --aggr-user or --aggr-regexp")
    return args
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point; returns the process exit status.

    *argv* excludes the program name. None means ``sys.argv[1:]``, while an
    explicit empty list means "no arguments".
    """
    # Die silently (default SIGPIPE action) if the reader of our stdout pipe
    # exits first, e.g. ``| head``. SIGPIPE does not exist on every platform.
    if hasattr(signal, "SIGPIPE"):
        signal.signal(signal.SIGPIPE, signal.SIG_DFL)
    args = parse_args(sys.argv[1:] if argv is None else argv)
    rows = read_cache(args.from_cache) if args.from_cache else run_sacct(args)
    if args.output_cache:
        write_cache(args.output_cache, rows)
    records = build_job_records(rows, only_user=args.user)
    out_rows = sort_rows(aggregate_records(records, args), args.sort)
    if args.json:
        print(json.dumps([r.as_dict(sdev=args.sdev) for r in out_rows], indent=2))
    elif args.format:
        print_custom_format(out_rows, args.format, args.sdev)
    else:
        # Aggregated rows have no single JobID; single-job rows always have Count 1.
        hidden = "JobID" if (args.aggr_user or args.aggr_regexp) else "Count"
        print_table(out_rows, [c for c in DEFAULT_COLUMNS if c != hidden], args.sdev)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())