initial version of slurm-eff-tool script
produced by vibe coding of an exact functionality description using chatgpt-5.5
This commit is contained in:
Executable
+652
@@ -0,0 +1,652 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
slurm-eff-tool.py - Slurm job efficiency reporting and investigation tool.
|
||||
|
||||
Examples:
|
||||
./slurm_eff.py --start 2026-05-01 --end 2026-05-22 -u dfeich
|
||||
./slurm_eff.py -S 2026-05-01 -E now -u dfeich -O sacct.cache
|
||||
./slurm_eff.py -F sacct.cache --aggr-user --sdev -s cpu,-mem,time
|
||||
./slurm_eff.py -F sacct.cache -R '^vasp','^gromacs' --json
|
||||
./slurm_eff.py -F sacct.cache -o "%.12u %c %N %m %.12l %C %.8e %.8M %.8t %.30j"
|
||||
|
||||
Efficiency definitions:
|
||||
CPU_Efficiency = TotalCPU_seconds / CPUTimeRAW * 100
|
||||
Memory_Efficiency = max(MaxRSS_bytes across non-extern job steps) / requested_memory_bytes * 100
|
||||
Time_Efficiency = ElapsedRaw_seconds / (TimelimitRaw_minutes * 60) * 100
|
||||
|
||||
Memory efficiency intentionally uses the maximum MaxRSS seen across non-extern
|
||||
Slurm job steps. This conceptually matches the practical "peak RSS" style of
|
||||
seff, but it is not a perfect sum of all ranks.
|
||||
|
||||
This script intentionally asks sacct for raw parsable fields and caches those rows.
|
||||
"""
|
||||
# initial version of script produced by vibe coding of an exact functionality
|
||||
# description using chatgpt-5.5
|
||||
# 2026 D. Feichtinger <derek.feichtinger@psi.ch>
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import csv
|
||||
import json
|
||||
import math
|
||||
import os
|
||||
import re
|
||||
import statistics
|
||||
import subprocess
|
||||
import sys
|
||||
from collections import defaultdict
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any, Iterable
|
||||
|
||||
|
||||
SACCT_FIELDS = [
|
||||
"JobIDRaw",
|
||||
"JobID",
|
||||
"User",
|
||||
"JobName",
|
||||
"State",
|
||||
"AllocCPUS",
|
||||
"NNodes",
|
||||
"ReqMem",
|
||||
"ElapsedRaw",
|
||||
"TimelimitRaw",
|
||||
"CPUTimeRAW",
|
||||
"TotalCPU",
|
||||
"MaxRSS",
|
||||
]
|
||||
|
||||
DEFAULT_COLUMNS = [
|
||||
"username",
|
||||
"CPUs",
|
||||
"Nodes",
|
||||
"ReqMem",
|
||||
"ReqWalltime",
|
||||
"Count",
|
||||
"CPU_Efficiency",
|
||||
"Memory_Efficiency",
|
||||
"Time_Efficiency",
|
||||
"jobname",
|
||||
]
|
||||
|
||||
# One-character aliases for sorting and output format specifications.
|
||||
ALIASES = {
|
||||
"u": "username",
|
||||
"c": "CPUs",
|
||||
"N": "Nodes",
|
||||
"m": "ReqMem",
|
||||
"l": "ReqWalltime",
|
||||
"C": "Count",
|
||||
"e": "CPU_Efficiency",
|
||||
"M": "Memory_Efficiency",
|
||||
"t": "Time_Efficiency",
|
||||
"j": "jobname",
|
||||
}
|
||||
|
||||
NUMERIC_COLUMNS = {
|
||||
"CPUs",
|
||||
"Nodes",
|
||||
"Count",
|
||||
"CPU_Efficiency",
|
||||
"Memory_Efficiency",
|
||||
"Time_Efficiency",
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class JobRecord:
|
||||
raw: dict[str, str]
|
||||
username: str
|
||||
jobname: str
|
||||
cpus: int
|
||||
nodes: int
|
||||
reqmem_raw: str
|
||||
reqwall_raw: str
|
||||
reqmem_bytes_total: float | None
|
||||
elapsed_sec: int
|
||||
timelimit_sec: int
|
||||
totalcpu_sec: float
|
||||
maxrss_bytes: float | None
|
||||
cpu_eff: float | None
|
||||
mem_eff: float | None
|
||||
time_eff: float | None
|
||||
|
||||
|
||||
@dataclass
|
||||
class OutputRow:
|
||||
username: str
|
||||
CPUs: int
|
||||
Nodes: int
|
||||
ReqMem: str
|
||||
ReqWalltime: str
|
||||
Count: int
|
||||
CPU_Efficiency: float | None
|
||||
Memory_Efficiency: float | None
|
||||
Time_Efficiency: float | None
|
||||
jobname: str
|
||||
_cpu_values: list[float] = field(default_factory=list, repr=False)
|
||||
_mem_values: list[float] = field(default_factory=list, repr=False)
|
||||
_time_values: list[float] = field(default_factory=list, repr=False)
|
||||
|
||||
def as_dict(self, sdev: bool = False) -> dict[str, Any]:
|
||||
d: dict[str, Any] = {
|
||||
"username": self.username,
|
||||
"CPUs": self.CPUs,
|
||||
"Nodes": self.Nodes,
|
||||
"ReqMem": self.ReqMem,
|
||||
"ReqWalltime": self.ReqWalltime,
|
||||
"Count": self.Count,
|
||||
"CPU_Efficiency": self.CPU_Efficiency,
|
||||
"Memory_Efficiency": self.Memory_Efficiency,
|
||||
"Time_Efficiency": self.Time_Efficiency,
|
||||
"jobname": self.jobname,
|
||||
}
|
||||
if sdev:
|
||||
for col, vals in [
|
||||
("CPU_Efficiency", self._cpu_values),
|
||||
("Memory_Efficiency", self._mem_values),
|
||||
("Time_Efficiency", self._time_values),
|
||||
]:
|
||||
d[f"{col}_sdev"] = stdev_or_none(vals)
|
||||
d[f"{col}_max"] = max(vals) if vals else None
|
||||
d[f"{col}_min"] = min(vals) if vals else None
|
||||
return d
|
||||
|
||||
|
||||
def die(msg: str, code: int = 2) -> None:
|
||||
print(f"error: {msg}", file=sys.stderr)
|
||||
raise SystemExit(code)
|
||||
|
||||
|
||||
def parse_duration_to_seconds(value: str) -> float:
|
||||
"""Parse Slurm-ish CPU time strings such as 01:02:03, 2-01:02:03, 5:12.345."""
|
||||
value = (value or "").strip()
|
||||
if not value or value in {"Unknown", "UNLIMITED", "Partition_Limit"}:
|
||||
return 0.0
|
||||
|
||||
days = 0
|
||||
if "-" in value:
|
||||
d, value = value.split("-", 1)
|
||||
days = int(d)
|
||||
|
||||
parts = value.split(":")
|
||||
try:
|
||||
if len(parts) == 3:
|
||||
h, m, s = parts
|
||||
sec = float(s)
|
||||
return days * 86400 + int(h) * 3600 + int(m) * 60 + sec
|
||||
if len(parts) == 2:
|
||||
m, s = parts
|
||||
return days * 86400 + int(m) * 60 + float(s)
|
||||
if len(parts) == 1:
|
||||
return days * 86400 + float(parts[0])
|
||||
except ValueError:
|
||||
return 0.0
|
||||
return 0.0
|
||||
|
||||
|
||||
def format_seconds(seconds: int | float | None) -> str:
|
||||
if seconds is None or seconds <= 0:
|
||||
return "Unknown"
|
||||
seconds = int(round(seconds))
|
||||
days, rem = divmod(seconds, 86400)
|
||||
hours, rem = divmod(rem, 3600)
|
||||
minutes, sec = divmod(rem, 60)
|
||||
if days:
|
||||
return f"{days}-{hours:02d}:{minutes:02d}:{sec:02d}"
|
||||
return f"{hours:02d}:{minutes:02d}:{sec:02d}"
|
||||
|
||||
|
||||
def parse_size_to_bytes(value: str) -> float | None:
|
||||
"""Parse Slurm memory size fields such as 1024K, 2000M, 8Gn, 4Gc, 1.5T."""
|
||||
s = (value or "").strip()
|
||||
if not s or s in {"Unknown", "0", "0K", "0M", "0G", "0T"}:
|
||||
return None
|
||||
|
||||
# Slurm ReqMem may end in c/n for per-CPU or per-node. Strip that here.
|
||||
if s[-1].lower() in {"c", "n"}:
|
||||
s = s[:-1]
|
||||
|
||||
m = re.fullmatch(r"([0-9]+(?:\.[0-9]+)?)([KMGTP]?)", s, flags=re.I)
|
||||
if not m:
|
||||
return None
|
||||
|
||||
num = float(m.group(1))
|
||||
unit = m.group(2).upper() or "K" # Slurm memory fields are normally KiB when unitless.
|
||||
mult = {
|
||||
"K": 1024,
|
||||
"M": 1024**2,
|
||||
"G": 1024**3,
|
||||
"T": 1024**4,
|
||||
"P": 1024**5,
|
||||
}[unit]
|
||||
return num * mult
|
||||
|
||||
|
||||
def reqmem_total_bytes(reqmem: str, cpus: int, nodes: int) -> float | None:
|
||||
"""Convert ReqMem into total requested bytes for the whole job allocation."""
|
||||
raw = (reqmem or "").strip()
|
||||
base = parse_size_to_bytes(raw)
|
||||
if base is None:
|
||||
return None
|
||||
if raw.lower().endswith("c"):
|
||||
return base * max(cpus, 1)
|
||||
# Default Slurm ReqMem suffix is usually n: memory per node.
|
||||
return base * max(nodes, 1)
|
||||
|
||||
|
||||
def common_prefix(names: list[str]) -> str:
|
||||
if not names:
|
||||
return ""
|
||||
prefix = os.path.commonprefix(names)
|
||||
# Avoid ugly partial-token prefixes when possible.
|
||||
prefix = re.sub(r"[^A-Za-z0-9_.-]+$", "", prefix)
|
||||
return prefix or names[0]
|
||||
|
||||
|
||||
def stdev_or_none(values: list[float]) -> float | None:
|
||||
if len(values) < 2:
|
||||
return 0.0 if len(values) == 1 else None
|
||||
return statistics.stdev(values)
|
||||
|
||||
|
||||
def mean_or_none(values: list[float]) -> float | None:
|
||||
return statistics.mean(values) if values else None
|
||||
|
||||
|
||||
def pct(numerator: float | None, denominator: float | None) -> float | None:
|
||||
if numerator is None or denominator is None or denominator <= 0:
|
||||
return None
|
||||
return 100.0 * numerator / denominator
|
||||
|
||||
|
||||
def run_sacct(args: argparse.Namespace) -> list[dict[str, str]]:
|
||||
cmd = [
|
||||
"sacct",
|
||||
"-P",
|
||||
"-n",
|
||||
"--units=K",
|
||||
"--format=" + ",".join(SACCT_FIELDS),
|
||||
]
|
||||
|
||||
if args.start:
|
||||
cmd += ["-S", args.start]
|
||||
if args.end:
|
||||
cmd += ["-E", args.end]
|
||||
if args.user:
|
||||
cmd += ["-u", args.user]
|
||||
if args.state:
|
||||
cmd += ["--state", args.state]
|
||||
|
||||
# Include job steps because MaxRSS often lives on batch/extern/step rows.
|
||||
# We later collapse rows back to the base job ID.
|
||||
try:
|
||||
proc = subprocess.run(cmd, text=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
except FileNotFoundError:
|
||||
die("sacct not found in PATH")
|
||||
except subprocess.CalledProcessError as exc:
|
||||
die(f"sacct failed with exit code {exc.returncode}:\n{exc.stderr.strip()}")
|
||||
|
||||
rows = parse_pipe_rows(proc.stdout.splitlines())
|
||||
return rows
|
||||
|
||||
|
||||
def parse_pipe_rows(lines: Iterable[str]) -> list[dict[str, str]]:
|
||||
rows: list[dict[str, str]] = []
|
||||
reader = csv.reader(lines, delimiter="|")
|
||||
for parts in reader:
|
||||
if not parts:
|
||||
continue
|
||||
parts = [p.strip() for p in parts]
|
||||
if len(parts) < len(SACCT_FIELDS):
|
||||
parts += [""] * (len(SACCT_FIELDS) - len(parts))
|
||||
rows.append(dict(zip(SACCT_FIELDS, parts[: len(SACCT_FIELDS)])))
|
||||
return rows
|
||||
|
||||
|
||||
def write_cache(path: str, rows: list[dict[str, str]]) -> None:
|
||||
with open(path, "w", newline="", encoding="utf-8") as f:
|
||||
writer = csv.DictWriter(f, fieldnames=SACCT_FIELDS, delimiter="|", lineterminator="\n")
|
||||
writer.writeheader()
|
||||
writer.writerows(rows)
|
||||
|
||||
|
||||
def read_cache(path: str) -> list[dict[str, str]]:
|
||||
with open(path, newline="", encoding="utf-8") as f:
|
||||
reader = csv.DictReader(f, delimiter="|")
|
||||
missing = [x for x in SACCT_FIELDS if x not in (reader.fieldnames or [])]
|
||||
# Older cache files from versions before CPUTimeRAW can still be read.
|
||||
# CPU efficiency will be NA for those rows unless CPUTimeRAW is present.
|
||||
missing_required = [x for x in missing if x != "CPUTimeRAW"]
|
||||
if missing_required:
|
||||
die(f"cache file is missing expected fields: {', '.join(missing_required)}")
|
||||
return [{k: (row.get(k) or "").strip() for k in SACCT_FIELDS} for row in reader]
|
||||
|
||||
|
||||
def base_job_id(jobid: str) -> str:
|
||||
return re.split(r"[._]", jobid, maxsplit=1)[0]
|
||||
|
||||
|
||||
def is_top_level_row(row: dict[str, str]) -> bool:
|
||||
jid = row.get("JobID", "")
|
||||
return "." not in jid and "_" not in jid
|
||||
|
||||
|
||||
def build_job_records(rows: list[dict[str, str]], only_user: str | None = None) -> list[JobRecord]:
|
||||
"""Collapse sacct top-level and step rows into one JobRecord per base job."""
|
||||
grouped: dict[str, list[dict[str, str]]] = defaultdict(list)
|
||||
for row in rows:
|
||||
# Do not filter by User here. On many Slurm installations, job step rows
|
||||
# are where MaxRSS is populated, but their User field may be empty or may
|
||||
# not match the parent job's user. Filtering before grouping drops those
|
||||
# rows and makes Memory_Efficiency become NA. Filter after identifying
|
||||
# the top-level job row instead.
|
||||
grouped[base_job_id(row.get("JobIDRaw") or row.get("JobID", ""))].append(row)
|
||||
|
||||
records: list[JobRecord] = []
|
||||
for _, group in grouped.items():
|
||||
top = next((r for r in group if is_top_level_row(r)), group[0])
|
||||
if only_user and top.get("User") != only_user:
|
||||
continue
|
||||
|
||||
# seff-style practical peak RSS: maximum MaxRSS across non-extern job steps.
|
||||
# This is intentionally not a sum over all ranks/tasks.
|
||||
step_rows = [
|
||||
r for r in group
|
||||
if "." in (r.get("JobID") or r.get("JobIDRaw") or "")
|
||||
and ".extern" not in (r.get("JobID") or r.get("JobIDRaw") or "")
|
||||
]
|
||||
rss_source_rows = step_rows or [
|
||||
r for r in group
|
||||
if ".extern" not in (r.get("JobID") or r.get("JobIDRaw") or "")
|
||||
]
|
||||
maxrss_values = [parse_size_to_bytes(r.get("MaxRSS", "")) for r in rss_source_rows]
|
||||
maxrss = max([x for x in maxrss_values if x is not None], default=None)
|
||||
|
||||
cpus = int(float(top.get("AllocCPUS") or 0))
|
||||
nodes = int(float(top.get("NNodes") or 0))
|
||||
elapsed = int(float(top.get("ElapsedRaw") or 0))
|
||||
|
||||
# sacct TimelimitRaw is minutes, while ElapsedRaw and CPUTimeRAW are seconds.
|
||||
timelimit_raw_minutes = float(top.get("TimelimitRaw") or 0)
|
||||
timelimit_seconds = int(round(timelimit_raw_minutes * 60))
|
||||
|
||||
cputime_raw = float(top.get("CPUTimeRAW") or 0)
|
||||
totalcpu = parse_duration_to_seconds(top.get("TotalCPU", ""))
|
||||
|
||||
reqmem = top.get("ReqMem", "")
|
||||
reqmem_total = reqmem_total_bytes(reqmem, cpus, nodes)
|
||||
|
||||
cpu_eff = pct(totalcpu, cputime_raw)
|
||||
mem_eff = pct(maxrss, reqmem_total)
|
||||
time_eff = pct(elapsed, timelimit_seconds)
|
||||
|
||||
records.append(
|
||||
JobRecord(
|
||||
raw=top,
|
||||
username=top.get("User", ""),
|
||||
jobname=top.get("JobName", ""),
|
||||
cpus=cpus,
|
||||
nodes=nodes,
|
||||
reqmem_raw=reqmem,
|
||||
reqwall_raw=format_seconds(timelimit_seconds),
|
||||
reqmem_bytes_total=reqmem_total,
|
||||
elapsed_sec=elapsed,
|
||||
timelimit_sec=timelimit_seconds,
|
||||
totalcpu_sec=totalcpu,
|
||||
maxrss_bytes=maxrss,
|
||||
cpu_eff=cpu_eff,
|
||||
mem_eff=mem_eff,
|
||||
time_eff=time_eff,
|
||||
)
|
||||
)
|
||||
|
||||
return records
|
||||
|
||||
|
||||
def aggregate_records(records: list[JobRecord], args: argparse.Namespace) -> list[OutputRow]:
|
||||
if args.aggr_regexp:
|
||||
compiled = [(pat, re.compile(pat)) for pat in args.aggr_regexp]
|
||||
buckets: dict[tuple[Any, ...], list[JobRecord]] = defaultdict(list)
|
||||
unmatched: list[JobRecord] = []
|
||||
|
||||
for rec in records:
|
||||
matched = False
|
||||
for pat, rx in compiled:
|
||||
if rx.search(rec.jobname):
|
||||
key = (pat, rec.cpus, rec.nodes, rec.reqmem_raw, rec.reqwall_raw)
|
||||
buckets[key].append(rec)
|
||||
matched = True
|
||||
break
|
||||
if not matched:
|
||||
unmatched.append(rec)
|
||||
|
||||
out = [make_aggregate_row(v, username="*", jobname=k[0]) for k, v in buckets.items()]
|
||||
out.extend(make_single_row(r) for r in unmatched)
|
||||
return out
|
||||
|
||||
if args.aggr_user:
|
||||
buckets = defaultdict(list)
|
||||
for rec in records:
|
||||
key = (rec.username, rec.cpus, rec.nodes, rec.reqmem_raw, rec.reqwall_raw)
|
||||
buckets[key].append(rec)
|
||||
return [make_aggregate_row(v, username=k[0], jobname=f"{common_prefix([r.jobname for r in v])}*") for k, v in buckets.items()]
|
||||
|
||||
return [make_single_row(r) for r in records]
|
||||
|
||||
|
||||
def make_single_row(rec: JobRecord) -> OutputRow:
|
||||
return OutputRow(
|
||||
username=rec.username,
|
||||
CPUs=rec.cpus,
|
||||
Nodes=rec.nodes,
|
||||
ReqMem=rec.reqmem_raw,
|
||||
ReqWalltime=rec.reqwall_raw,
|
||||
Count=1,
|
||||
CPU_Efficiency=rec.cpu_eff,
|
||||
Memory_Efficiency=rec.mem_eff,
|
||||
Time_Efficiency=rec.time_eff,
|
||||
jobname=rec.jobname,
|
||||
_cpu_values=[rec.cpu_eff] if rec.cpu_eff is not None else [],
|
||||
_mem_values=[rec.mem_eff] if rec.mem_eff is not None else [],
|
||||
_time_values=[rec.time_eff] if rec.time_eff is not None else [],
|
||||
)
|
||||
|
||||
|
||||
def make_aggregate_row(records: list[JobRecord], username: str, jobname: str) -> OutputRow:
|
||||
first = records[0]
|
||||
cpu_vals = [r.cpu_eff for r in records if r.cpu_eff is not None]
|
||||
mem_vals = [r.mem_eff for r in records if r.mem_eff is not None]
|
||||
time_vals = [r.time_eff for r in records if r.time_eff is not None]
|
||||
return OutputRow(
|
||||
username=username,
|
||||
CPUs=first.cpus,
|
||||
Nodes=first.nodes,
|
||||
ReqMem=first.reqmem_raw,
|
||||
ReqWalltime=first.reqwall_raw,
|
||||
Count=len(records),
|
||||
CPU_Efficiency=mean_or_none(cpu_vals),
|
||||
Memory_Efficiency=mean_or_none(mem_vals),
|
||||
Time_Efficiency=mean_or_none(time_vals),
|
||||
jobname=jobname,
|
||||
_cpu_values=cpu_vals,
|
||||
_mem_values=mem_vals,
|
||||
_time_values=time_vals,
|
||||
)
|
||||
|
||||
|
||||
def resolve_column(name: str) -> str:
|
||||
n = name.strip()
|
||||
reverse = {v.lower(): v for v in DEFAULT_COLUMNS}
|
||||
reverse.update({v.lower(): v for v in NUMERIC_COLUMNS})
|
||||
if n in ALIASES:
|
||||
return ALIASES[n]
|
||||
if n.lower() in reverse:
|
||||
return reverse[n.lower()]
|
||||
die(f"unknown column/alias: {name}")
|
||||
return ""
|
||||
|
||||
|
||||
def sort_rows(rows: list[OutputRow], spec: str | None) -> list[OutputRow]:
|
||||
if not spec:
|
||||
return rows
|
||||
|
||||
terms = [x.strip() for x in spec.split(",") if x.strip()]
|
||||
if len(terms) > 3:
|
||||
die("--sort supports at most three columns")
|
||||
|
||||
parsed: list[tuple[str, bool]] = []
|
||||
for t in terms:
|
||||
desc = t.startswith("-")
|
||||
asc = t.startswith("+")
|
||||
name = t[1:] if (desc or asc) else t
|
||||
col = resolve_column(name)
|
||||
if col not in NUMERIC_COLUMNS:
|
||||
die(f"--sort column must be numeric, got {col}")
|
||||
parsed.append((col, desc))
|
||||
|
||||
sorted_rows = rows
|
||||
# Stable-sort from least significant to most significant.
|
||||
for col, desc in reversed(parsed):
|
||||
sorted_rows = sorted(
|
||||
sorted_rows,
|
||||
key=lambda r: float("-inf") if getattr(r, col) is None else getattr(r, col),
|
||||
reverse=desc,
|
||||
)
|
||||
return sorted_rows
|
||||
|
||||
|
||||
def columns_for_sdev(base_cols: list[str]) -> list[str]:
|
||||
out: list[str] = []
|
||||
for col in base_cols:
|
||||
out.append(col)
|
||||
if col in {"CPU_Efficiency", "Memory_Efficiency", "Time_Efficiency"}:
|
||||
out += [f"{col}_sdev", f"{col}_max", f"{col}_min"]
|
||||
return out
|
||||
|
||||
|
||||
def format_value(value: Any) -> str:
|
||||
if value is None:
|
||||
return "NA"
|
||||
if isinstance(value, float):
|
||||
return f"{value:.2f}"
|
||||
return str(value)
|
||||
|
||||
|
||||
def print_table(rows: list[OutputRow], columns: list[str], sdev: bool) -> None:
|
||||
dicts = [r.as_dict(sdev=sdev) for r in rows]
|
||||
columns = columns_for_sdev(columns) if sdev else columns
|
||||
|
||||
widths = {col: len(col) for col in columns}
|
||||
for d in dicts:
|
||||
for col in columns:
|
||||
widths[col] = max(widths[col], len(format_value(d.get(col))))
|
||||
|
||||
print(" ".join(col.ljust(widths[col]) for col in columns))
|
||||
print(" ".join("-" * widths[col] for col in columns))
|
||||
for d in dicts:
|
||||
print(" ".join(format_value(d.get(col)).ljust(widths[col]) for col in columns))
|
||||
|
||||
|
||||
FORMAT_RE = re.compile(r"%(?:([-.0-9]*)([A-Za-z]))")
|
||||
|
||||
|
||||
def print_custom_format(rows: list[OutputRow], fmt: str, sdev: bool) -> None:
|
||||
# Slurm-like mini-format: %.12u %.8e %.30j, where the final letter is an alias.
|
||||
# For efficiency aliases with --sdev, only the averaged value is emitted here.
|
||||
for row in rows:
|
||||
d = row.as_dict(sdev=sdev)
|
||||
|
||||
def repl(match: re.Match[str]) -> str:
|
||||
widthspec, alias = match.groups()
|
||||
col = resolve_column(alias)
|
||||
val = d.get(col)
|
||||
text = format_value(val)
|
||||
if widthspec:
|
||||
# Interpret %.12s style as max width; %12s as min width.
|
||||
m = re.fullmatch(r"\.?(-?\d+)", widthspec)
|
||||
if m:
|
||||
w = int(m.group(1))
|
||||
if widthspec.startswith("."):
|
||||
text = text[: abs(w)]
|
||||
else:
|
||||
text = text.rjust(w) if w > 0 else text.ljust(abs(w))
|
||||
return text
|
||||
|
||||
print(FORMAT_RE.sub(repl, fmt))
|
||||
|
||||
|
||||
def parse_args(argv: list[str]) -> argparse.Namespace:
|
||||
p = argparse.ArgumentParser(
|
||||
description="Display seff-style CPU, memory and walltime efficiency values from sacct data."
|
||||
)
|
||||
|
||||
p.add_argument("-S", "--start", help="sacct start time, passed to sacct -S",
|
||||
default="now - 24 hours")
|
||||
p.add_argument("-E", "--end", help="sacct end time, passed to sacct -E",
|
||||
default="now")
|
||||
p.add_argument("-u", "--user", help="restrict to one user; passed as sacct -u unless reading from cache")
|
||||
p.add_argument("--state", "--job-state", dest="state",
|
||||
default="COMPLETED",
|
||||
help="sacct state filter, e.g. COMPLETED,FAILED,TIMEOUT")
|
||||
|
||||
p.add_argument("-O", "--output-cache", help="write raw sacct output cache to this file")
|
||||
p.add_argument("-F", "--from-cache", help="read raw sacct output cache from this file instead of running sacct")
|
||||
|
||||
p.add_argument("-U", "--aggr-user", action="store_true", help="aggregate jobs by user, CPUs, nodes, ReqMem, and timelimit")
|
||||
p.add_argument(
|
||||
"-R",
|
||||
"--aggr-regexp",
|
||||
action="append",
|
||||
help="aggregate jobs matching regexp by regexp, CPUs, nodes, ReqMem, and timelimit; may be repeated",
|
||||
)
|
||||
|
||||
p.add_argument("--sdev", action="store_true", help="after each efficiency average, add sdev, max, and min columns")
|
||||
p.add_argument("--json", action="store_true", help="emit JSON instead of an ASCII table")
|
||||
p.add_argument("-s", "--sort", help="comma-separated numeric sort columns or aliases; prefix with - for descending")
|
||||
p.add_argument(
|
||||
"-o",
|
||||
"--format",
|
||||
help=(
|
||||
"Slurm-like output format using aliases: "
|
||||
"u=username,c=CPUs,N=Nodes,m=ReqMem,l=ReqWalltime,C=Count,"
|
||||
"e=CPU_Efficiency,M=Memory_Efficiency,t=Time_Efficiency,j=jobname"
|
||||
),
|
||||
)
|
||||
|
||||
args = p.parse_args(argv)
|
||||
|
||||
if args.from_cache and any([args.start, args.end, args.state]):
|
||||
print("warning: -S/-E/--state are ignored when using -F/--from-cache", file=sys.stderr)
|
||||
if args.aggr_user and args.aggr_regexp:
|
||||
die("choose only one aggregation mode: --aggr-user or --aggr-regexp")
|
||||
return args
|
||||
|
||||
|
||||
def main(argv: list[str] | None = None) -> int:
|
||||
args = parse_args(argv or sys.argv[1:])
|
||||
|
||||
if args.from_cache:
|
||||
rows = read_cache(args.from_cache)
|
||||
else:
|
||||
rows = run_sacct(args)
|
||||
if args.output_cache:
|
||||
write_cache(args.output_cache, rows)
|
||||
|
||||
records = build_job_records(rows, only_user=args.user)
|
||||
out_rows = aggregate_records(records, args)
|
||||
out_rows = sort_rows(out_rows, args.sort)
|
||||
|
||||
if args.json:
|
||||
print(json.dumps([r.as_dict(sdev=args.sdev) for r in out_rows], indent=2))
|
||||
elif args.format:
|
||||
print_custom_format(out_rows, args.format, args.sdev)
|
||||
else:
|
||||
print_table(out_rows, DEFAULT_COLUMNS, args.sdev)
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Reference in New Issue
Block a user