TRES parsing for getting Req, Alloc, Used TRES

Introduced elegant dataclasses, but could have come at the cost
of speed.
This commit is contained in:
2026-06-05 17:18:29 +02:00
parent d544cde7d9
commit 362e4d46c9
+126 -78
View File
@@ -3,10 +3,10 @@
slurm-eff-tool.py - Slurm job efficiency reporting and investigation tool.
Efficiency definitions:
CPU_Efficiency = TotalCPU_seconds / CPUTimeRAW * 100
Time_Efficiency = ElapsedRaw_seconds / (TimelimitRaw_minutes * 60) * 100
CPU_Eff = TotalCPU_seconds / CPUTimeRAW * 100
Time_Eff = ElapsedRaw_seconds / (TimelimitRaw_minutes * 60) * 100
Memory_Efficiency = 100 (sum of peaks over all job steps) / (mem allocated by system)
Mem_Eff = 100 (slurm recorded mem usage) / (mem allocated by system)
This script intentionally asks sacct for raw parsable fields and caches those rows.
TODO: allow to cache a faster already parsed and binary format. Keep sacct text option for
@@ -30,7 +30,7 @@ import subprocess
import sys
import signal
from collections import defaultdict
from dataclasses import dataclass, field
from dataclasses import dataclass, field, fields
from pathlib import Path
from typing import Any, Iterable
@@ -49,6 +49,7 @@ SACCT_FIELDS = [
"CPUTimeRAW",
"TotalCPU",
"MaxRSS",
"ReqTRes",
"AllocTRES",
"TRESUsageInTot",
]
@@ -58,21 +59,22 @@ DEFAULT_COLUMNS = [
"username",
"JobID",
"Count",
"NTasks",
"CPUs",
"Nodes",
"CPU_Efficiency",
"CPU_Eff",
"waste_CPU",
"ReqMem",
"UsedMem",
"AllocMem",
"MemPerCPU",
"MaxRSS_max",
"Memory_Efficiency",
"UsedMem",
"Mem_Eff",
"waste_Mem",
"MaxRSS_max",
"MemPerCPU",
"ReqWalltime",
"Walltime",
"Walltime_max",
"Time_Efficiency",
"Time_Eff",
"jobname",
]
@@ -87,9 +89,9 @@ ALIASES = {
"p": "MemPerCPU",
"l": "ReqWalltime",
"C": "Count",
"e": "CPU_Efficiency",
"M": "Memory_Efficiency",
"t": "Time_Efficiency",
"e": "CPU_Eff",
"M": "Mem_Eff",
"t": "Time_Eff",
"j": "jobname",
"X": "waste_CPU",
"Y": "waste_Mem",
@@ -97,6 +99,7 @@ ALIASES = {
NUMERIC_COLUMNS = {
"CPUs",
"NTasks",
"Nodes",
"ReqMem",
"AllocMem",
@@ -104,10 +107,10 @@ NUMERIC_COLUMNS = {
"MemPerCPU",
"ReqWalltime",
"Count",
"CPU_Efficiency",
"CPU_Eff",
"MaxRSS_max",
"Memory_Efficiency",
"Time_Efficiency",
"Mem_Eff",
"Time_Eff",
"Walltime_max",
"waste_CPU",
"waste_Mem",
@@ -119,6 +122,7 @@ class JobRecord:
raw: dict[str, str]
username: str
jobname: str
reqtasks: int
cpus: int
nodes: int
reqmem_gb: float | None
@@ -136,10 +140,63 @@ class JobRecord:
time_eff: float | None
@dataclass
class AllocTresStruct:
mem: int | None = None
cpu: int = 0
def __post_init__(self):
if isinstance(self.mem, str):
self.mem = parse_size_to_bytes(self.mem)
if isinstance(self.cpu, str):
self.cpu = int(self.cpu)
@classmethod
def from_tres_string(cls,tres_str: str) -> AllocTresStruct:
"""Parse comma separated fields of a TRES string."""
if tres_str == "":
return AllocTresStruct()
supported_fields = {f.name for f in fields(AllocTresStruct)}
result = {}
for f in tres_str.split(","):
key,val = f.split("=", maxsplit=1)
result[key] = val
values = {k: v for k, v in result.items() if k in supported_fields}
return cls(**values)
@dataclass
class UsedTresStruct:
mem: int | None = None
cpu: str = ""
def __post_init__(self):
if isinstance(self.mem, str):
self.mem = parse_size_to_bytes(self.mem)
@classmethod
def from_tres_string(cls,tres_str: str) -> UsedTresStruct:
"""Parse comma separated fields of a TRESUsageInTot string."""
if tres_str == "":
return UsedTresStruct()
supported_fields = {f.name for f in fields(UsedTresStruct)}
result = {}
for f in tres_str.split(","):
key,val = f.split("=", maxsplit=1)
result[key] = val
values = {k: v for k, v in result.items() if k in supported_fields}
return cls(**values)
@dataclass
class OutputRow:
username: str
JobID: str
NTasks: int
CPUs: int
Nodes: int
ReqMem: float | None
@@ -148,9 +205,9 @@ class OutputRow:
MemPerCPU: float | None
ReqWalltime: float | None
Count: int
CPU_Efficiency: float | None
Memory_Efficiency: float | None
Time_Efficiency: float | None
CPU_Eff: float | None
Mem_Eff: float | None
Time_Eff: float | None
jobname: str
maxrss_max: float | None
walltime: float | None # in h
@@ -165,6 +222,7 @@ class OutputRow:
d: dict[str, Any] = {
"username": self.username,
"JobID": self.JobID,
"NTasks": self.NTasks,
"CPUs": self.CPUs,
"Nodes": self.Nodes,
"ReqMem": self.ReqMem,
@@ -176,18 +234,18 @@ class OutputRow:
"Walltime": self.walltime,
"Walltime_max": self.walltime_max,
"Count": self.Count,
"CPU_Efficiency": self.CPU_Efficiency,
"Memory_Efficiency": self.Memory_Efficiency,
"Time_Efficiency": self.Time_Efficiency,
"CPU_Eff": self.CPU_Eff,
"Mem_Eff": self.Mem_Eff,
"Time_Eff": self.Time_Eff,
"jobname": self.jobname,
"waste_CPU": self.waste_CPU,
"waste_Mem": self.waste_Mem,
}
if sdev:
for col, vals in [
("CPU_Efficiency", self._cpu_eff_values),
("Memory_Efficiency", self._mem_eff_values),
("Time_Efficiency", self._time_eff_values),
("CPU_Eff", self._cpu_eff_values),
("Mem_Eff", self._mem_eff_values),
("Time_Eff", self._time_eff_values),
]:
d[f"{col}_sdev"] = stdev_or_none(vals)
d[f"{col}_max"] = max(vals) if vals else None
@@ -264,30 +322,7 @@ def parse_size_to_bytes(value: str) -> int | None:
}[unit]
return num * mult
def parse_tres_mem_bytes(alloc_tres: str) -> int | None:
"""
Extract memory allocation from an AllocTRES string.
Examples
--------
cpu=16,mem=64G,node=1,billing=16
-> 68719476736
billing=64,cpu=64,mem=257698M,node=2
-> 270215421952
"""
if not alloc_tres:
return None
for field in alloc_tres.split(","):
field = field.strip()
if field.startswith("mem="):
return parse_size_to_bytes(field[4:])
return None
def reqmem_total_bytes(reqmem: str, cpus: int, nodes: int) -> float | None:
"""Convert ReqMem into total requested bytes for the whole job allocation."""
raw = (reqmem or "").strip()
@@ -296,8 +331,10 @@ def reqmem_total_bytes(reqmem: str, cpus: int, nodes: int) -> float | None:
return None
if raw.lower().endswith("c"):
return base * max(cpus, 1)
if raw.lower().endswith("n"):
return base * max(nodes, 1)
# Default Slurm ReqMem suffix is usually n: memory per node.
return base * max(nodes, 1)
return base
def common_prefix(names: list[str]) -> str:
@@ -410,7 +447,7 @@ def build_job_records(rows: list[dict[str, str]], only_user: str | None = None)
# Do not filter by User here. On many Slurm installations, job step rows
# are where MaxRSS is populated, but their User field may be empty or may
# not match the parent job's user. Filtering before grouping drops those
# rows and makes Memory_Efficiency become NA. Filter after identifying
# rows and makes Mem_Eff become NA. Filter after identifying
# the top-level job row instead.
grouped[base_job_id(row.get("JobIDRaw") or row.get("JobID", ""))].append(row)
@@ -439,16 +476,19 @@ def build_job_records(rows: list[dict[str, str]], only_user: str | None = None)
# find mem_used_tres over reported job steps. It may be in the
# *.interactive or *.0 steps
mem_used_tres_values = [parse_tres_mem_bytes(r.get("TRESUsageInTot", "")) \
for r in rss_source_rows]
# mem_used_tres_cleaned = [x for x in mem_used_tres_values if x is not None]
mem_used_tres = max([x for x in mem_used_tres_values if x is not None],
default=None)
used_tres = [UsedTresStruct.from_tres_string(r.get("TRESUsageInTot","")) \
for r in rss_source_rows]
mem_used_tres = max(r.mem for r in used_tres if r.mem is not None)
mem_used_tres_gb = None
if mem_used_tres is not None:
mem_used_tres_gb = mem_used_tres / (1024**3)
cpus = int(float(top.get("AllocCPUS") or 0))
req_tres = AllocTresStruct.from_tres_string(top.get("ReqTRes") or "")
# better than getting NTasks and then filtering out .extern and
# getting max over job steps
req_ntasks = req_tres.cpu
alloc_cpus = int(float(top.get("AllocCPUS") or 0))
nodes = int(float(top.get("NNodes") or 0))
elapsed = int(float(top.get("ElapsedRaw") or 0))
@@ -460,22 +500,25 @@ def build_job_records(rows: list[dict[str, str]], only_user: str | None = None)
totalcpu = slurm_duration_to_seconds(top.get("TotalCPU", ""))
# Memory requested in total by the user
reqmem = top.get("ReqMem", "")
reqmem_total = reqmem_total_bytes(reqmem, cpus, nodes)
# It is safer to rely on ReqTres.mem, instead constructing from ReqMem
# reqmem = top.get("ReqMem", "")
# reqmem_total = reqmem_total_bytes(reqmem, alloc_cpus, nodes)
reqmem_total = req_tres.mem
alloc_tres = AllocTresStruct.from_tres_string(top.get("AllocTRES") or "")
# Memory allocated by the scheduler, recorded in AllocTRES string
mem_alloc_tres = parse_tres_mem_bytes(top.get("AllocTRES") or "")
# mem_alloc_tres = parse_tres_mem_bytes(top.get("AllocTRES") or "")
mem_alloc_tres_gb = None
if mem_alloc_tres is not None:
mem_alloc_tres_gb = mem_alloc_tres / (1024**3)
if alloc_tres.mem is not None:
mem_alloc_tres_gb = alloc_tres.mem / (1024**3)
reqmem_gb = None
if reqmem_total is not None:
reqmem_gb = reqmem_total / (1024**3)
reqwall_hours = timelimit_seconds / 3600.0 if timelimit_seconds > 0 else None
mem_per_cpu_gb = mem_alloc_tres_gb / cpus if reqmem_gb is not None \
and mem_alloc_tres_gb is not None and cpus > 0 else None
mem_per_cpu_gb = mem_alloc_tres_gb / alloc_cpus if reqmem_gb is not None \
and mem_alloc_tres_gb is not None and alloc_cpus > 0 else None
# EFFFICIENCY CALCULATIONS
#
@@ -484,19 +527,21 @@ def build_job_records(rows: list[dict[str, str]], only_user: str | None = None)
# for these cases
cpu_eff = pct(totalcpu, cputime_raw)
time_eff = pct(elapsed, timelimit_seconds)
# Old metric:
# mem_eff = (Peak RSS of all job steps) / (user requested memory)
# mem_eff = pct(maxrss, reqmem_total)
# Better metric:
# mem_eff = (sum of peaks over all job steps) / (mem allocated by system)
mem_eff = pct(mem_used_tres, mem_alloc_tres)
mem_eff = pct(mem_used_tres, alloc_tres.mem)
records.append(
JobRecord(
raw=top,
username=top.get("User", ""),
jobname=top.get("JobName", ""),
cpus=cpus,
reqtasks=req_ntasks,
cpus=alloc_cpus,
nodes=nodes,
reqmem_gb=reqmem_gb,
mem_used_tres=mem_used_tres_gb,
@@ -528,22 +573,23 @@ def aggregate_records(records: list[JobRecord], args: argparse.Namespace) -> lis
matched = False
for pat, rx in compiled:
if rx.search(rec.jobname):
key = (pat, rec.cpus, rec.nodes, rec.reqmem_gb, rec.reqwall_hours,
rec.username)
key = (pat, rec.username, rec.reqtasks, rec.cpus, rec.nodes,
rec.reqmem_gb, rec.reqwall_hours)
buckets[key].append(rec)
matched = True
break
if not matched:
unmatched.append(rec)
out = [make_aggregate_row(v, username=k[5], jobname=k[0]) for k, v in buckets.items()]
out = [make_aggregate_row(v, username=k[1], jobname=k[0]) for k, v in buckets.items()]
out.extend(make_single_row(r) for r in unmatched)
return out
if args.aggr_user:
buckets = defaultdict(list)
for rec in records:
key = (rec.username, rec.cpus, rec.nodes, rec.reqmem_gb, rec.reqwall_hours)
key = (rec.username, rec.reqtasks, rec.cpus, rec.nodes,
rec.reqmem_gb, rec.reqwall_hours)
buckets[key].append(rec)
return [make_aggregate_row(v, username=k[0], jobname=f"{common_prefix([r.jobname for r in v])}") for k, v in buckets.items()]
@@ -565,6 +611,7 @@ def make_single_row(rec: JobRecord) -> OutputRow:
return OutputRow(
username=rec.username,
JobID=rec.raw.get("JobIDRaw", rec.raw.get("JobID", "")),
NTasks=rec.reqtasks,
CPUs=rec.cpus,
Nodes=rec.nodes,
ReqMem=rec.reqmem_gb,
@@ -573,9 +620,9 @@ def make_single_row(rec: JobRecord) -> OutputRow:
MemPerCPU=rec.mem_per_cpu_gb,
ReqWalltime=rec.reqwall_hours,
Count=1,
CPU_Efficiency=rec.cpu_eff,
Memory_Efficiency=rec.mem_eff,
Time_Efficiency=rec.time_eff,
CPU_Eff=rec.cpu_eff,
Mem_Eff=rec.mem_eff,
Time_Eff=rec.time_eff,
jobname=rec.jobname,
maxrss_max=rec.maxrss_bytes / (1024**3) if rec.maxrss_bytes is not None else None,
walltime=walltime,
@@ -623,6 +670,7 @@ def make_aggregate_row(records: list[JobRecord], username: str, jobname: str) ->
return OutputRow(
username=username,
JobID="",
NTasks=first.reqtasks,
CPUs=first.cpus,
Nodes=first.nodes,
ReqMem=first.reqmem_gb,
@@ -631,9 +679,9 @@ def make_aggregate_row(records: list[JobRecord], username: str, jobname: str) ->
MemPerCPU=first.mem_per_cpu_gb,
ReqWalltime=first.reqwall_hours,
Count=count,
CPU_Efficiency=cpu_efficiency,
Memory_Efficiency=memory_efficiency,
Time_Efficiency=mean_or_none(time_eff_vals),
CPU_Eff=cpu_efficiency,
Mem_Eff=memory_efficiency,
Time_Eff=mean_or_none(time_eff_vals),
jobname=jobname,
maxrss_max=max([r.maxrss_bytes for r in records if r.maxrss_bytes is not None]) / (1024**3) or None,
walltime=walltime,
@@ -697,7 +745,7 @@ def columns_for_sdev(base_cols: list[str]) -> list[str]:
out: list[str] = []
for col in base_cols:
out.append(col)
if col in {"CPU_Efficiency", "Memory_Efficiency", "Time_Efficiency"}:
if col in {"CPU_Eff", "Mem_Eff", "Time_Eff"}:
out += [f"{col}_sdev", f"{col}_max", f"{col}_min"]
return out
@@ -822,7 +870,7 @@ def parse_args(argv: list[str]) -> argparse.Namespace:
help=(
"Slurm-like output format using aliases: "
"u=username,i=JobID,c=CPUs,N=Nodes,m=ReqMem,p=MemPerCPU,l=ReqWalltime,C=Count,"
"e=CPU_Efficiency,M=Memory_Efficiency,t=Time_Efficiency,j=jobname"
"e=CPU_Eff,M=Mem_Eff,t=Time_Eff,j=jobname"
),
)