diff --git a/slurm-eff-tool.py b/slurm-eff-tool.py index 00fd5ac..249ce13 100755 --- a/slurm-eff-tool.py +++ b/slurm-eff-tool.py @@ -3,10 +3,10 @@ slurm-eff-tool.py - Slurm job efficiency reporting and investigation tool. Efficiency definitions: - CPU_Efficiency = TotalCPU_seconds / CPUTimeRAW * 100 - Time_Efficiency = ElapsedRaw_seconds / (TimelimitRaw_minutes * 60) * 100 + CPU_Eff = TotalCPU_seconds / CPUTimeRAW * 100 + Time_Eff = ElapsedRaw_seconds / (TimelimitRaw_minutes * 60) * 100 - Memory_Efficiency = 100 (sum of peaks over all job steps) / (mem allocated by system) + Mem_Eff = 100 (slurm recorded mem usage) / (mem allocated by system) This script intentionally asks sacct for raw parsable fields and caches those rows. TODO: allow to cache a faster already parsed and binary format. Keep sacct text option for @@ -30,7 +30,7 @@ import subprocess import sys import signal from collections import defaultdict -from dataclasses import dataclass, field +from dataclasses import dataclass, field, fields from pathlib import Path from typing import Any, Iterable @@ -49,6 +49,7 @@ SACCT_FIELDS = [ "CPUTimeRAW", "TotalCPU", "MaxRSS", + "ReqTRes", "AllocTRES", "TRESUsageInTot", ] @@ -58,21 +59,22 @@ DEFAULT_COLUMNS = [ "username", "JobID", "Count", + "NTasks", "CPUs", "Nodes", - "CPU_Efficiency", + "CPU_Eff", "waste_CPU", "ReqMem", - "UsedMem", "AllocMem", - "MemPerCPU", - "MaxRSS_max", - "Memory_Efficiency", + "UsedMem", + "Mem_Eff", "waste_Mem", + "MaxRSS_max", + "MemPerCPU", "ReqWalltime", "Walltime", "Walltime_max", - "Time_Efficiency", + "Time_Eff", "jobname", ] @@ -87,9 +89,9 @@ ALIASES = { "p": "MemPerCPU", "l": "ReqWalltime", "C": "Count", - "e": "CPU_Efficiency", - "M": "Memory_Efficiency", - "t": "Time_Efficiency", + "e": "CPU_Eff", + "M": "Mem_Eff", + "t": "Time_Eff", "j": "jobname", "X": "waste_CPU", "Y": "waste_Mem", @@ -97,6 +99,7 @@ ALIASES = { NUMERIC_COLUMNS = { "CPUs", + "NTasks", "Nodes", "ReqMem", "AllocMem", @@ -104,10 +107,10 @@ NUMERIC_COLUMNS = { "MemPerCPU", "ReqWalltime", "Count", - "CPU_Efficiency", + "CPU_Eff", "MaxRSS_max", - "Memory_Efficiency", - "Time_Efficiency", + "Mem_Eff", + "Time_Eff", "Walltime_max", "waste_CPU", "waste_Mem", @@ -119,6 +122,7 @@ class JobRecord: raw: dict[str, str] username: str jobname: str + reqtasks: int cpus: int nodes: int reqmem_gb: float | None @@ -136,10 +140,63 @@ class JobRecord: time_eff: float | None +@dataclass +class AllocTresStruct: + mem: int | None = None + cpu: int = 0 + + def __post_init__(self): + if isinstance(self.mem, str): + self.mem = parse_size_to_bytes(self.mem) + if isinstance(self.cpu, str): + self.cpu = int(self.cpu) + + @classmethod + def from_tres_string(cls,tres_str: str) -> AllocTresStruct: + """Parse comma separated fields of a TRES string.""" + if tres_str == "": + return AllocTresStruct() + + supported_fields = {f.name for f in fields(AllocTresStruct)} + result = {} + for f in tres_str.split(","): + key,val = f.split("=", maxsplit=1) + result[key] = val + + values = {k: v for k, v in result.items() if k in supported_fields} + return cls(**values) + + +@dataclass +class UsedTresStruct: + mem: int | None = None + cpu: str = "" + + def __post_init__(self): + if isinstance(self.mem, str): + self.mem = parse_size_to_bytes(self.mem) + + @classmethod + def from_tres_string(cls,tres_str: str) -> UsedTresStruct: + """Parse comma separated fields of a TRESUsageInTot string.""" + if tres_str == "": + return UsedTresStruct() + + supported_fields = {f.name for f in fields(UsedTresStruct)} + result = {} + for f in tres_str.split(","): + key,val = f.split("=", maxsplit=1) + result[key] = val + + values = {k: v for k, v in result.items() if k in supported_fields} + return cls(**values) + + @dataclass class OutputRow: username: str JobID: str + NTasks: int CPUs: int Nodes: int ReqMem: float | None @@ -148,9 +205,9 @@ class OutputRow: MemPerCPU: float | None ReqWalltime: float | None Count: int - CPU_Efficiency: float | None - Memory_Efficiency: float | None - Time_Efficiency: float | None + CPU_Eff: float | None + Mem_Eff: float | None + Time_Eff: float | None jobname: str maxrss_max: float | None walltime: float | None # in h @@ -165,6 +222,7 @@ class OutputRow: d: dict[str, Any] = { "username": self.username, "JobID": self.JobID, + "NTasks": self.NTasks, "CPUs": self.CPUs, "Nodes": self.Nodes, "ReqMem": self.ReqMem, @@ -176,18 +234,18 @@ class OutputRow: "Walltime": self.walltime, "Walltime_max": self.walltime_max, "Count": self.Count, - "CPU_Efficiency": self.CPU_Efficiency, - "Memory_Efficiency": self.Memory_Efficiency, - "Time_Efficiency": self.Time_Efficiency, + "CPU_Eff": self.CPU_Eff, + "Mem_Eff": self.Mem_Eff, + "Time_Eff": self.Time_Eff, "jobname": self.jobname, "waste_CPU": self.waste_CPU, "waste_Mem": self.waste_Mem, } if sdev: for col, vals in [ - ("CPU_Efficiency", self._cpu_eff_values), - ("Memory_Efficiency", self._mem_eff_values), - ("Time_Efficiency", self._time_eff_values), + ("CPU_Eff", self._cpu_eff_values), + ("Mem_Eff", self._mem_eff_values), + ("Time_Eff", self._time_eff_values), ]: d[f"{col}_sdev"] = stdev_or_none(vals) d[f"{col}_max"] = max(vals) if vals else None @@ -264,30 +322,7 @@ def parse_size_to_bytes(value: str) -> int | None: }[unit] return num * mult -def parse_tres_mem_bytes(alloc_tres: str) -> int | None: - """ - Extract memory allocation from an AllocTRES string. - Examples - -------- - - cpu=16,mem=64G,node=1,billing=16 - -> 68719476736 - - billing=64,cpu=64,mem=257698M,node=2 - -> 270215421952 - """ - - if not alloc_tres: - return None - - for field in alloc_tres.split(","): - field = field.strip() - - if field.startswith("mem="): - return parse_size_to_bytes(field[4:]) - - return None def reqmem_total_bytes(reqmem: str, cpus: int, nodes: int) -> float | None: """Convert ReqMem into total requested bytes for the whole job allocation.""" raw = (reqmem or "").strip() @@ -296,8 +331,10 @@ def reqmem_total_bytes(reqmem: str, cpus: int, nodes: int) -> float | None: return None if raw.lower().endswith("c"): return base * max(cpus, 1) + if raw.lower().endswith("n"): + return base * max(nodes, 1) # Default Slurm ReqMem suffix is usually n: memory per node. - return base * max(nodes, 1) + return base def common_prefix(names: list[str]) -> str: @@ -410,7 +447,7 @@ def build_job_records(rows: list[dict[str, str]], only_user: str | None = None) # Do not filter by User here. On many Slurm installations, job step rows # are where MaxRSS is populated, but their User field may be empty or may # not match the parent job's user. Filtering before grouping drops those - # rows and makes Memory_Efficiency become NA. Filter after identifying + # rows and makes Mem_Eff become NA. Filter after identifying # the top-level job row instead. grouped[base_job_id(row.get("JobIDRaw") or row.get("JobID", ""))].append(row) @@ -439,16 +476,19 @@ def build_job_records(rows: list[dict[str, str]], only_user: str | None = None) # find mem_used_tres over reported job steps. It may be in the # *.interactive or *.0 steps - mem_used_tres_values = [parse_tres_mem_bytes(r.get("TRESUsageInTot", "")) \ - for r in rss_source_rows] - # mem_used_tres_cleaned = [x for x in mem_used_tres_values if x is not None] - mem_used_tres = max([x for x in mem_used_tres_values if x is not None], - default=None) + used_tres = [UsedTresStruct.from_tres_string(r.get("TRESUsageInTot","")) \ + for r in rss_source_rows] + mem_used_tres = max(r.mem for r in used_tres if r.mem is not None) mem_used_tres_gb = None if mem_used_tres is not None: mem_used_tres_gb = mem_used_tres / (1024**3) - - cpus = int(float(top.get("AllocCPUS") or 0)) + + req_tres = AllocTresStruct.from_tres_string(top.get("ReqTRes") or "") + # better than getting NTasks and then filtering out .extern and + # getting max over job steps + req_ntasks = req_tres.cpu + + alloc_cpus = int(float(top.get("AllocCPUS") or 0)) nodes = int(float(top.get("NNodes") or 0)) elapsed = int(float(top.get("ElapsedRaw") or 0)) @@ -460,22 +500,25 @@ def build_job_records(rows: list[dict[str, str]], only_user: str | None = None) totalcpu = slurm_duration_to_seconds(top.get("TotalCPU", "")) # Memory requested in total by the user - reqmem = top.get("ReqMem", "") - reqmem_total = reqmem_total_bytes(reqmem, cpus, nodes) + # It is safer to rely on ReqTres.mem, instead constructing from ReqMem + # reqmem = top.get("ReqMem", "") + # reqmem_total = reqmem_total_bytes(reqmem, alloc_cpus, nodes) + reqmem_total = req_tres.mem + alloc_tres = AllocTresStruct.from_tres_string(top.get("AllocTRES") or "") # Memory allocated by the scheduler, recorded in AllocTRES string - mem_alloc_tres = parse_tres_mem_bytes(top.get("AllocTRES") or "") + # mem_alloc_tres = parse_tres_mem_bytes(top.get("AllocTRES") or "") mem_alloc_tres_gb = None - if mem_alloc_tres is not None: - mem_alloc_tres_gb = mem_alloc_tres / (1024**3) + if alloc_tres.mem is not None: + mem_alloc_tres_gb = alloc_tres.mem / (1024**3) reqmem_gb = None if reqmem_total is not None: reqmem_gb = reqmem_total / (1024**3) reqwall_hours = timelimit_seconds / 3600.0 if timelimit_seconds > 0 else None - mem_per_cpu_gb = mem_alloc_tres_gb / cpus if reqmem_gb is not None \ - and mem_alloc_tres_gb is not None and cpus > 0 else None + mem_per_cpu_gb = mem_alloc_tres_gb / alloc_cpus if reqmem_gb is not None \ + and mem_alloc_tres_gb is not None and alloc_cpus > 0 else None # EFFFICIENCY CALCULATIONS # @@ -484,19 +527,21 @@ def build_job_records(rows: list[dict[str, str]], only_user: str | None = None) # for these cases cpu_eff = pct(totalcpu, cputime_raw) time_eff = pct(elapsed, timelimit_seconds) + # Old metric: # mem_eff = (Peak RSS of all job steps) / (user requested memory) # mem_eff = pct(maxrss, reqmem_total) # Better metric: # mem_eff = (sum of peaks over all job steps) / (mem allocated by system) - mem_eff = pct(mem_used_tres, mem_alloc_tres) + mem_eff = pct(mem_used_tres, alloc_tres.mem) records.append( JobRecord( raw=top, username=top.get("User", ""), jobname=top.get("JobName", ""), - cpus=cpus, + reqtasks=req_ntasks, + cpus=alloc_cpus, nodes=nodes, reqmem_gb=reqmem_gb, mem_used_tres=mem_used_tres_gb, @@ -528,22 +573,23 @@ def aggregate_records(records: list[JobRecord], args: argparse.Namespace) -> lis matched = False for pat, rx in compiled: if rx.search(rec.jobname): - key = (pat, rec.cpus, rec.nodes, rec.reqmem_gb, rec.reqwall_hours, - rec.username) + key = (pat, rec.username, rec.reqtasks, rec.cpus, rec.nodes, + rec.reqmem_gb, rec.reqwall_hours) buckets[key].append(rec) matched = True break if not matched: unmatched.append(rec) - out = [make_aggregate_row(v, username=k[5], jobname=k[0]) for k, v in buckets.items()] + out = [make_aggregate_row(v, username=k[1], jobname=k[0]) for k, v in buckets.items()] out.extend(make_single_row(r) for r in unmatched) return out if args.aggr_user: buckets = defaultdict(list) for rec in records: - key = (rec.username, rec.cpus, rec.nodes, rec.reqmem_gb, rec.reqwall_hours) + key = (rec.username, rec.reqtasks, rec.cpus, rec.nodes, + rec.reqmem_gb, rec.reqwall_hours) buckets[key].append(rec) return [make_aggregate_row(v, username=k[0], jobname=f"{common_prefix([r.jobname for r in v])}") for k, v in buckets.items()] @@ -565,6 +611,7 @@ def make_single_row(rec: JobRecord) -> OutputRow: return OutputRow( username=rec.username, JobID=rec.raw.get("JobIDRaw", rec.raw.get("JobID", "")), + NTasks=rec.reqtasks, CPUs=rec.cpus, Nodes=rec.nodes, ReqMem=rec.reqmem_gb, @@ -573,9 +620,9 @@ def make_single_row(rec: JobRecord) -> OutputRow: MemPerCPU=rec.mem_per_cpu_gb, ReqWalltime=rec.reqwall_hours, Count=1, - CPU_Efficiency=rec.cpu_eff, - Memory_Efficiency=rec.mem_eff, - Time_Efficiency=rec.time_eff, + CPU_Eff=rec.cpu_eff, + Mem_Eff=rec.mem_eff, + Time_Eff=rec.time_eff, jobname=rec.jobname, maxrss_max=rec.maxrss_bytes / (1024**3) if rec.maxrss_bytes is not None else None, walltime=walltime, @@ -623,6 +670,7 @@ def make_aggregate_row(records: list[JobRecord], username: str, jobname: str) -> return OutputRow( username=username, JobID="", + NTasks=first.reqtasks, CPUs=first.cpus, Nodes=first.nodes, ReqMem=first.reqmem_gb, @@ -631,9 +679,9 @@ def make_aggregate_row(records: list[JobRecord], username: str, jobname: str) -> MemPerCPU=first.mem_per_cpu_gb, ReqWalltime=first.reqwall_hours, Count=count, - CPU_Efficiency=cpu_efficiency, - Memory_Efficiency=memory_efficiency, - Time_Efficiency=mean_or_none(time_eff_vals), + CPU_Eff=cpu_efficiency, + Mem_Eff=memory_efficiency, + Time_Eff=mean_or_none(time_eff_vals), jobname=jobname, maxrss_max=max([r.maxrss_bytes for r in records if r.maxrss_bytes is not None]) / (1024**3) or None, walltime=walltime, @@ -697,7 +745,7 @@ def columns_for_sdev(base_cols: list[str]) -> list[str]: out: list[str] = [] for col in base_cols: out.append(col) - if col in {"CPU_Efficiency", "Memory_Efficiency", "Time_Efficiency"}: + if col in {"CPU_Eff", "Mem_Eff", "Time_Eff"}: out += [f"{col}_sdev", f"{col}_max", f"{col}_min"] return out @@ -822,7 +870,7 @@ def parse_args(argv: list[str]) -> argparse.Namespace: help=( "Slurm-like output format using aliases: " "u=username,i=JobID,c=CPUs,N=Nodes,m=ReqMem,p=MemPerCPU,l=ReqWalltime,C=Count," - "e=CPU_Efficiency,M=Memory_Efficiency,t=Time_Efficiency,j=jobname" + "e=CPU_Eff,M=Mem_Eff,t=Time_Eff,j=jobname" ), )