diff --git a/src/external/nexus/PyScripts/Install.md b/src/external/nexus/PyScripts/Install.md
new file mode 100644
index 000000000..7cbd8bc19
--- /dev/null
+++ b/src/external/nexus/PyScripts/Install.md
@@ -0,0 +1,126 @@
# Installation

## Requirements

| Package | Version | Purpose |
|---------|---------|---------|
| Python | ≥ 3.9 | Runtime |
| h5py | ≥ 3.0 | Read HDF5 files (NeXus Version 2) |
| pyhdf | ≥ 0.10 | Read HDF4 files (NeXus Version 1) — optional |
| pdfplumber | ≥ 0.9 | Extract schema from instrument definition PDF (`--pdf` mode) — optional |

---

## Install h5py (required)

h5py provides HDF5 support and is needed for all modern muon NeXus files (Version 2,
written since ~2020).

```bash
pip install h5py
```

Or via your system package manager:

```bash
# Fedora / RHEL
sudo dnf install python3-h5py

# Ubuntu / Debian
sudo apt install python3-h5py

# macOS (Homebrew)
brew install hdf5
pip install h5py
```

---

## Install pyhdf (optional, HDF4 / Version 1 files only)

pyhdf is only needed for reading old HDF4-format files (NeXus Version 1, written
before ~2011 by the MCS software at ISIS). If you only work with modern HDF5 files,
you can skip this step.

pyhdf requires the HDF4 C library to be present on the system.

### Linux

```bash
# Fedora / RHEL
sudo dnf install python3-devel hdf hdf-devel
pip install pyhdf

# Ubuntu / Debian
sudo apt install python3-dev libhdf4-dev
pip install pyhdf
```

> **Note (GCC 14+ / Fedora 40+):** pyhdf may fail to build with a
> `-Wincompatible-pointer-types` error. Work around it with:
> ```bash
> CFLAGS="-Wno-incompatible-pointer-types -Wno-discarded-qualifiers" pip install pyhdf
> ```

### macOS

```bash
brew install hdf4
pip install pyhdf
```

### Windows

Pre-built wheels are available on PyPI for some Python / Windows combinations:

```bash
pip install pyhdf
```

If no wheel is available, consider using a conda environment:

```bash
conda install -c conda-forge pyhdf
```

---

## Install pdfplumber (optional, PDF-driven validation only)

pdfplumber is only needed when you use the `--pdf` option to validate files against
a specific revision of the instrument definition PDF.

```bash
pip install pdfplumber
```

Or via your system package manager (if available):

```bash
# Fedora / RHEL
sudo dnf install python3-pdfplumber   # may not be in all repos

# Ubuntu / Debian
sudo apt install python3-pdfplumber   # may not be in all repos

# macOS — no Homebrew formula; install via pip
pip install pdfplumber
```

---

## Verify the installation

```bash
python3 -c "import h5py; print('h5py', h5py.__version__)"
python3 -c "import pyhdf; print('pyhdf ok')"            # optional — HDF4 support
python3 -c "import pdfplumber; print('pdfplumber ok')"  # optional — PDF-driven mode
```

---

## No installation needed

The validator is a single self-contained script: there is no build step, and the
script itself does not need to be installed as a package. Simply place
`nexus_muon_validator.py` anywhere on your system and run it with Python.
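
---

## Quick dependency check

The three `python3 -c` one-liners under *Verify the installation* can be combined
into one short script. A minimal sketch (`check_deps.py` is a hypothetical name,
not part of the repository):

```python
# check_deps.py — report which of the validator's dependencies are importable.
# h5py is required; pyhdf and pdfplumber are optional (see the sections above).
import importlib

for module, required in (("h5py", True), ("pyhdf", False), ("pdfplumber", False)):
    try:
        mod = importlib.import_module(module)
        print(f"{module:<12} OK ({getattr(mod, '__version__', 'version n/a')})")
    except ImportError:
        status = "MISSING (required)" if required else "missing (optional)"
        print(f"{module:<12} {status}")
```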
diff --git a/src/external/nexus/PyScripts/Usage.md b/src/external/nexus/PyScripts/Usage.md
new file mode 100644
index 000000000..eec245453
--- /dev/null
+++ b/src/external/nexus/PyScripts/Usage.md
@@ -0,0 +1,223 @@
# Usage — nexus_muon_validator.py

Validates muon NeXus HDF4/HDF5 files against the ISIS Muon Instrument Definitions
(Version 1 and Version 2 / *muonTD*).

Two validation modes are available:

- **Hardcoded mode** (default) — built-in rules based on the 2026 rev 11 spec.
  No extra dependencies beyond `h5py`.
- **PDF-driven mode** (`--pdf`) — rules are extracted live from a
  `nexus_instrument_definitions_*.pdf` that you supply. Requires `pdfplumber`.

Reference document:
*NeXus Instrument Definitions for Muon Data*, S. Cottrell, 21 January 2026
(`nexus_instrument_definitions_for_muon_data_2026_rev11.pdf`)

---

## Basic invocation

```bash
python3 nexus_muon_validator.py <file> [<file> ...]
```

Validate one or more files in a single call:

```bash
python3 nexus_muon_validator.py run001.nxs run002.nxs run003.nxs
```

---

## Command-line options

| Option | Description |
|--------|-------------|
| `--pdf <file>` | Parse schema from a NeXus instrument definition PDF and validate against it |
| `--list-schema` | Print the schema extracted from `--pdf` and exit (no files needed) |
| `-v`, `--verbose` | Also show INFO-level findings (optional fields, format info) |
| `--errors-only` | Show only ERROR-level issues; suppress warnings |
| `-h`, `--help` | Show built-in help and exit |

---

## Severity levels

| Level | Meaning |
|---------|---------|
| `ERROR` | A field required by the specification is missing or unreadable. |
| `WARNING` | A field has an unexpected value, a legacy name, or a shape inconsistency. |
| `INFO` | An optional field recommended by the specification is absent (shown only with `-v`). |

---

## Exit codes

| Code | Meaning |
|------|---------|
| `0` | Validation passed — no ERRORs found |
| `1` | At least one ERROR was reported |
| `2` | File could not be opened or is not a recognised NeXus format |

---

## Examples

**Validate a single file (errors and warnings only):**
```bash
python3 nexus_muon_validator.py EMU00139040.nxs
```

**Validate a whole directory of runs:**
```bash
python3 nexus_muon_validator.py /data/musr/2025/*.nxs
```

**Show full detail including optional fields:**
```bash
python3 nexus_muon_validator.py -v EMU00139040.nxs
```

**Show only hard errors (useful in scripts):**
```bash
python3 nexus_muon_validator.py --errors-only EMU00139040.nxs
echo "Exit code: $?"
```

**Use in a shell script with exit-code checking:**
```bash
#!/bin/bash
python3 nexus_muon_validator.py --errors-only "$1"
if [ $? -ne 0 ]; then
    echo "Validation failed for $1"
    exit 1
fi
```

**Validate against a specific revision of the instrument definition PDF:**
```bash
python3 nexus_muon_validator.py \
    --pdf nexus_instrument_definitions_for_muon_data_2026_rev11.pdf \
    EMU00139040.nxs
```

**Inspect the schema extracted from a PDF (no files needed):**
```bash
python3 nexus_muon_validator.py \
    --pdf nexus_instrument_definitions_for_muon_data_2026_rev11.pdf \
    --list-schema
```

Example `--list-schema` output:
```
Parsed schema from: nexus_instrument_definitions_for_muon_data_2026_rev11.pdf …
  → 35 NX classes found (42 version entries)

  NXdata        v1  required=0   optional=8   attrs=18
  NXdata        v2  required=2   optional=6   attrs=9
  NXdetector    v1  required=0   optional=4   attrs=4
  NXdetector    v2  required=4   optional=28  attrs=34
  NXentry       v1  required=0   optional=18  attrs=1
  NXentry       v2  required=11  optional=18  attrs=9
  ...
```

---

## What is checked

### File format
- Detects HDF5 (via `h5py`) or HDF4 (via `pyhdf`) automatically, as sketched below.
- HDF4 files are Version 1 by definition; HDF5 files may be Version 1 or 2.
- Reports an error if the format is unrecognised or the file cannot be opened.
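
Detection relies on the files' magic numbers rather than the extension. A minimal
standalone sketch of the same check (`sniff_format` is a hypothetical helper; the
two byte signatures are the standard HDF5 and HDF4 magic numbers used inside the
validator):

```python
def sniff_format(path: str) -> str:
    """Return 'HDF5', 'HDF4' or 'unknown' from the file's magic number."""
    with open(path, "rb") as fh:
        head = fh.read(8)
    if head == b"\x89HDF\r\n\x1a\n":        # 8-byte HDF5 signature
        return "HDF5"
    if head[:4] == b"\x0e\x03\x13\x01":     # 4-byte HDF4 signature
        return "HDF4"
    return "unknown"
```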

### Version detection
The instrument definition version is detected automatically:

| Condition | Detected version |
|-----------|-----------------|
| HDF4 file | **Version 1** (always) |
| HDF5: entry `definition` = `muonTD` or `pulsedTD`, or `IDF_version` = 2 | **Version 2** |
| HDF5: entry group named `run` (NXentry), no `definition` field | **Version 1** |

### Version 1 checks (HDF4 or HDF5 `NXfile` / `NXentry`)
Covers the original muon instrument definition (MCS/RAL, 2001).

- Root attribute: `@NeXus_version` (WARNING if absent)
- NXentry (`run`): `IDF_version`, `program_name`, `number`, `title`, `notes`,
  `analysis`, `lab`, `beamline`, `start_time`, `stop_time`, `switching_states`
- NXuser: `name`, `experiment_number`
- NXsample: `temperature` (+`@units`), `magnetic_field` (+`@units`)
- NXinstrument: `name`
  - NXdetector: `number`; optional `deadtimes` (+`@units`, `@available`),
    `angles` (+`@coordinate_system`, `@available`)
  - NXcollimator: `type`
  - NXbeam: `total_counts` (+`@units`)
- NXdata (`histogram_data_1`): `counts` (+`@units`, `@signal`,
  `@t0_bin`, `@first_good_bin`, `@last_good_bin`),
  `resolution` (+`@units`), `time_zero` (+`@units`, `@available`),
  `raw_time` (+`@axis`, `@primary`, `@units`)

### Version 2 checks (`NXroot` / `muonTD`)
Covers the revised muon instrument definition (ISIS, 2011–2026).

- Root attributes: `@file_name` (required), `@file_time` (required)
- At least one `raw_data_N` NXentry must be present
- NXentry: `IDF_version` (= 2), `definition` (= `muonTD`), `run_number`,
  `title`, `start_time`, `end_time`, `experiment_identifier`
- NXsample: `name`
- NXinstrument: `name`
  - NXsource: `name`, `type`, `probe`
  - NXdetector (`detector_*`): `counts` (+`@signal`, `@axes`, `@long_name`),
    `raw_time` (+`@units`), `spectrum_index`
- NXdata (`detector_*`): `counts` (+`@signal`, `@axes`), `raw_time` (+`@units`)
- NXuser (`user_1`): `name`

### Dimensional consistency checks
- `raw_time` shape must be `(ntc,)` (bin centres) or `(ntc+1,)` (bin boundaries),
  where `ntc` is the last dimension of `counts`.
- `corrected_time` shape must be `(ntc,)`.
- `spectrum_index` shape must be `(ns,)`, matching the second-to-last dimension
  of `counts`.
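
The same rules are easy to check by hand with `h5py`. A minimal sketch, assuming a
Version 2 file with a `/raw_data_1/detector_1` group (the file and group names are
examples; the validator discovers them itself):

```python
import h5py

with h5py.File("run001.nxs", "r") as f:
    det = f["/raw_data_1/detector_1"]
    ntc = det["counts"].shape[-1]   # number of time channels
    ns = det["counts"].shape[-2]    # number of spectra
    assert det["raw_time"].shape in ((ntc,), (ntc + 1,))  # centres or boundaries
    assert det["spectrum_index"].shape == (ns,)
```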

### Legacy / transitional handling
The validator distinguishes real errors from known historical deviations:

| Observed value | Expected (spec) | Reported as |
|----------------|-----------------|-------------|
| `pulsedTD` | `muonTD` | WARNING — legacy name, used in files written before rev 8 |
| `time_of_flight` | `raw_time` | WARNING — legacy dataset name used in files before ~2020 |
| `muons` | `positive muons` or `negative muons` | WARNING — non-specific probe label |
| `n/a` for `type` or `probe` in NXsource | specific string | WARNING |

---

## Sample output

```
========================================================================
 File: EMU00139040.nxs
========================================================================
 [WARNING] /raw_data_1/definition → Value is 'pulsedTD' (legacy name);
           current spec (rev≥8) requires 'muonTD'
========================================================================
 Summary: 0 error(s), 1 warning(s)
========================================================================
```

With `--verbose`:
```
========================================================================
 File: EMU00139040.nxs
========================================================================
 [WARNING] /raw_data_1/definition → Value is 'pulsedTD' (legacy name); ...
 [INFO   ] / → File format: HDF5
 [INFO   ] / → Detected muon NeXus instrument definition version: 2
 [INFO   ] /raw_data_1/instrument → Optional group 'beamline' not present
 [INFO   ] /raw_data_1/sample → Optional dataset 'magnetic_field_state' not present
 ...
========================================================================
 Summary: 0 error(s), 1 warning(s), 13 info(s)
========================================================================
```
diff --git a/src/external/nexus/PyScripts/nexus_muon_validator.py b/src/external/nexus/PyScripts/nexus_muon_validator.py
new file mode 100644
index 000000000..bf4ec9f1a
--- /dev/null
+++ b/src/external/nexus/PyScripts/nexus_muon_validator.py
@@ -0,0 +1,1331 @@
#!/usr/bin/env python3
"""
nexus_muon_validator.py

Validates muon NeXus HDF4/HDF5 files against the ISIS Muon Instrument Definitions
(Version 1 and Version 2 / 'muonTD').

Two validation modes:
  1. Hardcoded mode (default):
       Uses built-in rules derived from the specification.
       No extra dependencies beyond h5py.

  2. PDF-driven mode (--pdf <pdf>):
       Parses a nexus_instrument_definitions_*.pdf, extracts the table-based
       NX class schemas, and uses them for structural validation.
       Requires: pip install pdfplumber

In both modes, dimensional-consistency checks (counts/raw_time/spectrum_index
shape agreement) are always run.

Supported file formats:
    HDF5 — opened with h5py  (required: pip install h5py)
    HDF4 — opened with pyhdf (optional: pip install pyhdf; only for old V1 files)

Usage:
    python nexus_muon_validator.py <file> [<file> ...]
    python nexus_muon_validator.py --pdf nexus_instrument_definitions_*.pdf ...
    python nexus_muon_validator.py --pdf <pdf> --list-schema
    python nexus_muon_validator.py --help

Exit codes:
    0   no ERRORs found
    1   at least one ERROR was reported
    2   file could not be opened / invalid format
"""

from __future__ import annotations

import argparse
import fnmatch
import re
import sys
from dataclasses import dataclass, field as dc_field
from pathlib import Path
from typing import Any, Optional

# ---------------------------------------------------------------------------
# Severity levels and Issue dataclass
# ---------------------------------------------------------------------------
ERROR = "ERROR"
WARNING = "WARNING"
INFO = "INFO"


@dataclass
class Issue:
    severity: str   # ERROR | WARNING | INFO
    path: str       # HDF path where the issue was found
    message: str

    def __str__(self) -> str:
        return f"[{self.severity:<7}] {self.path} → {self.message}"


# ---------------------------------------------------------------------------
# Schema dataclasses (used by PDF-driven mode)
# ---------------------------------------------------------------------------

@dataclass
class FieldDef:
    """One row of an NX class definition table from the PDF."""
    re: str           # '' / '1' / '0/1' / '1+' / '0/1+'
    name: str         # dataset or group name; may be wildcard like 'detector_*'
    attribute: str    # attribute name; empty for dataset/group rows
    nx_type: str      # NX_CHAR, NX_INT32, NXentry, NXdetector, …
    value: str        # expected / example value from the PDF
    description: str

    @property
    def is_required(self) -> bool:
        return self.re.strip() in ("1", "1+")

    @property
    def is_group(self) -> bool:
        """True if this field is a subgroup (nx_type is an NX class, not NX_ scalar)."""
        return (bool(self.nx_type)
                and self.nx_type.startswith("NX")
                and not self.nx_type.startswith("NX_"))

    @property
    def is_wildcard(self) -> bool:
        return "*" in self.name or (self.name.startswith("{") and self.name.endswith("}"))

    @property
    def strict_value(self) -> Optional[str]:
        """
        Return the expected value if the PDF defines a strict constraint.
        Values in single quotes or bare integers/floats are treated as strict.
        Plain descriptive text (no quotes, multiple words) is ignored.
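
        Examples (cell values as they appear in the PDF):
            "'muonTD'"      → muonTD  (quoted, strict)
            "2"             → 2       (bare integer, strict)
            "'1' | '2'"     → None    (multiple choice, not enforced)
            "ISO 8601 time" → None    (descriptive text, no constraint)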
+ """ + v = self.value.strip() + if v.startswith("'") and v.endswith("'"): + return v[1:-1] + if v.startswith('"') and v.endswith('"'): + return v[1:-1] + if re.match(r"^-?\d+(\.\d+)?$", v): + return v + # Multiple-choice: '1' | '2' → skip (not a single constraint) + return None + + +@dataclass +class ClassSchema: + """Schema for one NX class as extracted from the PDF.""" + nx_class: str + version: int # 1 or 2 (0 = section not detected) + fields: list[FieldDef] = dc_field(default_factory=list) + + +# --------------------------------------------------------------------------- +# HDF5 helper utilities +# --------------------------------------------------------------------------- + +def _decode(v: Any) -> Any: + """Decode bytes / numpy bytes scalar to plain str.""" + if isinstance(v, bytes): + return v.decode(errors="replace").strip() + if hasattr(v, "dtype") and hasattr(v, "item"): + raw = v.item() + if isinstance(raw, bytes): + return raw.decode(errors="replace").strip() + return raw + if hasattr(v, "__len__") and not isinstance(v, str) and len(v) == 1: + return _decode(v[0]) + return v + + +def _scalar(ds) -> Any: + import numpy as np + val = ds[()] + if isinstance(val, np.ndarray): + if val.shape == () or val.size == 1: + val = val.flat[0] + return _decode(val) + + +def _nx_class(group) -> str: + import h5py + if not isinstance(group, h5py.Group): + return "" + v = group.attrs.get("NX_class", b"") + return _decode(v) + + +def _find_groups_by_class(parent, nx_class: str) -> list: + import h5py + return [(n, g) for n, g in parent.items() + if isinstance(g, h5py.Group) and _nx_class(g) == nx_class] + + +def _find_groups_by_prefix(parent, prefix: str) -> list: + import h5py + return [(n, g) for n, g in parent.items() + if isinstance(g, h5py.Group) and n.startswith(prefix)] + + +def _has_dataset(group, name: str) -> bool: + import h5py + return name in group and isinstance(group[name], h5py.Dataset) + + +def _has_group(group, name: str) -> bool: + import h5py + return name in group and isinstance(group[name], h5py.Group) + + +def _check_required_dataset(group, name: str, path: str, issues: list) -> bool: + if not _has_dataset(group, name): + issues.append(Issue(ERROR, path, f"Required dataset '{name}' is missing")) + return False + return True + + +def _check_optional_dataset(group, name: str, path: str, issues: list) -> bool: + if not _has_dataset(group, name): + issues.append(Issue(INFO, path, f"Optional dataset '{name}' not present")) + return False + return True + + +def _check_required_group(group, name: str, expected_class: str, + path: str, issues: list) -> Any: + if not _has_group(group, name): + issues.append(Issue(ERROR, path, + f"Required group '{name}' ({expected_class}) is missing")) + return None + g = group[name] + cls = _nx_class(g) + if expected_class and cls != expected_class: + issues.append(Issue(WARNING, f"{path}/{name}", + f"NX_class is '{cls}', expected '{expected_class}'")) + return g + + +def _check_dataset_attr(ds, attr: str, path: str, issues: list, + expected_value: Any = None, + severity: str = ERROR) -> bool: + if attr not in ds.attrs: + issues.append(Issue(severity, path, f"Required attribute '@{attr}' is missing")) + return False + if expected_value is not None: + actual = _decode(ds.attrs[attr]) + if str(actual) != str(expected_value): + issues.append(Issue(WARNING, path, + f"@{attr} = '{actual}', expected '{expected_value}'")) + return False + return True + + +# --------------------------------------------------------------------------- +# PDF schema parsing 
+# --------------------------------------------------------------------------- + +def _find_version_boundaries(pdf) -> tuple[int, int]: + """ + Pre-scan the PDF and return (v1_start_idx, v2_start_idx) as 0-based + page indices for the actual content sections (not the table-of-contents). + + Looks for the section-header lines that introduce each version, e.g.: + "Muon Instrument Definition: Version 1" + "Muon Instrument Definition: Version 2" + These appear in the body text only, not in the TOC or title page. + TOC pages are identified by long dot-sequences (..........). + """ + TOC_RE = re.compile(r"\.{5,}") + # Section headers that mark the start of V1 / V2 content + V1_HDR = re.compile(r"Muon Instrument Definition\s*[:–-]\s*Version\s+1", re.IGNORECASE) + V2_HDR = re.compile(r"Muon Instrument Definition\s*[:–-]\s*Version\s+2", re.IGNORECASE) + v1_page = -1 + v2_page = -1 + for i, page in enumerate(pdf.pages): + text = page.extract_text() or "" + if TOC_RE.search(text): # skip table-of-contents pages + continue + if v1_page < 0 and V1_HDR.search(text): + v1_page = i + if v2_page < 0 and V2_HDR.search(text): + v2_page = i + return v1_page, v2_page + + +def _norm_cell(c: Any) -> str: + """ + Normalise one PDF table cell: + • convert None → '' + • join mid-word line-breaks (e.g. 'experiment_ide\\nntifier' → 'experiment_identifier') + • collapse remaining whitespace to single space + """ + text = str(c or "") + # Remove line-breaks that split a single token (no space before/after \n) + text = re.sub(r"(? dict[str, list[ClassSchema]]: + """ + Parse a nexus_instrument_definitions_*.pdf and return a schema dict. + + Returns { class_name: [ClassSchema, ...] } + with one ClassSchema per version section found in the PDF. + + Requires: pip install pdfplumber + """ + try: + import pdfplumber + except ImportError: + raise ImportError( + "pdfplumber is required for PDF-driven validation.\n" + "Install with: pip install pdfplumber" + ) + + schema: dict[str, list[ClassSchema]] = {} + current: Optional[ClassSchema] = None # class currently being parsed + + with pdfplumber.open(pdf_path) as pdf: + # Step 1: determine which pages belong to V1 / V2 sections + v1_start, v2_start = _find_version_boundaries(pdf) + + # Step 2: process all pages + for page_idx, page in enumerate(pdf.pages): + # Assign version from page position, not page text + if v2_start >= 0 and page_idx >= v2_start: + page_version = 2 + elif v1_start >= 0 and page_idx >= v1_start: + page_version = 1 + else: + page_version = 0 # preamble / TOC + + # Extract tables (prefer line-strategy for PDFs with visible borders) + tables = (page.extract_tables(table_settings={ + "vertical_strategy": "lines", + "horizontal_strategy": "lines", + }) or + page.extract_tables() or []) + + for table in tables: + if not table: + continue + for raw_row in table: + if raw_row is None: + continue + + # Normalise: join mid-word breaks, collapse whitespace + cells = [_norm_cell(c) for c in raw_row] + while len(cells) < 6: + cells.append("") + re_val, name, attr, nx_type, value, desc = cells[:6] + + # Skip column-header rows (RE | Name | Attribute | Type | …) + if name in ("Name", "RE") and attr in ("Attribute", ""): + if re_val in ("", "RE") and nx_type in ("", "Type"): + continue + + # Skip fully-empty rows + if not any([re_val, name, attr, nx_type, value, desc]): + continue + + # --- Class-identifier row --- + # Name matches NX\w+ exactly, Attribute and Type are empty. 
                    if re.match(r"^NX\w+$", name) and not attr and not nx_type:
                        if current is not None:
                            _schema_store(schema, current)
                        current = ClassSchema(nx_class=name, version=page_version)
                        continue

                    if current is None:
                        continue   # rows before the first recognised NX class

                    fd = FieldDef(
                        re=re_val,
                        name=name,
                        attribute=attr,
                        nx_type=nx_type,
                        value=value,
                        description=desc,
                    )
                    current.fields.append(fd)

    # Flush the last class
    if current is not None:
        _schema_store(schema, current)

    return schema


def _schema_store(schema: dict, cs: ClassSchema) -> None:
    """Add a ClassSchema to the dict, merging with an existing entry of the same version."""
    if cs.nx_class not in schema:
        schema[cs.nx_class] = []
    for existing in schema[cs.nx_class]:
        if existing.version == cs.version:
            existing.fields.extend(cs.fields)
            return
    schema[cs.nx_class].append(cs)


def _get_class_schema(schema: dict,
                      class_name: str,
                      version: int) -> Optional[ClassSchema]:
    """Return the best-matching ClassSchema for a given version."""
    candidates = schema.get(class_name, [])
    for cs in candidates:
        if cs.version == version:
            return cs
    for cs in candidates:
        if cs.version == 0:
            return cs
    return candidates[0] if candidates else None


def schema_summary(schema: dict) -> str:
    """Human-readable listing of what was parsed from the PDF."""
    lines = []
    for class_name, versions in sorted(schema.items()):
        for cs in versions:
            n_req = sum(1 for f in cs.fields if f.is_required and not f.attribute)
            n_opt = sum(1 for f in cs.fields if not f.is_required and not f.attribute)
            n_attr = sum(1 for f in cs.fields if f.attribute)
            lines.append(
                f"  {class_name:<30} v{cs.version}  "
                f"required={n_req} optional={n_opt} attrs={n_attr}"
            )
    return "\n".join(lines) if lines else "  (no classes found)"


# ---------------------------------------------------------------------------
# Schema-driven validation (PDF mode)
# ---------------------------------------------------------------------------

def _match_pattern(item_name: str, field_name: str) -> bool:
    """Match an HDF5 item name against a (possibly wildcard) field pattern."""
    pattern = field_name.strip("{}")
    return item_name == pattern or fnmatch.fnmatch(item_name, pattern)


def validate_with_schema(f, schema: dict, version: int,
                         issues: list, show_info: bool) -> None:
    """
    Top-level schema-driven validation for an open HDF5 file.
    Walks all NXentry groups and validates them recursively.
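    The 'version' argument (1 or 2) selects which per-version class tables
    from the parsed PDF schema are applied.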
+ """ + import h5py + + root_class = "NXroot" if version == 2 else "NXfile" + cs = _get_class_schema(schema, root_class, version) + if cs: + if show_info: + issues.append(Issue(INFO, "/", + f"PDF schema: validating root as {root_class} (version {version})")) + # Check root-level attributes (FieldDef with empty name, non-empty attribute) + for fd in cs.fields: + if fd.attribute and not fd.name: + _schema_check_attr(f, fd, "/", issues, show_info) + else: + if show_info: + issues.append(Issue(INFO, "/", + f"PDF schema: no definition for {root_class}; " + "skipping root-level attribute checks")) + + # Validate every NXentry in the file + for entry_name, entry_item in f.items(): + if not isinstance(entry_item, h5py.Group): + continue + if _nx_class(entry_item) != "NXentry": + continue + entry_path = f"/{entry_name}" + _validate_group_schema(entry_item, "NXentry", entry_path, + issues, schema, version, show_info) + + +def _validate_group_schema(group, class_name: str, path: str, + issues: list, schema: dict, + version: int, show_info: bool) -> None: + """Recursively validate an HDF5 group against its PDF-derived schema.""" + import h5py + + cs = _get_class_schema(schema, class_name, version) + if cs is None: + if show_info: + issues.append(Issue(INFO, path, + f"No PDF schema for class '{class_name}' — skipped")) + return + + last_ds_name = "" # track for attribute rows + + for fd in cs.fields: + name = fd.name + attr = fd.attribute + + # --- Attribute row: applies to the preceding dataset --- + if attr and not name: + if last_ds_name and last_ds_name in group: + _schema_check_attr(group[last_ds_name], fd, + f"{path}/{last_ds_name}", issues, show_info) + continue + + if not name: + continue + + # Track for subsequent attribute rows + if not fd.is_group: + last_ds_name = name + + # --- Wildcard pattern --- + if fd.is_wildcard: + _schema_check_wildcard(group, fd, path, issues, schema, version, show_info) + continue + + # --- Subgroup --- + if fd.is_group: + _schema_check_group(group, fd, path, issues, schema, version, show_info) + + # --- Dataset --- + else: + _schema_check_dataset(group, fd, path, issues, show_info) + + +def _schema_check_attr(target, fd: FieldDef, path: str, + issues: list, show_info: bool) -> None: + attr = fd.attribute + if attr not in target.attrs: + if fd.is_required: + issues.append(Issue(ERROR, path, + f"Required attribute '@{attr}' is missing")) + elif show_info: + issues.append(Issue(INFO, path, + f"Optional attribute '@{attr}' not present")) + return + sv = fd.strict_value + if sv is not None: + actual = str(_decode(target.attrs[attr])) + if actual != sv: + issues.append(Issue(WARNING, path, + f"@{attr} = '{actual}', expected '{sv}'")) + + +def _schema_check_dataset(group, fd: FieldDef, path: str, + issues: list, show_info: bool) -> None: + name = fd.name + if not _has_dataset(group, name): + if fd.is_required: + issues.append(Issue(ERROR, path, + f"Required dataset '{name}' ({fd.nx_type}) is missing")) + elif show_info: + issues.append(Issue(INFO, path, + f"Optional dataset '{name}' not present")) + return + sv = fd.strict_value + if sv is not None: + try: + actual = str(_scalar(group[name])) + if actual != sv: + issues.append(Issue(WARNING, f"{path}/{name}", + f"Value is '{actual}', expected '{sv}'")) + except Exception: + pass + + +def _schema_check_group(group, fd: FieldDef, path: str, + issues: list, schema: dict, + version: int, show_info: bool) -> None: + import h5py + name = fd.name + nx_type = fd.nx_type + if not _has_group(group, name): + if fd.is_required: + 
            issues.append(Issue(ERROR, path,
                f"Required group '{name}' ({nx_type}) is missing"))
        elif show_info:
            issues.append(Issue(INFO, path,
                f"Optional group '{name}' ({nx_type}) not present"))
        return
    sub = group[name]
    actual_cls = _nx_class(sub)
    if actual_cls != nx_type:
        issues.append(Issue(WARNING, f"{path}/{name}",
            f"NX_class is '{actual_cls}', expected '{nx_type}'"))
    _validate_group_schema(sub, nx_type, f"{path}/{name}",
                           issues, schema, version, show_info)


def _schema_check_wildcard(group, fd: FieldDef, path: str,
                           issues: list, schema: dict,
                           version: int, show_info: bool) -> None:
    import h5py
    nx_type = fd.nx_type
    matches = [(k, v) for k, v in group.items()
               if _match_pattern(k, fd.name)]
    if not matches:
        if fd.is_required:
            issues.append(Issue(ERROR, path,
                f"Required item matching '{fd.name}' ({nx_type}) not found"))
        elif show_info:
            issues.append(Issue(INFO, path,
                f"Optional item matching '{fd.name}' not present"))
        return
    if fd.is_group:
        for item_name, item in matches:
            if isinstance(item, h5py.Group):
                actual_cls = _nx_class(item)
                if actual_cls != nx_type:
                    issues.append(Issue(WARNING, f"{path}/{item_name}",
                        f"NX_class is '{actual_cls}', expected '{nx_type}'"))
                _validate_group_schema(item, nx_type, f"{path}/{item_name}",
                                       issues, schema, version, show_info)


# ---------------------------------------------------------------------------
# Dimensional consistency checks (always run, both modes)
# ---------------------------------------------------------------------------

def check_dimensional_consistency(entry, entry_path: str, issues: list) -> None:
    """
    Check shape agreement of detector data arrays:
        raw_time / time_of_flight : (ntc,) or (ntc+1,)
        corrected_time            : (ntc,)
        spectrum_index            : (ns,)
    where ntc and ns are derived from the counts array shape.
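    Example: counts of shape (2, 96, 2000) gives ns = 96 and ntc = 2000.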
+ """ + import h5py + + # Look in both NXdata and NXdetector groups named detector_* + for name, item in entry.items(): + if not isinstance(item, h5py.Group): + continue + if not name.startswith("detector_"): + continue + if _nx_class(item) not in ("NXdata", "NXdetector", ""): + continue + + det_path = f"{entry_path}/{name}" + if not _has_dataset(item, "counts"): + continue + c_shape = item["counts"].shape + if len(c_shape) < 2: + issues.append(Issue(WARNING, f"{det_path}/counts", + f"counts has shape {c_shape}; " + "expected ≥2D (ns,ntc) or (np,ns,ntc)")) + continue + + ntc = c_shape[-1] + ns = c_shape[-2] + + # raw_time / time_of_flight + for rt_name in ("raw_time", "time_of_flight"): + if _has_dataset(item, rt_name): + rt_shape = item[rt_name].shape + if rt_shape not in ((ntc,), (ntc + 1,)): + issues.append(Issue(WARNING, f"{det_path}/{rt_name}", + f"shape {rt_shape} inconsistent with ntc={ntc}; " + "expected (ntc,) [bin centres] or (ntc+1,) [bin boundaries]")) + break + + # corrected_time + if _has_dataset(item, "corrected_time"): + ct_shape = item["corrected_time"].shape + if ct_shape != (ntc,): + issues.append(Issue(WARNING, f"{det_path}/corrected_time", + f"shape {ct_shape} inconsistent with ntc={ntc}; " + "expected (ntc,)")) + + # spectrum_index + if _has_dataset(item, "spectrum_index"): + si_shape = item["spectrum_index"].shape + if si_shape != (ns,): + issues.append(Issue(WARNING, f"{det_path}/spectrum_index", + f"shape {si_shape} inconsistent with ns={ns}; " + "expected (ns,)")) + + +# --------------------------------------------------------------------------- +# Hardcoded validation — Version 2 (fallback when no PDF is provided) +# --------------------------------------------------------------------------- + +def _hc_validate_v2_root(f, issues: list) -> None: + path = "/" + for attr in ("file_name", "file_time"): + if attr not in f.attrs: + issues.append(Issue(ERROR, path, + f"Required root attribute '@{attr}' is missing")) + for attr in ("NeXus_version", "HDF5_Version", "creator"): + if attr not in f.attrs: + issues.append(Issue(INFO, path, + f"Optional root attribute '@{attr}' not present")) + raw_entries = _find_groups_by_prefix(f, "raw_data_") + if not raw_entries: + raw_entries = _find_groups_by_class(f, "NXentry") + if not raw_entries: + issues.append(Issue(ERROR, path, + "No 'raw_data_*' (NXentry) groups found")) + + +def _hc_validate_v2_entry(entry, entry_path: str, issues: list) -> None: + p = entry_path + for name in ("IDF_version", "definition", "run_number", "title", + "start_time", "end_time", "experiment_identifier"): + _check_required_dataset(entry, name, p, issues) + + if "definition" in entry: + defn = _scalar(entry["definition"]) + if defn == "muonTD": + pass + elif defn in ("pulsedTD", "pulsed_TD"): + issues.append(Issue(WARNING, f"{p}/definition", + f"Value is '{defn}' (legacy name); " + "current spec (rev≥8) requires 'muonTD'")) + else: + issues.append(Issue(WARNING, f"{p}/definition", + f"Value is '{defn}', expected 'muonTD'")) + + if "IDF_version" in entry: + try: + if int(_scalar(entry["IDF_version"])) != 2: + issues.append(Issue(WARNING, f"{p}/IDF_version", + f"Value is '{_scalar(entry['IDF_version'])}', expected '2'")) + except (ValueError, TypeError): + pass + + for name in ("beamline", "notes", "duration", "collection_time", + "total_counts", "good_frames", "raw_frames", + "proton_charge", "program_name", "run_cycle"): + _check_optional_dataset(entry, name, p, issues) + + sample = _check_required_group(entry, "sample", "NXsample", p, issues) + if 
        _hc_validate_v2_sample(sample, f"{p}/sample", issues)

    instrument = _check_required_group(entry, "instrument", "NXinstrument", p, issues)
    if instrument is not None:
        _hc_validate_v2_instrument(instrument, f"{p}/instrument", issues)

    det_data = [(n, g) for n, g in _find_groups_by_prefix(entry, "detector_")
                if _nx_class(g) == "NXdata"]
    if not det_data:
        issues.append(Issue(ERROR, p,
            "No 'detector_*' (NXdata) groups found — expected at least one"))
    else:
        for name, grp in det_data:
            _hc_validate_v2_nxdata(grp, f"{p}/{name}", issues)

    for grp_name in ("user_1", "runlog", "selog", "periods"):
        if grp_name not in entry:
            issues.append(Issue(INFO, p, f"Optional group '{grp_name}' not present"))
        elif grp_name == "user_1":
            _hc_validate_v2_user(entry["user_1"], f"{p}/user_1", issues)


def _hc_validate_v2_sample(sample, path: str, issues: list) -> None:
    _check_required_dataset(sample, "name", path, issues)
    for name in ("temperature", "magnetic_field", "magnetic_field_state"):
        if not _has_dataset(sample, name):
            issues.append(Issue(INFO, path, f"Optional dataset '{name}' not present"))


def _hc_validate_v2_instrument(instr, path: str, issues: list) -> None:
    _check_required_dataset(instr, "name", path, issues)

    source = _check_required_group(instr, "source", "NXsource", path, issues)
    if source is not None:
        _hc_validate_v2_source(source, f"{path}/source", issues)

    det_groups = [(n, g) for n, g in _find_groups_by_prefix(instr, "detector_")
                  if _nx_class(g) == "NXdetector"]
    if not det_groups:
        issues.append(Issue(ERROR, path,
            "No 'detector_*' (NXdetector) groups found in NXinstrument"))
    else:
        for name, grp in det_groups:
            _hc_validate_v2_nxdetector(grp, f"{path}/{name}", issues)

    for grp_name in ("dae", "beamline"):
        if grp_name not in instr:
            issues.append(Issue(INFO, path, f"Optional group '{grp_name}' not present"))


def _hc_validate_v2_source(source, path: str, issues: list) -> None:
    for name in ("name", "type", "probe"):
        _check_required_dataset(source, name, path, issues)
    if "type" in source:
        t = str(_scalar(source["type"])).lower()
        if t not in ("pulsed muon source", "low energy muon source"):
            issues.append(Issue(WARNING, f"{path}/type",
                f"Unexpected source type '{_scalar(source['type'])}'"))
    if "probe" in source:
        p = str(_scalar(source["probe"])).lower()
        if p in ("positive muons", "negative muons"):
            pass
        elif p in ("muons", "positive muon", "negative muon"):
            issues.append(Issue(WARNING, f"{path}/probe",
                f"Probe value '{_scalar(source['probe'])}' is non-specific; "
                "expected 'positive muons' or 'negative muons'"))
        else:
            issues.append(Issue(WARNING, f"{path}/probe",
                f"Unexpected probe value '{_scalar(source['probe'])}'"))
    for name in ("source_frequency", "source_energy", "source_current"):
        if not _has_dataset(source, name):
            issues.append(Issue(INFO, path, f"Optional dataset '{name}' not present"))


def _hc_validate_v2_nxdetector(det, path: str, issues: list) -> None:
    import h5py
    if _check_required_dataset(det, "counts", path, issues):
        cd = det["counts"]
        _check_dataset_attr(cd, "signal", f"{path}/counts", issues, expected_value="1")
        _check_dataset_attr(cd, "axes", f"{path}/counts", issues)
        _check_dataset_attr(cd, "long_name", f"{path}/counts", issues, severity=WARNING)

    # raw_time — accept legacy 'time_of_flight'
    rt_key = "raw_time"
    if not _has_dataset(det, "raw_time"):
        if _has_dataset(det, "time_of_flight"):
            rt_key = "time_of_flight"
            issues.append(Issue(WARNING, path,
                "Dataset 'raw_time' not found; found legacy 'time_of_flight' "
                "(rename to 'raw_time' for spec compliance)"))
        else:
            issues.append(Issue(ERROR, path, "Required dataset 'raw_time' is missing"))
            rt_key = None
    if rt_key:
        _check_dataset_attr(det[rt_key], "units", f"{path}/{rt_key}", issues)

    _check_required_dataset(det, "spectrum_index", path, issues)

    for name in ("corrected_time", "time_zero", "first_good_time", "last_good_time",
                 "grouping", "alpha", "dead_time", "period_index"):
        if not _has_dataset(det, name):
            issues.append(Issue(INFO, path, f"Optional dataset '{name}' not present"))


def _hc_validate_v2_nxdata(data, path: str, issues: list) -> None:
    if _check_required_dataset(data, "counts", path, issues):
        cd = data["counts"]
        _check_dataset_attr(cd, "signal", f"{path}/counts", issues, expected_value="1")
        _check_dataset_attr(cd, "axes", f"{path}/counts", issues)

    rt_key = "raw_time"
    if not _has_dataset(data, "raw_time"):
        if _has_dataset(data, "time_of_flight"):
            rt_key = "time_of_flight"
            issues.append(Issue(WARNING, path,
                "Dataset 'raw_time' not found; found legacy 'time_of_flight' "
                "(rename to 'raw_time' for spec compliance)"))
        else:
            issues.append(Issue(ERROR, path, "Required dataset 'raw_time' is missing"))
            rt_key = None
    if rt_key:
        _check_dataset_attr(data[rt_key], "units", f"{path}/{rt_key}", issues)

    for name in ("corrected_time", "spectrum_index", "period_index",
                 "grouping", "alpha", "dead_time", "time_zero"):
        if not _has_dataset(data, name):
            issues.append(Issue(INFO, path, f"Optional dataset '{name}' not present"))


def _hc_validate_v2_user(user, path: str, issues: list) -> None:
    _check_required_dataset(user, "name", path, issues)
    for name in ("affiliation", "email", "facility_user_id"):
        if not _has_dataset(user, name):
            issues.append(Issue(INFO, path, f"Optional dataset '{name}' not present"))


# ---------------------------------------------------------------------------
# Hardcoded validation — Version 1 (fallback when no PDF is provided)
# ---------------------------------------------------------------------------

def _hc_validate_v1_root(f, issues: list) -> None:
    path = "/"
    for attr in ("NeXus_version", "user"):
        if attr not in f.attrs:
            issues.append(Issue(WARNING, path,
                f"V1: Expected root attribute '@{attr}' not found"))
    if "run" in f:
        _hc_validate_v1_entry(f["run"], "/run", issues)
    else:
        entries = _find_groups_by_class(f, "NXentry")
        if not entries:
            issues.append(Issue(ERROR, path,
                "V1: No NXentry found (expected group named 'run')"))
        else:
            for name, entry in entries:
                issues.append(Issue(WARNING, path,
                    f"V1: NXentry found as '{name}', expected 'run'"))
                _hc_validate_v1_entry(entry, f"/{name}", issues)


def _hc_validate_v1_entry(entry, path: str, issues: list) -> None:
    for name in ("IDF_version", "program_name", "number", "title",
                 "notes", "analysis", "lab", "beamline",
                 "start_time", "stop_time", "switching_states"):
        _check_required_dataset(entry, name, path, issues)
    _check_optional_dataset(entry, "duration", path, issues)

    user = _check_required_group(entry, "user", "NXuser", path, issues)
    sample = _check_required_group(entry, "sample", "NXsample", path, issues)
    instr = _check_required_group(entry, "instrument", "NXinstrument", path, issues)
    hist = _check_required_group(entry, "histogram_data_1", "NXdata", path, issues)

    if user:
        for n in ("name", "experiment_number"):
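            # V1 spec: NXuser must provide both datasets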
            _check_required_dataset(user, n, f"{path}/user", issues)

    if sample:
        _check_required_dataset(sample, "name", f"{path}/sample", issues)
        for n in ("temperature", "magnetic_field"):
            if _check_required_dataset(sample, n, f"{path}/sample", issues):
                _check_dataset_attr(sample[n], "units", f"{path}/sample/{n}", issues)

    if instr:
        _check_required_dataset(instr, "name", f"{path}/instrument", issues)
        ip = f"{path}/instrument"
        det = _check_required_group(instr, "detector", "NXdetector", ip, issues)
        coll = _check_required_group(instr, "collimator", "NXcollimator", ip, issues)
        beam = _check_required_group(instr, "beam", "NXbeam", ip, issues)

        if det:
            _check_required_dataset(det, "number", f"{ip}/detector", issues)
            for n in ("deadtimes", "angles"):
                _check_optional_dataset(det, n, f"{ip}/detector", issues)
            if _has_dataset(det, "deadtimes"):
                for a in ("units", "available"):
                    _check_dataset_attr(det["deadtimes"], a,
                                        f"{ip}/detector/deadtimes", issues, severity=WARNING)
        if coll:
            _check_required_dataset(coll, "type", f"{ip}/collimator", issues)
        if beam:
            for n in ("total_counts", "daereads", "frames"):
                _check_optional_dataset(beam, n, f"{ip}/beam", issues)

    if hist:
        hp = f"{path}/histogram_data_1"
        if _check_required_dataset(hist, "counts", hp, issues):
            cd = hist["counts"]
            for a in ("units", "signal", "number", "length",
                      "t0_bin", "first_good_bin", "last_good_bin", "offset"):
                _check_dataset_attr(cd, a, f"{hp}/counts", issues)
        for n in ("resolution", "time_zero", "raw_time"):
            _check_required_dataset(hist, n, hp, issues)
        if _has_dataset(hist, "resolution"):
            _check_dataset_attr(hist["resolution"], "units", f"{hp}/resolution", issues)
        if _has_dataset(hist, "time_zero"):
            for a in ("units", "available"):
                _check_dataset_attr(hist["time_zero"], a, f"{hp}/time_zero", issues)
        if _has_dataset(hist, "raw_time"):
            for a in ("axis", "primary", "units"):
                _check_dataset_attr(hist["raw_time"], a, f"{hp}/raw_time", issues)
        for n in ("corrected_time", "grouping", "alpha"):
            _check_optional_dataset(hist, n, hp, issues)


# ---------------------------------------------------------------------------
# Version detection
# ---------------------------------------------------------------------------

def _detect_version(f) -> int:
    """Return 1 or 2 based on file contents."""
    import h5py
    for name, item in f.items():
        if not isinstance(item, h5py.Group):
            continue
        if _nx_class(item) != "NXentry":
            continue
        if "definition" in item:
            defn = str(_scalar(item["definition"])).lower()
            if "muon" in defn:
                return 2
        if "IDF_version" in item:
            try:
                if int(_scalar(item["IDF_version"])) == 2:
                    return 2
            except (ValueError, TypeError):
                pass
    if "run" in f and isinstance(f["run"], h5py.Group):
        return 1
    return 2   # modern default


# ---------------------------------------------------------------------------
# File format detection
# ---------------------------------------------------------------------------

def _is_hdf5(path: str) -> bool:
    try:
        with open(path, "rb") as fh:
            return fh.read(8) == b"\x89HDF\r\n\x1a\n"
    except OSError:
        return False


def _is_hdf4(path: str) -> bool:
    try:
        with open(path, "rb") as fh:
            return fh.read(4) == b"\x0e\x03\x13\x01"
    except OSError:
        return False


def _open_hdf4(path: str):
    try:
        from pyhdf.SD import SD, SDC
    except ImportError:
        raise ImportError(
            "pyhdf is required to read HDF4 files. "
" + "Install with: pip install pyhdf" + ) + from pyhdf.SD import SD, SDC + return SD(path, SDC.READ) + + +def _hc_validate_v1_hdf4(path: str, issues: list, show_info: bool) -> None: + """ + Validate a V1 HDF4 NeXus file using pyhdf. + + In HDF4/NeXus V1 all datasets live in a flat SD namespace. + The NXclass hierarchy is encoded in Vgroups but the actual data + is accessed by dataset name via the SD interface. + """ + from pyhdf.SD import SD, SDC + + sd = SD(path, SDC.READ) + ndatasets, nglobalattrs = sd.info() + + # Build index: ds_name → list of attr-dicts (one per occurrence) + ds_index: dict[str, list[dict]] = {} + ds_shapes: dict[str, list] = {} # ds_name → list of shapes + for i in range(ndatasets): + sds = sd.select(i) + ds_name, rank, dims, dtype, nattrs = sds.info() + attrs: dict[str, Any] = {} + for j in range(nattrs): + a = sds.attr(j) + aname = a.info()[0] + attrs[aname] = a.get() + sds.endaccess() + ds_index.setdefault(ds_name, []).append(attrs) + ds_shapes.setdefault(ds_name, []).append(dims) + + global_attrs: dict[str, Any] = {} + for j in range(nglobalattrs): + a = sd.attr(j) + aname = a.info()[0] + global_attrs[aname] = a.get() + + sd.end() + + # ------------------------------------------------------------------ + # Helpers + # ------------------------------------------------------------------ + def _has(name: str) -> bool: + return name in ds_index + + def _req(ds_name: str, path_label: str, req_attrs: list[str] = ()) -> bool: + """Check dataset exists; optionally check each required attribute.""" + if not _has(ds_name): + issues.append(Issue(ERROR, path_label, + f"Required dataset '{ds_name}' is missing")) + return False + # At least one occurrence must carry all required attrs + for attr in req_attrs: + if not any(attr in d for d in ds_index[ds_name]): + issues.append(Issue(ERROR, f"{path_label}/{ds_name}", + f"Required attribute '@{attr}' is missing")) + return True + + def _opt(ds_name: str, path_label: str, req_attrs: list[str] = ()) -> None: + """Optionally present dataset; if present check attributes.""" + if not _has(ds_name): + if show_info: + issues.append(Issue(INFO, path_label, + f"Optional dataset '{ds_name}' not present")) + return + for attr in req_attrs: + if not any(attr in d for d in ds_index[ds_name]): + issues.append(Issue(WARNING, f"{path_label}/{ds_name}", + f"Optional dataset '{ds_name}' is missing " + f"recommended attribute '@{attr}'")) + + # ------------------------------------------------------------------ + # Root-level attributes + # ------------------------------------------------------------------ + for attr in ("NeXus_version",): + if attr not in global_attrs: + issues.append(Issue(WARNING, "/", + f"Root attribute '@{attr}' not present")) + elif show_info: + issues.append(Issue(INFO, "/", + f"@{attr} = {global_attrs[attr]!r}")) + + # ------------------------------------------------------------------ + # NXentry (run) + # ------------------------------------------------------------------ + entry = "/run" + for field in ("IDF_version", "program_name", "number", "title", + "notes", "analysis", "lab", "beamline", + "start_time", "stop_time", "switching_states"): + _req(field, entry) + + # ------------------------------------------------------------------ + # NXuser + # ------------------------------------------------------------------ + user = f"{entry}/user" + _req("experiment_number", user) + # 'name' is shared by many NX classes — just verify at least one exists + if not _has("name"): + issues.append(Issue(ERROR, user, "Required dataset 

    # ------------------------------------------------------------------
    # NXsample
    # ------------------------------------------------------------------
    sample = f"{entry}/sample"
    _req("temperature", sample, ["units"])
    _req("magnetic_field", sample, ["units"])

    # ------------------------------------------------------------------
    # NXinstrument / NXdetector
    # ------------------------------------------------------------------
    detector = f"{entry}/instrument/detector"
    # 'number' used both for run number and detector count; just check existence
    if not _has("number"):
        issues.append(Issue(ERROR, detector,
            "Required dataset 'number' (detector count) is missing"))
    _opt("deadtimes", detector, ["units", "available"])
    _opt("angles", detector, ["coordinate_system", "available"])

    # ------------------------------------------------------------------
    # NXinstrument / NXcollimator
    # ------------------------------------------------------------------
    _req("type", f"{entry}/instrument/collimator")

    # ------------------------------------------------------------------
    # NXinstrument / NXbeam
    # ------------------------------------------------------------------
    _req("total_counts", f"{entry}/instrument/beam", ["units"])

    # ------------------------------------------------------------------
    # NXdata (histogram_data_1)
    # ------------------------------------------------------------------
    hdata = f"{entry}/histogram_data_1"
    if _req("counts", hdata,
            ["units", "signal", "t0_bin", "first_good_bin", "last_good_bin"]):
        # Dimensional consistency: counts shape vs raw_time shape
        counts_shapes = ds_shapes.get("counts", [])
        if counts_shapes and _has("raw_time"):
            ntc = counts_shapes[0][-1] if isinstance(counts_shapes[0], list) else counts_shapes[0]
            rt_shapes = ds_shapes.get("raw_time", [])
            if rt_shapes:
                rt_len = rt_shapes[0] if isinstance(rt_shapes[0], int) else rt_shapes[0][0]
                if rt_len not in (ntc, ntc + 1):
                    issues.append(Issue(WARNING, f"{hdata}/raw_time",
                        f"Shape mismatch: raw_time length {rt_len} "
                        f"expected {ntc} or {ntc+1} (from counts)"))

    _req("resolution", hdata, ["units"])
    _req("time_zero", hdata, ["units", "available"])
    _req("raw_time", hdata, ["axis", "primary", "units"])


# ---------------------------------------------------------------------------
# Main validation entry point
# ---------------------------------------------------------------------------

def validate_file(file_path: str,
                  pdf_schema: Optional[dict] = None,
                  show_info: bool = False) -> list[Issue]:
    """
    Validate a single NeXus file.

    Parameters
    ----------
    file_path  : path to the .nxs file
    pdf_schema : optional schema dict returned by parse_pdf_schema();
                 if None, hardcoded validation is used
    show_info  : if True, include INFO-level findings in the result

    Returns a list of Issue objects (always includes ERRORs and WARNINGs;
    INFOs only when show_info=True).
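
    Example:
        issues = validate_file("run001.nxs")
        has_errors = any(i.severity == ERROR for i in issues)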
+ """ + issues: list[Issue] = [] + + if not Path(file_path).exists(): + issues.append(Issue(ERROR, str(file_path), "File not found")) + return issues + + if _is_hdf5(file_path): + issues.append(Issue(INFO, "/", "File format: HDF5")) + try: + import h5py + with h5py.File(file_path, "r") as f: + version = _detect_version(f) + issues.append(Issue(INFO, "/", + f"Detected instrument definition version: {version}")) + + if pdf_schema is not None: + # ---- PDF-driven structural validation ---- + issues.append(Issue(INFO, "/", + "Using PDF-driven validation")) + validate_with_schema(f, pdf_schema, version, issues, show_info) + else: + # ---- Hardcoded structural validation ---- + if version == 2: + _hc_validate_v2_root(f, issues) + for name, item in f.items(): + if isinstance(item, h5py.Group) and _nx_class(item) == "NXentry": + _hc_validate_v2_entry(item, f"/{name}", issues) + else: + _hc_validate_v1_root(f, issues) + + # ---- Dimensional consistency (always) ---- + for name, item in f.items(): + if isinstance(item, h5py.Group) and _nx_class(item) == "NXentry": + check_dimensional_consistency(item, f"/{name}", issues) + + except Exception as exc: + issues.append(Issue(ERROR, str(file_path), + f"Failed to read HDF5 file: {exc}")) + + elif _is_hdf4(file_path): + issues.append(Issue(INFO, "/", "File format: HDF4 (Version 1)")) + try: + _open_hdf4(file_path).end() # quick open/close to confirm readability + issues.append(Issue(INFO, "/", "HDF4 file opened successfully")) + _hc_validate_v1_hdf4(file_path, issues, show_info) + except ImportError as exc: + issues.append(Issue(ERROR, "/", str(exc))) + issues.append(Issue(WARNING, "/", + "Full HDF4 validation requires 'pyhdf'. " + "Install with: pip install pyhdf")) + except Exception as exc: + issues.append(Issue(ERROR, "/", f"Failed to open HDF4 file: {exc}")) + else: + issues.append(Issue(ERROR, "/", "Unknown file format (not HDF4 or HDF5)")) + + if not show_info: + issues = [i for i in issues if i.severity != INFO] + + return issues + + +# --------------------------------------------------------------------------- +# Output / reporting +# --------------------------------------------------------------------------- + +def _print_report(file_path: str, issues: list, show_info: bool) -> int: + errors = [i for i in issues if i.severity == ERROR] + warnings = [i for i in issues if i.severity == WARNING] + infos = [i for i in issues if i.severity == INFO] + + bar = "=" * 72 + print(f"\n{bar}") + print(f" File: {file_path}") + print(bar) + + sort_key = lambda i: (0 if i.severity == ERROR else + 1 if i.severity == WARNING else 2, + i.path) + for issue in sorted(issues, key=sort_key): + print(f" {issue}") + + info_str = f", {len(infos)} info(s)" if show_info else "" + print(bar) + print(f" Summary: {len(errors)} error(s), {len(warnings)} warning(s){info_str}") + print(bar) + return 1 if errors else 0 + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- + +def main() -> None: + parser = argparse.ArgumentParser( + description=( + "Validate muon NeXus HDF4/5 files against the ISIS Muon Instrument " + "Definitions (Version 1 and Version 2 / 'muonTD')." 
+ ), + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=__doc__, + ) + parser.add_argument( + "files", nargs="*", metavar="FILE", + help="One or more NeXus files (.nxs) to validate", + ) + parser.add_argument( + "--pdf", metavar="DEF.PDF", + help=( + "Path to a nexus_instrument_definitions_*.pdf. " + "When given, the PDF is parsed and used as the validation schema " + "instead of the built-in hardcoded rules. " + "Requires: pip install pdfplumber" + ), + ) + parser.add_argument( + "--list-schema", action="store_true", + help=( + "Parse --pdf and print a summary of the extracted schema, " + "then exit (no files need to be specified)." + ), + ) + parser.add_argument( + "-v", "--verbose", action="store_true", + help="Also show INFO-level findings (optional fields, format details).", + ) + parser.add_argument( + "--errors-only", action="store_true", + help="Show only ERROR-level issues; suppress warnings and info.", + ) + args = parser.parse_args() + + # ------------------------------------------------------------------ + # Load PDF schema if requested + # ------------------------------------------------------------------ + pdf_schema = None + if args.pdf: + pdf_path = Path(args.pdf) + if not pdf_path.exists(): + print(f"ERROR: PDF file not found: {args.pdf}", file=sys.stderr) + sys.exit(2) + try: + print(f"Parsing PDF schema from: {pdf_path.name} …") + pdf_schema = parse_pdf_schema(str(pdf_path)) + n_classes = sum(len(v) for v in pdf_schema.values()) + print(f" → {len(pdf_schema)} NX classes found " + f"({n_classes} version entries)") + except ImportError as exc: + print(f"ERROR: {exc}", file=sys.stderr) + sys.exit(2) + except Exception as exc: + print(f"ERROR: Failed to parse PDF: {exc}", file=sys.stderr) + sys.exit(2) + + if args.list_schema: + print("\nExtracted schema:\n") + print(schema_summary(pdf_schema)) + sys.exit(0) + + if not args.files: + if args.list_schema and not args.pdf: + parser.error("--list-schema requires --pdf") + parser.error("at least one FILE argument is required") + + # ------------------------------------------------------------------ + # Validate each file + # ------------------------------------------------------------------ + show_info = args.verbose and not args.errors_only + overall_rc = 0 + + for file_path in args.files: + issues = validate_file(file_path, + pdf_schema=pdf_schema, + show_info=show_info) + if args.errors_only: + issues = [i for i in issues if i.severity == ERROR] + rc = _print_report(file_path, issues, show_info) + overall_rc = max(overall_rc, rc) + + sys.exit(overall_rc) + + +if __name__ == "__main__": + main()