725 lines
24 KiB
Python
Executable File
725 lines
24 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
slurm-eff-tool.py - Slurm job efficiency reporting and investigation tool.

Examples:

    ./slurm_eff.py --start 2026-05-01 --end 2026-05-22 -u dfeich

    ./slurm_eff.py -S 2026-05-01 -E now -u dfeich -O sacct.cache

    ./slurm_eff.py -F sacct.cache --aggr-user --sdev -s e,-M,t

    ./slurm_eff.py -F sacct.cache -R '^vasp' -R '^gromacs' --json

    ./slurm_eff.py -F sacct.cache -o "%.12u %c %N %m %.12l %C %.8e %.8M %.8t %.30j"

Sort (-s) and format (-o) columns are given either by full column name
(case-insensitive) or by a one-letter alias: u=username, i=JobID, c=CPUs,
N=Nodes, m=ReqMem, p=MemPerCPU, l=ReqWalltime, C=Count, e=CPU_Efficiency,
M=Memory_Efficiency, t=Time_Efficiency, j=jobname. Aliases are
case-sensitive (c/C and m/M differ). -R takes one pattern per occurrence
and may be repeated.

Efficiency definitions:

    CPU_Efficiency    = TotalCPU_seconds / CPUTimeRAW * 100

    Memory_Efficiency = max(MaxRSS_bytes across non-extern job steps) / requested_memory_bytes * 100

    Time_Efficiency   = ElapsedRaw_seconds / (TimelimitRaw_minutes * 60) * 100

requested_memory_bytes is the total for the whole allocation: ReqMem
multiplied by the number of CPUs for a per-CPU request ("c" suffix), and by
the number of nodes otherwise.

Memory efficiency intentionally uses the maximum MaxRSS seen across non-extern
Slurm job steps. This conceptually matches the practical "peak RSS" style of
seff, but it is not a sum over all ranks/tasks.

ReqMem and MemPerCPU are reported in GiB, ReqWalltime in hours.

This script intentionally asks sacct for raw parsable fields and caches those rows.
"""
|
|
# The initial version of this script was produced by vibe coding from an exact
# functionality description using ChatGPT-5.5.
# 2026 D. Feichtinger <derek.feichtinger@psi.ch>
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
import csv
import json
import math
import os
import re
import signal
import statistics
import subprocess
import sys
from collections import defaultdict
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Iterable, NoReturn
|
|
|
|
|
|
# Raw sacct fields requested via --format=..., in this exact order. sacct -P
# output is mapped onto them positionally, and cache files use them as the
# header line.
SACCT_FIELDS = (
    "JobIDRaw,JobID,User,JobName,State,AllocCPUS,NNodes,ReqMem,"
    "ElapsedRaw,TimelimitRaw,CPUTimeRAW,TotalCPU,MaxRSS"
).split(",")

# Columns of the default ASCII table, left to right.
DEFAULT_COLUMNS = [
    "username", "JobID", "CPUs", "Nodes", "ReqMem", "MemPerCPU",
    "ReqWalltime", "Count", "CPU_Efficiency", "Memory_Efficiency",
    "Time_Efficiency", "jobname",
]

# One-character aliases for sorting and output format specifications.
# They are case-sensitive: "c"/"C" and "m"/"M" name different columns.
ALIASES = {
    "u": "username", "i": "JobID", "c": "CPUs", "N": "Nodes",
    "m": "ReqMem", "p": "MemPerCPU", "l": "ReqWalltime", "C": "Count",
    "e": "CPU_Efficiency", "M": "Memory_Efficiency", "t": "Time_Efficiency",
    "j": "jobname",
}

# Columns that are compared numerically when sorting (missing values sort as -inf).
NUMERIC_COLUMNS = {
    "CPUs", "Nodes", "ReqMem", "MemPerCPU", "ReqWalltime", "Count",
    "CPU_Efficiency", "Memory_Efficiency", "Time_Efficiency",
}
|
|
|
|
|
|
@dataclass
class JobRecord:
    """One Slurm job, collapsed from its top-level sacct row and its step rows.

    Instances are created by build_job_records(). Memory quantities named
    ``*_gb`` are GiB (bytes / 1024**3). The efficiency fields are
    percentages, or None when the denominator is unknown or not positive.
    """

    raw: dict[str, str]  # the job's top-level sacct row (field name -> raw string)
    username: str  # User of the top-level row
    jobname: str  # JobName of the top-level row
    cpus: int  # AllocCPUS
    nodes: int  # NNodes
    reqmem_gb: float | None  # total requested memory for the allocation, GiB
    mem_per_cpu_gb: float | None  # reqmem_gb / cpus (None if either is unknown/zero)
    reqwall_hours: float | None  # time limit in hours; None when no positive limit
    reqmem_bytes_total: float | None  # total requested memory, bytes
    elapsed_sec: int  # ElapsedRaw, seconds
    timelimit_sec: int  # TimelimitRaw (minutes) converted to seconds
    totalcpu_sec: float  # TotalCPU parsed to seconds
    maxrss_bytes: float | None  # max MaxRSS over non-extern rows (step rows preferred), bytes
    cpu_eff: float | None  # totalcpu_sec / CPUTimeRAW * 100
    mem_eff: float | None  # maxrss_bytes / reqmem_bytes_total * 100
    time_eff: float | None  # elapsed_sec / timelimit_sec * 100
|
|
|
|
|
|
@dataclass
class OutputRow:
    """One line of the report: either a single job or an aggregate of jobs.

    The public fields are the displayed columns (GiB for ReqMem/MemPerCPU,
    hours for ReqWalltime, percentages for the efficiencies). The
    underscore-prefixed lists hold the per-job efficiency samples behind the
    displayed values; as_dict(sdev=True) derives spread statistics from them.
    """

    username: str
    JobID: str
    CPUs: int
    Nodes: int
    ReqMem: float | None
    MemPerCPU: float | None
    ReqWalltime: float | None
    Count: int
    CPU_Efficiency: float | None
    Memory_Efficiency: float | None
    Time_Efficiency: float | None
    jobname: str
    _cpu_eff_values: list[float] = field(default_factory=list, repr=False)
    _mem_eff_values: list[float] = field(default_factory=list, repr=False)
    _time_eff_values: list[float] = field(default_factory=list, repr=False)

    def as_dict(self, sdev: bool = False) -> dict[str, Any]:
        """Return the displayed columns as an ordered dict.

        With ``sdev`` each efficiency column ``X`` additionally gets ``X_sdev``,
        ``X_max`` and ``X_min`` entries computed from its samples (None when
        there are no samples).
        """
        public_columns = (
            "username", "JobID", "CPUs", "Nodes", "ReqMem", "MemPerCPU",
            "ReqWalltime", "Count", "CPU_Efficiency", "Memory_Efficiency",
            "Time_Efficiency", "jobname",
        )
        result: dict[str, Any] = {name: getattr(self, name) for name in public_columns}
        if not sdev:
            return result

        samples_by_column = {
            "CPU_Efficiency": self._cpu_eff_values,
            "Memory_Efficiency": self._mem_eff_values,
            "Time_Efficiency": self._time_eff_values,
        }
        for col, vals in samples_by_column.items():
            result[f"{col}_sdev"] = stdev_or_none(vals)
            result[f"{col}_max"] = max(vals, default=None)
            result[f"{col}_min"] = min(vals, default=None)
        return result
|
|
|
|
|
|
def die(msg: str, code: int = 2) -> NoReturn:
    """Print ``error: <msg>`` to stderr and terminate the program.

    Args:
        msg: human-readable error description.
        code: process exit status carried by the raised SystemExit (default 2).

    Raises:
        SystemExit: always. The ``NoReturn`` annotation lets type checkers treat
        code after a die() call as unreachable, e.g. variables assigned only in
        the ``try`` body of a ``try``/``except ...: die(...)`` are then bound.
    """
    print(f"error: {msg}", file=sys.stderr)
    raise SystemExit(code)
|
|
|
|
|
|
def slurm_duration_to_seconds(value: str) -> float:
    """Convert a Slurm duration string (e.g. a TotalCPU value) to seconds.

    Accepted forms follow Slurm's time syntax:

    * ``[[HH:]MM:]SS[.fff]``   -- e.g. ``01:02:03``, ``5:12.345``, ``42``
    * ``D-HH[:MM[:SS[.fff]]]`` -- e.g. ``2-01:02:03``, ``2-01:02``, ``2-01``

    With a day prefix the remaining fields start at hours (``2-01:02`` is two
    days, one hour and two minutes). Without it, two fields mean
    minutes:seconds. Only the seconds field may carry a fractional part.

    Returns 0.0 for empty input, for the placeholders ``Unknown``,
    ``UNLIMITED`` and ``Partition_Limit``, and for anything that cannot be
    parsed, including a malformed day prefix.
    """
    text = (value or "").strip()
    if not text or text in {"Unknown", "UNLIMITED", "Partition_Limit"}:
        return 0.0

    days_text, sep, clock = text.partition("-")
    if not sep:
        days_text, clock = "", text

    fields = clock.split(":")
    if len(fields) > 3:
        return 0.0
    if sep:
        # Day-prefixed: fields are hours[:minutes[:seconds]].
        weights = (3600, 60, 1)[: len(fields)]
    else:
        # No day prefix: fields are right-aligned, [[hours:]minutes:]seconds.
        weights = (3600, 60, 1)[-len(fields):]

    try:
        total = int(days_text) * 86400.0 if sep else 0.0
        for field_text, weight in zip(fields, weights):
            # Only the seconds field may be fractional (e.g. "5:12.345").
            number = float(field_text) if weight == 1 else int(field_text)
            total += number * weight
    except ValueError:
        return 0.0
    return total
|
|
|
|
|
|
def format_seconds(seconds: int | float | None) -> str:
|
|
if seconds is None or seconds <= 0:
|
|
return "Unknown"
|
|
seconds = int(round(seconds))
|
|
days, rem = divmod(seconds, 86400)
|
|
hours, rem = divmod(rem, 3600)
|
|
minutes, sec = divmod(rem, 60)
|
|
if days:
|
|
return f"{days}-{hours:02d}:{minutes:02d}:{sec:02d}"
|
|
return f"{hours:02d}:{minutes:02d}:{sec:02d}"
|
|
|
|
|
|
def parse_size_to_bytes(value: str) -> float | None:
|
|
"""Parse Slurm memory size fields such as 1024K, 2000M, 8Gn, 4Gc, 1.5T."""
|
|
s = (value or "").strip()
|
|
if not s or s in {"Unknown", "0", "0K", "0M", "0G", "0T"}:
|
|
return None
|
|
|
|
# Slurm ReqMem may end in c/n for per-CPU or per-node. Strip that here.
|
|
if s[-1].lower() in {"c", "n"}:
|
|
s = s[:-1]
|
|
|
|
m = re.fullmatch(r"([0-9]+(?:\.[0-9]+)?)([KMGTP]?)", s, flags=re.I)
|
|
if not m:
|
|
return None
|
|
|
|
num = float(m.group(1))
|
|
unit = m.group(2).upper() or "K" # Slurm memory fields are normally KiB when unitless.
|
|
mult = {
|
|
"K": 1024,
|
|
"M": 1024**2,
|
|
"G": 1024**3,
|
|
"T": 1024**4,
|
|
"P": 1024**5,
|
|
}[unit]
|
|
return num * mult
|
|
|
|
|
|
def reqmem_total_bytes(reqmem: str, cpus: int, nodes: int) -> float | None:
    """Return the memory requested for the whole allocation, in bytes.

    A ReqMem ending in ``c`` is per CPU and is multiplied by ``cpus``. Any
    other ReqMem is treated as per node and multiplied by ``nodes``. Counts
    below 1 are clamped to 1. Returns None when ReqMem cannot be parsed.

    NOTE(review): newer Slurm releases may print ReqMem without the c/n
    suffix; in that case the per-node branch applies — confirm against the
    sacct version in use.
    """
    text = (reqmem or "").strip()
    per_unit = parse_size_to_bytes(text)
    if per_unit is None:
        return None
    per_cpu = text.lower().endswith("c")
    multiplier = cpus if per_cpu else nodes
    return per_unit * max(multiplier, 1)
|
|
|
|
|
|
def common_prefix(names: list[str]) -> str:
    """Summarise a group of job names as a single label.

    * no names       -> ``""``
    * one name       -> that name unchanged
    * several names  -> ``"<shared prefix>*"``, or ``"*"`` if nothing is shared

    Trailing characters outside ``[A-Za-z0-9_.-]`` are trimmed from the shared
    prefix before the ``*`` is appended.
    """
    if not names:
        return ""
    if len(names) == 1:
        return names[0]
    shared = re.sub(r"[^A-Za-z0-9_.-]+$", "", os.path.commonprefix(names))
    return shared + "*" if shared else "*"
|
|
|
|
|
|
def stdev_or_none(values: list[float]) -> float | None:
|
|
if len(values) < 2:
|
|
return 0.0 if len(values) == 1 else None
|
|
return statistics.stdev(values)
|
|
|
|
|
|
def mean_or_none(values: list[float]) -> float | None:
|
|
return statistics.mean(values) if values else None
|
|
|
|
|
|
def pct(numerator: float | None, denominator: float | None) -> float | None:
|
|
if numerator is None or denominator is None or denominator <= 0:
|
|
return None
|
|
return 100.0 * numerator / denominator
|
|
|
|
|
|
def run_sacct(args: argparse.Namespace) -> list[dict[str, str]]:
    """Run sacct for the requested window and return its rows.

    Builds ``sacct -P -n --units=K --format=<SACCT_FIELDS>`` and adds
    -S/-E/--state from ``args``. With -u the query is restricted to that
    user. Without -u, ``--allusers`` is passed, because sacct otherwise
    reports only the invoking user's jobs, contradicting the documented
    meaning of -u as the restriction. Slurm still limits non-privileged users
    to their own jobs when PrivateData hides other users' jobs.

    Job steps are deliberately kept (no -X), because MaxRSS is reported on
    step rows. build_job_records() later collapses them onto the base job.

    Returns:
        The parsed rows (see parse_pipe_rows).

    Exits via die() if sacct is missing, cannot be executed, or fails.
    """
    cmd = [
        "sacct",
        "-P",
        "-n",
        "--units=K",
        "--format=" + ",".join(SACCT_FIELDS),
    ]

    if args.start:
        cmd += ["-S", args.start]
    if args.end:
        cmd += ["-E", args.end]
    if args.user:
        cmd += ["-u", args.user]
    else:
        cmd.append("--allusers")
    if args.state:
        cmd += ["--state", args.state]

    try:
        proc = subprocess.run(cmd, text=True, check=True, capture_output=True)
    except FileNotFoundError:
        die("sacct not found in PATH")
    except subprocess.CalledProcessError as exc:
        die(f"sacct failed with exit code {exc.returncode}:\n{exc.stderr.strip()}")
    except OSError as exc:
        # e.g. PermissionError when sacct exists but is not executable.
        die(f"could not execute sacct: {exc}")

    return parse_pipe_rows(proc.stdout.splitlines())
|
|
|
|
|
|
def parse_pipe_rows(lines: Iterable[str]) -> list[dict[str, str]]:
    """Parse ``sacct -P`` output lines into dicts keyed by SACCT_FIELDS.

    sacct's parsable output is split literally on ``|`` and uses no CSV
    quoting, so quote handling is disabled (``csv.QUOTE_NONE``). Otherwise a
    job name starting with ``"`` would be read as an opening quote and would
    swallow the following fields, or even following lines.

    Blank lines are skipped. Values are stripped. Short rows are padded with
    empty strings, and surplus fields are dropped.
    """
    rows: list[dict[str, str]] = []
    for parts in csv.reader(lines, delimiter="|", quoting=csv.QUOTE_NONE):
        if not parts:
            continue
        values = [p.strip() for p in parts]
        values += [""] * (len(SACCT_FIELDS) - len(values))
        # zip() stops at len(SACCT_FIELDS), discarding any surplus fields.
        rows.append(dict(zip(SACCT_FIELDS, values)))
    return rows
|
|
|
|
|
|
def write_cache(path: str, rows: list[dict[str, str]]) -> None:
    """Save raw sacct rows to ``path`` as a ``|``-delimited file.

    The first line is a header naming SACCT_FIELDS. An existing file is
    overwritten. read_cache() reads the format back.
    """
    with open(path, "w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(
            handle,
            fieldnames=SACCT_FIELDS,
            delimiter="|",
            lineterminator="\n",
        )
        writer.writeheader()
        for row in rows:
            writer.writerow(row)
|
|
|
|
|
|
def read_cache(path: str) -> list[dict[str, str]]:
    """Load rows saved by write_cache().

    Each returned row has every SACCT_FIELDS key, with stripped string
    values; absent values become "". Exits via die() if the header lacks a
    required field. CPUTimeRAW is optional so that caches written before it
    was added still load; CPU efficiency is then NA for those rows.
    """
    with open(path, newline="", encoding="utf-8") as handle:
        reader = csv.DictReader(handle, delimiter="|")
        header = set(reader.fieldnames or [])
        missing_required = [
            name for name in SACCT_FIELDS
            if name not in header and name != "CPUTimeRAW"
        ]
        if missing_required:
            die(f"cache file is missing expected fields: {', '.join(missing_required)}")
        return [
            {name: (record.get(name) or "").strip() for name in SACCT_FIELDS}
            for record in reader
        ]
|
|
|
|
|
|
def base_job_id(jobid: str) -> str:
    """Return the leading job number, dropping any ``.step`` or ``_task`` suffix.

    Everything from the first ``.`` or ``_`` onwards is removed, e.g.
    ``123.batch`` -> ``123`` and ``123_4.0`` -> ``123``.
    """
    return re.match(r"[^._]*", jobid).group(0)
|
|
|
|
|
|
def is_top_level_row(row: dict[str, str]) -> bool:
    """Return True if ``row`` is a job's own allocation row rather than a step row.

    JobIDRaw is checked in preference to JobID. For array tasks, JobID looks
    like ``12340_5`` (containing ``_``) even on the task's top-level row,
    whereas JobIDRaw is the task's plain unique job id. Steps carry a
    ``.<step>`` suffix in both fields. JobID is used only when JobIDRaw is
    missing or empty.
    """
    jid = row.get("JobIDRaw") or row.get("JobID", "")
    return "." not in jid and "_" not in jid
|
|
|
|
|
|
def _as_number(text: str | None) -> float:
    """Parse a numeric sacct field; empty or non-numeric values (e.g. UNLIMITED) give 0.0."""
    if not text:
        return 0.0
    try:
        number = float(text)
    except ValueError:
        return 0.0
    # Reject inf/nan: the int() conversions in build_job_records cannot handle them.
    return number if math.isfinite(number) else 0.0


def build_job_records(rows: list[dict[str, str]], only_user: str | None = None) -> list[JobRecord]:
    """Collapse sacct top-level and step rows into one JobRecord per base job.

    Rows are grouped by base job id (from JobIDRaw, falling back to JobID).
    In each group, the top-level row supplies allocation size, limits and
    accounting totals. MaxRSS is the maximum over the non-extern step rows,
    or over all non-extern rows when the group has no step rows.

    Numeric fields that are empty or non-numeric are treated as 0. For
    example, a time limit may read ``UNLIMITED``. The dependent efficiency
    then becomes None (NA) instead of aborting the whole report.

    Args:
        rows: raw sacct rows keyed by SACCT_FIELDS.
        only_user: if given, keep only jobs whose top-level row has this User.

    Returns:
        One JobRecord per kept job, in order of first appearance in ``rows``.
    """
    grouped: dict[str, list[dict[str, str]]] = defaultdict(list)
    for row in rows:
        # Do not filter by User here. On many Slurm installations, job step rows
        # are where MaxRSS is populated, but their User field may be empty or may
        # not match the parent job's user. Filtering before grouping drops those
        # rows and makes Memory_Efficiency become NA. Filter after identifying
        # the top-level job row instead.
        grouped[base_job_id(row.get("JobIDRaw") or row.get("JobID", ""))].append(row)

    def step_id(r: dict[str, str]) -> str:
        return r.get("JobID") or r.get("JobIDRaw") or ""

    records: list[JobRecord] = []
    for group in grouped.values():
        top = next((r for r in group if is_top_level_row(r)), group[0])
        if only_user and top.get("User") != only_user:
            continue

        # seff-style practical peak RSS: maximum MaxRSS across non-extern job steps.
        # This is intentionally not a sum over all ranks/tasks.
        non_extern = [r for r in group if ".extern" not in step_id(r)]
        step_rows = [r for r in non_extern if "." in step_id(r)]
        rss_source_rows = step_rows or non_extern
        maxrss_values = [parse_size_to_bytes(r.get("MaxRSS", "")) for r in rss_source_rows]
        maxrss = max([x for x in maxrss_values if x is not None], default=None)

        cpus = int(_as_number(top.get("AllocCPUS")))
        nodes = int(_as_number(top.get("NNodes")))
        elapsed = int(_as_number(top.get("ElapsedRaw")))

        # sacct TimelimitRaw is minutes, while ElapsedRaw and CPUTimeRAW are seconds.
        timelimit_seconds = int(round(_as_number(top.get("TimelimitRaw")) * 60))

        cputime_raw = _as_number(top.get("CPUTimeRAW"))
        totalcpu = slurm_duration_to_seconds(top.get("TotalCPU", ""))

        reqmem_total = reqmem_total_bytes(top.get("ReqMem", ""), cpus, nodes)
        reqmem_gb = reqmem_total / (1024**3) if reqmem_total is not None else None

        reqwall_hours = timelimit_seconds / 3600.0 if timelimit_seconds > 0 else None
        mem_per_cpu_gb = reqmem_gb / cpus if reqmem_gb is not None and cpus > 0 else None

        records.append(
            JobRecord(
                raw=top,
                username=top.get("User", ""),
                jobname=top.get("JobName", ""),
                cpus=cpus,
                nodes=nodes,
                reqmem_gb=reqmem_gb,
                mem_per_cpu_gb=mem_per_cpu_gb,
                reqwall_hours=reqwall_hours,
                reqmem_bytes_total=reqmem_total,
                elapsed_sec=elapsed,
                timelimit_sec=timelimit_seconds,
                totalcpu_sec=totalcpu,
                maxrss_bytes=maxrss,
                cpu_eff=pct(totalcpu, cputime_raw),
                mem_eff=pct(maxrss, reqmem_total),
                time_eff=pct(elapsed, timelimit_seconds),
            )
        )

    return records
|
|
|
|
|
|
def aggregate_records(records: list[JobRecord], args: argparse.Namespace) -> list[OutputRow]:
    """Aggregate records according to the grouping options in ``args``.

    * ``args.aggr_regexp`` (list of patterns): each job goes to the first
      pattern that matches its name (``re.search``). Buckets are keyed by
      pattern, CPUs, nodes, ReqMem, time limit and user. Jobs matching no
      pattern are emitted as single rows after the aggregates. An invalid
      pattern aborts via die() with a readable message.
    * ``args.aggr_user``: buckets keyed by user, CPUs, nodes, ReqMem and time
      limit. The jobname column shows the common name prefix.
    * otherwise: one row per job.
    """
    if args.aggr_regexp:
        compiled: list[tuple[str, re.Pattern[str]]] = []
        for pat in args.aggr_regexp:
            try:
                compiled.append((pat, re.compile(pat)))
            except re.error as exc:
                die(f"invalid --aggr-regexp pattern {pat!r}: {exc}")

        buckets: dict[tuple[Any, ...], list[JobRecord]] = defaultdict(list)
        unmatched: list[JobRecord] = []
        for rec in records:
            hit = next((pat for pat, rx in compiled if rx.search(rec.jobname)), None)
            if hit is None:
                unmatched.append(rec)
                continue
            key = (hit, rec.cpus, rec.nodes, rec.reqmem_gb, rec.reqwall_hours, rec.username)
            buckets[key].append(rec)

        out = [
            make_aggregate_row(group, username=user, jobname=pat)
            for (pat, _cpus, _nodes, _mem, _wall, user), group in buckets.items()
        ]
        out.extend(make_single_row(r) for r in unmatched)
        return out

    if args.aggr_user:
        user_buckets: dict[tuple[Any, ...], list[JobRecord]] = defaultdict(list)
        for rec in records:
            key = (rec.username, rec.cpus, rec.nodes, rec.reqmem_gb, rec.reqwall_hours)
            user_buckets[key].append(rec)
        return [
            make_aggregate_row(
                group,
                username=key[0],
                jobname=common_prefix([r.jobname for r in group]),
            )
            for key, group in user_buckets.items()
        ]

    return [make_single_row(r) for r in records]
|
|
|
|
|
|
def make_single_row(rec: JobRecord) -> OutputRow:
    """Return an OutputRow (Count=1) for a single Slurm job record.

    JobID shows JobIDRaw and falls back to JobID when JobIDRaw is absent or
    empty. Rows loaded from a cache always contain every key (possibly ""),
    so a plain ``dict.get`` default would never trigger the fallback. Each
    efficiency value also becomes a one-element sample list (empty when the
    value is None) for --sdev.
    """
    def samples(value: float | None) -> list[float]:
        return [value] if value is not None else []

    return OutputRow(
        username=rec.username,
        JobID=rec.raw.get("JobIDRaw") or rec.raw.get("JobID", ""),
        CPUs=rec.cpus,
        Nodes=rec.nodes,
        ReqMem=rec.reqmem_gb,
        MemPerCPU=rec.mem_per_cpu_gb,
        ReqWalltime=rec.reqwall_hours,
        Count=1,
        CPU_Efficiency=rec.cpu_eff,
        Memory_Efficiency=rec.mem_eff,
        Time_Efficiency=rec.time_eff,
        jobname=rec.jobname,
        _cpu_eff_values=samples(rec.cpu_eff),
        _mem_eff_values=samples(rec.mem_eff),
        _time_eff_values=samples(rec.time_eff),
    )
|
|
|
|
|
|
def make_aggregate_row(records: list[JobRecord], username: str, jobname: str) -> OutputRow:
    """Return one OutputRow summarising a group of jobs.

    Args:
        records: the jobs of one aggregation bucket; must be non-empty. The
            buckets built by aggregate_records() share CPUs, nodes, ReqMem and
            time limit, so those values (and MemPerCPU) come from the first
            record.
        username: value for the username column.
        jobname: label for the jobname column (the regexp or a name prefix).

    Each efficiency column holds the mean over the jobs whose value is known.
    The underlying samples are kept so ``as_dict(sdev=True)`` can add
    sdev/max/min. JobID is left empty because the row stands for several jobs.
    """
    first = records[0]
    cpu_eff_vals = [r.cpu_eff for r in records if r.cpu_eff is not None]
    mem_eff_vals = [r.mem_eff for r in records if r.mem_eff is not None]
    time_eff_vals = [r.time_eff for r in records if r.time_eff is not None]
    return OutputRow(
        username=username,
        JobID="",
        CPUs=first.cpus,
        Nodes=first.nodes,
        ReqMem=first.reqmem_gb,
        MemPerCPU=first.mem_per_cpu_gb,
        ReqWalltime=first.reqwall_hours,
        Count=len(records),
        CPU_Efficiency=mean_or_none(cpu_eff_vals),
        Memory_Efficiency=mean_or_none(mem_eff_vals),
        Time_Efficiency=mean_or_none(time_eff_vals),
        jobname=jobname,
        _cpu_eff_values=cpu_eff_vals,
        _mem_eff_values=mem_eff_vals,
        _time_eff_values=time_eff_vals,
    )
|
|
|
|
def resolve_column_name(name: str) -> str:
    """Map a user-supplied column reference to its canonical column name.

    Exact one-letter aliases (case-sensitive, see ALIASES) are tried first.
    Otherwise the name is matched case-insensitively against the known
    column names. An unknown name aborts the program via die().
    """
    key = name.strip()
    alias_target = ALIASES.get(key)
    if alias_target is not None:
        return alias_target

    by_lowercase: dict[str, str] = {}
    for column in (*DEFAULT_COLUMNS, *NUMERIC_COLUMNS):
        by_lowercase[column.lower()] = column
    canonical = by_lowercase.get(key.lower())
    if canonical is not None:
        return canonical

    die(f"unknown column/alias: {name}")
    return ""
|
|
|
|
|
|
def sort_rows(rows: list[OutputRow], spec: str | None) -> list[OutputRow]:
    """Sort rows by up to three comma-separated columns.

    Each term is a column name or alias, optionally prefixed with ``-``
    (descending) or ``+`` (ascending, the default). Numeric columns compare
    by value, with None treated as -inf. Other columns compare by their
    string form, with None treated as "". Returns ``rows`` itself when
    ``spec`` is empty or contains no terms.
    """
    if not spec:
        return rows

    terms = [term for term in (piece.strip() for piece in spec.split(",")) if term]
    if len(terms) > 3:
        die("--sort supports at most three columns")

    sort_keys: list[tuple[str, bool]] = []
    for term in terms:
        descending = term.startswith("-")
        if descending or term.startswith("+"):
            term = term[1:]
        sort_keys.append((resolve_column_name(term), descending))

    def key_for(column: str):
        if column in NUMERIC_COLUMNS:
            def numeric(row):
                value = getattr(row, column)
                return float("-inf") if value is None else value
            return numeric

        def textual(row):
            value = getattr(row, column)
            return "" if value is None else str(value)
        return textual

    ordered = rows
    # sorted() is stable, so applying the least significant key first leaves
    # the most significant key deciding the final order, with later keys
    # breaking its ties.
    for column, descending in reversed(sort_keys):
        ordered = sorted(ordered, key=key_for(column), reverse=descending)
    return ordered
|
|
|
|
|
|
def columns_for_sdev(base_cols: list[str]) -> list[str]:
    """Return ``base_cols`` with statistics columns inserted after each efficiency.

    Every CPU/Memory/Time efficiency column is immediately followed by its
    ``_sdev``, ``_max`` and ``_min`` companions. Other columns pass through.
    """
    efficiency_columns = ("CPU_Efficiency", "Memory_Efficiency", "Time_Efficiency")
    expanded: list[str] = []
    for name in base_cols:
        expanded.append(name)
        if name in efficiency_columns:
            expanded.extend(name + suffix for suffix in ("_sdev", "_max", "_min"))
    return expanded
|
|
|
|
|
|
|
|
|
|
def format_reqmem_gb(value: float | None) -> str:
|
|
if value is None:
|
|
return "NA"
|
|
return f"{value:.2f}G"
|
|
|
|
|
|
def format_reqwall_hours(value: float | None) -> str:
|
|
if value is None:
|
|
return "NA"
|
|
return f"{value:.2f}h"
|
|
|
|
|
|
def format_value(value: Any, column: str | None = None) -> str:
    """Render one table cell.

    ReqMem and MemPerCPU are shown as GiB (``G``), ReqWalltime as hours
    (``h``). Otherwise None becomes ``NA``, floats get two decimals, and
    anything else goes through str().
    """
    if column in ("ReqMem", "MemPerCPU"):
        return format_reqmem_gb(value)
    if column == "ReqWalltime":
        return format_reqwall_hours(value)
    if value is None:
        return "NA"
    return f"{value:.2f}" if isinstance(value, float) else str(value)
|
|
|
|
|
|
def print_table(rows: list[OutputRow], columns: list[str], sdev: bool) -> None:
    """Print rows as a left-aligned, space-separated ASCII table.

    Each column is as wide as the longer of its header and its widest
    formatted cell. A dashed separator line follows the header. With
    ``sdev``, the efficiency columns are followed by their _sdev/_max/_min
    companions.
    """
    row_dicts = [row.as_dict(sdev=sdev) for row in rows]
    shown = columns_for_sdev(columns) if sdev else columns

    cells = [[format_value(rd.get(col), col) for col in shown] for rd in row_dicts]
    widths = [
        max([len(col)] + [len(line[idx]) for line in cells])
        for idx, col in enumerate(shown)
    ]

    def emit(texts) -> None:
        print(" ".join(text.ljust(width) for text, width in zip(texts, widths)))

    emit(shown)
    emit("-" * width for width in widths)
    for line in cells:
        emit(line)
|
|
|
|
|
|
# One "%[width]X" directive of the -o/--format mini-language: group 1 is the
# optional width spec (digits, "." and "-"), group 2 the one-letter column alias.
FORMAT_RE: re.Pattern[str] = re.compile(r"%(?:([-.0-9]*)([A-Za-z]))")
|
|
|
|
|
|
def print_custom_format(rows: list[OutputRow], fmt: str, sdev: bool) -> None:
    """Print one line per row from a Slurm-like ``%[width]X`` template.

    X is a one-letter column alias (see ALIASES); e.g. ``%.12u %.8e %.30j``.
    The width spec is handled as follows:

    * ``%.N`` truncates the value to at most N characters.
    * ``%N`` right-justifies it to at least N characters.
    * ``%-N`` left-justifies it to at least N characters.
    * Other width specs are ignored.

    Aliases only address the base columns, so with --sdev the efficiency
    aliases still print the averaged value.
    """
    def expand(values: dict[str, Any], match: re.Match[str]) -> str:
        widthspec, alias = match.groups()
        column = resolve_column_name(alias)
        text = format_value(values.get(column), column)
        if not widthspec:
            return text
        parsed = re.fullmatch(r"\.?(-?\d+)", widthspec)
        if parsed is None:
            return text
        width = int(parsed.group(1))
        if widthspec.startswith("."):
            return text[: abs(width)]
        if width > 0:
            return text.rjust(width)
        return text.ljust(abs(width))

    for row in rows:
        values = row.as_dict(sdev=sdev)
        print(FORMAT_RE.sub(lambda m, v=values: expand(v, m), fmt))
|
|
|
|
|
|
def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse command-line options and validate their combination.

    Prints a warning to stderr when -S/-E/--state were changed from their
    defaults together with -F/--from-cache, because those options only
    affect a live sacct query. Exits via die() when both aggregation modes
    are requested.
    """
    p = argparse.ArgumentParser(
        description="Display seff-style CPU, memory and walltime efficiency values from sacct data."
    )

    p.add_argument("-S", "--start", help="sacct start time, passed to sacct -S",
                   default="now - 24 hours")
    p.add_argument("-E", "--end", help="sacct end time, passed to sacct -E",
                   default="now")
    p.add_argument("-u", "--user", help="restrict to one user; passed as sacct -u unless reading from cache")
    p.add_argument("--state", "--job-state", dest="state",
                   default="COMPLETED",
                   help="sacct state filter, e.g. COMPLETED,FAILED,TIMEOUT")

    p.add_argument("-O", "--output-cache", help="write raw sacct output cache to this file")
    p.add_argument("-F", "--from-cache", help="read raw sacct output cache from this file instead of running sacct")

    p.add_argument("-U", "--aggr-user", action="store_true", help="aggregate jobs by user, CPUs, nodes, ReqMem, and timelimit")
    p.add_argument(
        "-R",
        "--aggr-regexp",
        action="append",
        help="aggregate jobs matching regexp by regexp, CPUs, nodes, ReqMem, and timelimit; may be repeated",
    )

    p.add_argument("--sdev", action="store_true", help="after each efficiency average, add sdev, max, and min columns")
    p.add_argument("--json", action="store_true", help="emit JSON instead of an ASCII table")
    p.add_argument("-s", "--sort", help="comma-separated numeric sort columns or aliases; prefix with - for descending")
    p.add_argument(
        "-o",
        "--format",
        help=(
            "Slurm-like output format using aliases: "
            "u=username,i=JobID,c=CPUs,N=Nodes,m=ReqMem,p=MemPerCPU,l=ReqWalltime,C=Count,"
            "e=CPU_Efficiency,M=Memory_Efficiency,t=Time_Efficiency,j=jobname"
        ),
    )

    args = p.parse_args(argv)

    # -S/-E/--state always have non-empty defaults, so test whether the user
    # changed them; testing for truthiness would warn on every -F run.
    sacct_only_dests = ("start", "end", "state")
    if args.from_cache and any(getattr(args, dest) != p.get_default(dest) for dest in sacct_only_dests):
        print("warning: -S/-E/--state are ignored when using -F/--from-cache", file=sys.stderr)
    if args.aggr_user and args.aggr_regexp:
        die("choose only one aggregation mode: --aggr-user or --aggr-regexp")

    return args
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point; returns the process exit status (0).

    Processing steps:

    1. Load sacct rows from the -F cache, or run sacct (saving the rows when
       -O is given).
    2. Collapse the rows into per-job records.
    3. Aggregate and sort the records.
    4. Print the result as JSON, as a custom -o format, or as the default
       ASCII table.

    Args:
        argv: arguments without the program name. None means ``sys.argv[1:]``;
            an explicit empty list is honored as "no arguments".
    """
    # Restore the default SIGPIPE action so that piping into e.g. `head` ends
    # the program quietly instead of raising BrokenPipeError. SIGPIPE is
    # POSIX-only, hence the guard.
    if hasattr(signal, "SIGPIPE"):
        signal.signal(signal.SIGPIPE, signal.SIG_DFL)

    args = parse_args(sys.argv[1:] if argv is None else argv)

    if args.from_cache:
        rows = read_cache(args.from_cache)
    else:
        rows = run_sacct(args)
        if args.output_cache:
            write_cache(args.output_cache, rows)

    records = build_job_records(rows, only_user=args.user)
    out_rows = aggregate_records(records, args)
    out_rows = sort_rows(out_rows, args.sort)

    if args.aggr_user or args.aggr_regexp:
        # Aggregated rows stand for several jobs, so the JobID column would be blank.
        output_columns = [c for c in DEFAULT_COLUMNS if c != "JobID"]
    else:
        output_columns = DEFAULT_COLUMNS

    if args.json:
        print(json.dumps([r.as_dict(sdev=args.sdev) for r in out_rows], indent=2))
    elif args.format:
        print_custom_format(out_rows, args.format, args.sdev)
    else:
        print_table(out_rows, output_columns, args.sdev)

    return 0
|
|
|
|
|
|
# Script entry point: exit with main()'s return value as the process status.
if __name__ == "__main__":
    sys.exit(main())
|