diff --git a/csaxs_bec/bec_ipython_client/plugins/cSAXS/filter_transmission.py b/csaxs_bec/bec_ipython_client/plugins/cSAXS/filter_transmission.py new file mode 100644 index 0000000..fafb010 --- /dev/null +++ b/csaxs_bec/bec_ipython_client/plugins/cSAXS/filter_transmission.py @@ -0,0 +1,500 @@ + +""" +cSAXS exposure-box filter transmission utilities. + +Implements fil_trans physics: +- Per-material attenuation-length tables loaded from package 'filter_data/'. +- Linear interpolation of attenuation length vs. energy (eV). +- Transmission T = exp(-t / lambda), t in micrometers. +- Enumeration of all enabled combinations across 4 units x 6 positions. +- Selection of the combination with transmission closest to a target. + +Motion is executed using provided position tables for each filter_array_*_x stage. +""" + +from __future__ import annotations + +import math +from typing import Dict, List, Optional, Tuple + +from importlib import resources +# Resolve the filter_data/ folder via importlib.resources +import csaxs_bec.bec_ipython_client.plugins.cSAXS.filter_transmission as ft_pkg + +from bec_lib import bec_logger + + +class cSAXSFilterTransmission: + """ + Mixin providing the fil_trans command. + Assumes self.client and self.OMNYTools exist. + + Example: + csaxs.fil_trans(0.10, energy_kev=6.2) # dry-run first, then asks to execute + """ + + # ----------------------------- + # Material naming & file mapping + # ----------------------------- + _MATERIAL_FILES: Dict[str, Optional[str]] = { + "none": None, + "diam": "filter_attenuation-length_diamond.txt", + "al": "filter_attenuation-length_al.txt", + "si": "filter_attenuation-length_si.txt", + "ti": "filter_attenuation-length_ti.txt", + "cu": "filter_attenuation-length_cu.txt", + "ge": "filter_attenuation-length_ge.txt", + "zr": "filter_attenuation-length_zr.txt", + # optional; provide file if you enable Fe5 + "fe": "filter_attenuation-length_fe.txt", + } + + # ----------------------------------------- + # Exposure-box filter configuration (4 x 6) + # + # Current hardware (per your message): + # Unit 1 (filter_array_1_x): out, Si400, Ge300, Ti800, Zr20 + # Unit 2 (filter_array_2_x): out, Si200, Si3200, Ti400, Cu20 + # Unit 3 (filter_array_3_x): out, Si100, Si1600, Ti200, Ti3200, Fe5 + # Unit 4 (filter_array_4_x): out, Si50, Si800, Ti100, Ti1600, Ti20 + # + # Positions 1..6 = [out, m1, m2, m3, m4, m5] + # Each entry: ((mat1, th1_um), (mat2, th2_um), enabled_bool) + # ----------------------------------------- + _FILTERS: List[Tuple[Tuple[str, float], Tuple[str, float], bool]] = [ + # Unit 1 + (("none", 0.0), ("none", 0.0), True), # out + (("si", 400.0), ("none", 0.0), True), # Si400 + (("ge", 300.0), ("none", 0.0), True), # Ge300 + (("ti", 800.0), ("none", 0.0), True), # Ti800 + (("zr", 20.0), ("none", 0.0), True), # Zr20 + (("none", 0.0), ("none", 0.0), False), # unused + + # Unit 2 + (("none", 0.0), ("none", 0.0), True), # out + (("si", 200.0), ("none", 0.0), True), # Si200 + (("si", 3200.0), ("none", 0.0), True), # Si3200 + (("ti", 400.0), ("none", 0.0), True), # Ti400 + (("cu", 20.0), ("none", 0.0), True), # Cu20 + (("none", 0.0), ("none", 0.0), False), # unused + + # Unit 3 + (("none", 0.0), ("none", 0.0), True), # out + (("si", 100.0), ("none", 0.0), True), # Si100 + (("si", 1600.0), ("none", 0.0), True), # Si1600 + (("ti", 200.0), ("none", 0.0), True), # Ti200 + (("ti", 3200.0), ("none", 0.0), True), # Ti3200 + (("fe", 5.0), ("none", 0.0), False), # Fe5 (disabled unless data file provided) + + # Unit 4 + (("none", 0.0), ("none", 0.0), True), # out + (("si", 50.0), ("none", 0.0), True), # Si50 + (("si", 800.0), ("none", 0.0), True), # Si800 + (("ti", 100.0), ("none", 0.0), True), # Ti100 + (("ti", 1600.0), ("none", 0.0), True), # Ti1600 + (("ti", 20.0), ("none", 0.0), True), # Ti20 + ] + + _UNITS = 4 + _PER_UNIT = 6 + + # ----------------------------------------- + # Motion mapping: user-scale coordinates + # [out, m1, m2, m3, m4, m5] + # ----------------------------------------- + _POSITIONS_USER: List[List[Optional[float]]] = [ + # Unit 1 (filter_array_1_x) + [25.0, 17.9, 7.9, -2.3, -12.1, None], + # Unit 2 (filter_array_2_x) + [25.5, 17.6, 7.8, -2.3, -12.3, None], + # Unit 3 (filter_array_3_x) + [25.8, 17.6, 7.8, -2.2, -12.3, -22.3], # Fe5 at -22.3 + # Unit 4 (filter_array_4_x) + [25.0, 17.5, 7.5, -2.2, -12.4, -22.2], + ] + + # Device axis names (adjust if different) + _AXES: List[str] = [ + "filter_array_1_x", + "filter_array_2_x", + "filter_array_3_x", + "filter_array_4_x", + ] + + # ----------------------------- + # Construction / Internals + # ----------------------------- + def __init__(self, **kwargs): + super().__init__(**kwargs) + # In multiple-inheritance setups our __init__ might be skipped; guard lazily too. + self._attlen_cache: Dict[str, Tuple[List[float], List[float]]] = {} + + def _ensure_internal_state(self): + """Lazy guard for robustness if __init__ wasn’t called.""" + if not hasattr(self, "_attlen_cache"): + self._attlen_cache = {} + + # ----------------------------- + # Public API + # ----------------------------- + def fil_trans( + self, + transmission: float, + energy_kev: Optional[float] = None, + print_only: bool = True, + ) -> Optional[None]: + """ + Set exposure-box filters to achieve a target transmission. + + Physics: + - Load per-material attenuation-length tables (energy in eV) + - Linear interpolation to get λ(energy) in µm + - Per-position transmission: product of layer transmissions + - Combination transmission: product across 4 units + - Choose combination whose transmission is closest to target + """ + self._ensure_internal_state() + + # --- Validation --- + try: + transmission = float(transmission) + except Exception: + raise ValueError("Transmission must be numeric.") + + if not (0.0 < transmission <= 1.0): + raise ValueError("Transmission must be between 0 and 1.") + + # --- Energy handling --- + if energy_kev is None: + try: + energy_kev = float(self.client.device_manager.mokev.read()) + except Exception as exc: + raise RuntimeError( + "Energy not specified and could not read current beam energy." + ) from exc + else: + energy_kev = float(energy_kev) + + # --- Summary header --- + print("\nExposure-box filter transmission request") + print("-" * 60) + print(f"Target transmission : {transmission:.6e}") + print(f"Photon energy : {energy_kev:.3f} keV") + print(f"Mode : {'PRINT ONLY' if print_only else 'EXECUTE'}") + print("-" * 60) + + # --- Compute best combination --- + best = self._find_best_combination(transmission, energy_kev) + + # --- Report (selected combination with per-unit detail) --- + self._print_combination(best, energy_kev, header="Selected combination") + + # Nearby: print only code + transmission (no per-unit details) + neighbors = self._neighbors_around_best(transmission, energy_kev, span=5) + if neighbors: + print("\nNearby combinations (by transmission proximity):") + for row in neighbors: + print( + f"{row['transmission']:9.3e}" + ) + + # --- Dry run prompt: ask to execute now (default 'y') --- + if print_only: + print("\n[DRY RUN] No motion executed yet.") + if self.OMNYTools.yesno( + "Execute motion to the selected filter combination now?", + "y", # default YES + ): + self._execute_combination(best, energy_kev) + else: + print("Execution skipped.") + return None + + + # --- Execute motion directly (if print_only=False) --- + self._execute_combination(best, energy_kev) + return None + + # ----------------------------- + # Physics helpers + # ----------------------------- + def _load_attlen(self, material: str) -> Tuple[List[float], List[float]]: + """ + Load and cache attenuation-length table for a material from package resources. + + Returns: + energies_eV (list), attlen_um (list) + """ + self._ensure_internal_state() + + material = material.lower() + if material in self._attlen_cache: + return self._attlen_cache[material] + + fname = self._MATERIAL_FILES.get(material) + if not fname: + # 'none' or unsupported mapped to empty + self._attlen_cache[material] = ([], []) + return ([], []) + + energies_eV: List[float] = [] + attlen_um: List[float] = [] + + # Load from installed package: /filter_data/ + res = resources.files(ft_pkg) / "filter_data" / fname + try: + # as_file yields a concrete filesystem path even if package is zipped + with resources.as_file(res) as p: + with p.open("r") as f: + for line in f: + line = line.strip() + if not line: + continue + parts = line.split() + if len(parts) < 2: + continue + energies_eV.append(float(parts[0])) + attlen_um.append(float(parts[1])) + except FileNotFoundError as e: + raise FileNotFoundError( + f"Attenuation data file not found for '{material}': {fname} " + f"(looked in package filter_data)" + ) from e + + if not energies_eV: + raise ValueError( + f"Attenuation data for material '{material}' is empty or invalid: {fname}" + ) + + self._attlen_cache[material] = (energies_eV, attlen_um) + return energies_eV, attlen_um + + def _attenuation_length_um(self, energy_kev: float, material: str) -> float: + """ + Linear interpolation of attenuation length λ(energy) in µm. + Input energy in keV; tables are in eV. + """ + material = material.lower() + if material == "none": + return float("inf") # No attenuation + + energies_eV, attlen_um = self._load_attlen(material) + if not energies_eV: + # unsupported mapped above + raise ValueError(f"Unsupported material or missing data: '{material}'") + + e_ev = energy_kev * 1000.0 + + # Clip to the nearest edge for practicality + if e_ev <= energies_eV[0]: + bec_logger.logger.warning( + f"[cSAXS] energy {energy_kev:.3f} keV below table range for {material}; " + f"clipping to {energies_eV[0]/1000.0:.3f} keV." + ) + return attlen_um[0] + if e_ev >= energies_eV[-1]: + bec_logger.logger.warning( + f"[cSAXS] energy {energy_kev:.3f} keV above table range for {material}; " + f"clipping to {energies_eV[-1]/1000.0:.3f} keV." + ) + return attlen_um[-1] + + # Binary search for interval + lo, hi = 0, len(energies_eV) - 1 + while hi - lo > 1: + mid = (lo + hi) // 2 + if energies_eV[mid] >= e_ev: + hi = mid + else: + lo = mid + + e_lo, e_hi = energies_eV[lo], energies_eV[hi] + lam_lo, lam_hi = attlen_um[lo], attlen_um[hi] + lam = (e_ev - e_lo) / (e_hi - e_lo) * (lam_hi - lam_lo) + lam_lo + return lam + + def _position_transmission( + self, + pos_entry: Tuple[Tuple[str, float], Tuple[str, float], bool], + energy_kev: float, + ) -> Optional[float]: + """Transmission for a single position (possibly two layers).""" + (mat1, th1), (mat2, th2), enabled = pos_entry + if not enabled: + return None + T1 = 1.0 + T2 = 1.0 + if mat1 != "none" and th1 > 0.0: + lam1 = self._attenuation_length_um(energy_kev, mat1) + T1 = math.exp(-th1 / lam1) + if mat2 != "none" and th2 > 0.0: + lam2 = self._attenuation_length_um(energy_kev, mat2) + T2 = math.exp(-th2 / lam2) + return T1 * T2 + + def _all_combinations(self, energy_kev: float) -> List[dict]: + """ + Enumerate all enabled combinations across 4 units. + Returns a list of dicts sorted by transmission ascending: + { + 'code': 'abcd' # positions 1..6 per unit + 'indices': [i0, i1, i2, i3], # 0..5 + 'materials': [ ((m1,t1), (m2,t2)|None ), ... for 4 units ] + 'transmission': float + } + """ + # Slice filters per unit + units = [ + self._FILTERS[u * self._PER_UNIT : (u + 1) * self._PER_UNIT] + for u in range(self._UNITS) + ] + + # Precompute per-position transmissions + per_pos_T: List[List[Optional[float]]] = [ + [self._position_transmission(pos_entry, energy_kev) for pos_entry in unit] + for unit in units + ] + + combos: List[dict] = [] + for i0 in range(self._PER_UNIT): + T0 = per_pos_T[0][i0] + if T0 is None: + continue + for i1 in range(self._PER_UNIT): + T1 = per_pos_T[1][i1] + if T1 is None: + continue + for i2 in range(self._PER_UNIT): + T2 = per_pos_T[2][i2] + if T2 is None: + continue + for i3 in range(self._PER_UNIT): + T3 = per_pos_T[3][i3] + if T3 is None: + continue + + T = T0 * T1 * T2 * T3 + indices = [i0, i1, i2, i3] + code = "".join(str(i + 1) for i in indices) + + # Collect materials/thickness for reporting + mats = [] + for u, idx in enumerate(indices): + (m1, t1), (m2, t2), _ = units[u][idx] + mats.append(((m1, t1), (m2, t2) if (m2 != "none" and t2 > 0.0) else None)) + + combos.append( + { + "code": code, + "indices": indices, + "materials": mats, + "transmission": T, + } + ) + + combos.sort(key=lambda c: c["transmission"]) # ascending + return combos + + def _find_best_combination(self, target_T: float, energy_kev: float) -> dict: + """Pick combination with transmission closest to target.""" + combos = self._all_combinations(energy_kev) + best = min(combos, key=lambda c: abs(c["transmission"] - target_T)) + return best + + def _neighbors_around_best( + self, target_T: float, energy_kev: float, span: int = 5 + ) -> List[dict]: + """Return a few combinations around the best, by proximity in transmission.""" + combos = self._all_combinations(energy_kev) + ranked = sorted(combos, key=lambda c: abs(c["transmission"] - target_T)) + return ranked[1 : 1 + span] # exclude best itself + + # ----------------------------- + # Formatting & motion + # ----------------------------- + def _print_combination(self, comb: dict, energy_kev: float, header: Optional[str] = None): + if header: + print(f"\n{header}:") + code = comb["code"] + T = comb["transmission"] + print(f" Filters {code} transmission at {energy_kev:.3f} keV = {T:9.3e}") + # Per-unit detail for the selected combination only + for u, matinfo in enumerate(comb["materials"], start=1): + first, second = matinfo + (m1, t1) = first + pos = int(code[u - 1]) + if second is not None: + (m2, t2) = second + print( + f" unit {u}: #{pos} " + f"{t1:4.0f} mu {m1:<4} + {t2:4.0f} mu {m2:<4}" + ) + else: + if m1 != "none" and t1 > 0.0: + print(f" unit {u}: #{pos} {t1:4.0f} mu {m1:<4}") + else: + print(f" unit {u}: #{pos} ----- out") + + def _execute_combination(self, comb: dict, energy_kev: float): + """ + Execute motion to the indices encoded in 'comb["code"]'. + + Mapping: + - code 'abcd' → per-unit index (a,b,c,d) ∈ {1..6} + - positions are looked up from _POSITIONS_USER + - axes are defined in _AXES + + Motion uses: umv(dev.filter_array_?_x, ) + If 'umv' is not present, falls back to axis.move(). + """ + indices = comb["indices"] # 0-based per unit + targets = [] + for unit_idx, pos_idx in enumerate(indices): + pos_list = self._POSITIONS_USER[unit_idx] + target_pos = pos_list[pos_idx] + if target_pos is None: + raise RuntimeError( + f"Unit {unit_idx+1} position {pos_idx+1} has no defined coordinate." + ) + targets.append(target_pos) + + # Perform motion via 'umv(dev.axis, position)' or fallback to .move() + print("\nExecuting motion to selected combination:") + dev = getattr(self.client, "dev", None) + if dev is None: + # fallback to device_manager if dev is not available + dev = self.client.device_manager + + for unit_idx, target in enumerate(targets): + axis_name = self._AXES[unit_idx] + axis_obj = getattr(dev, axis_name, None) + if axis_obj is None: + raise RuntimeError(f"Device axis '{axis_name}' not found (dev).") + print(f" {axis_name} → {target:.3f}") + + # Try the umv command first + if hasattr(self.client, "umv"): + self.client.umv(axis_obj, target) + else: + # Fallback to direct move + if hasattr(axis_obj, "move"): + axis_obj.move(target) + else: + raise RuntimeError( + f"Axis '{axis_name}' has no 'move' method and client.umv is unavailable." + ) + + # Verify final positions + print("\nVerifying final positions:") + for unit_idx in range(self._UNITS): + axis_name = self._AXES[unit_idx] + axis_obj = getattr(dev, axis_name) + if hasattr(axis_obj, "read"): + actual = float(axis_obj.read()) + print(f" {axis_name} = {actual:.3f}") + else: + print(f" {axis_name}: readback unavailable") + + # Optionally: achieved transmission (approx., using the selected combination value) + achieved_T = comb["transmission"] + print(f"\nAchieved transmission (approx.): {achieved_T:9.3e} at {energy_kev:.3f} keV")