initial version of fil_trans

This commit is contained in:
x01dc
2026-01-06 12:51:22 +01:00
committed by x12sa
parent adedf0fa4f
commit e7e49b4916

View File

@@ -0,0 +1,500 @@
"""
cSAXS exposure-box filter transmission utilities.
Implements fil_trans physics:
- Per-material attenuation-length tables loaded from package 'filter_data/'.
- Linear interpolation of attenuation length vs. energy (eV).
- Transmission T = exp(-t / lambda), t in micrometers.
- Enumeration of all enabled combinations across 4 units x 6 positions.
- Selection of the combination with transmission closest to a target.
Motion is executed using provided position tables for each filter_array_*_x stage.
"""
from __future__ import annotations
import math
from typing import Dict, List, Optional, Tuple
from importlib import resources
# Resolve the filter_data/ folder via importlib.resources
import csaxs_bec.bec_ipython_client.plugins.cSAXS.filter_transmission as ft_pkg
from bec_lib import bec_logger
class cSAXSFilterTransmission:
"""
Mixin providing the fil_trans command.
Assumes self.client and self.OMNYTools exist.
Example:
csaxs.fil_trans(0.10, energy_kev=6.2) # dry-run first, then asks to execute
"""
# -----------------------------
# Material naming & file mapping
# -----------------------------
_MATERIAL_FILES: Dict[str, Optional[str]] = {
"none": None,
"diam": "filter_attenuation-length_diamond.txt",
"al": "filter_attenuation-length_al.txt",
"si": "filter_attenuation-length_si.txt",
"ti": "filter_attenuation-length_ti.txt",
"cu": "filter_attenuation-length_cu.txt",
"ge": "filter_attenuation-length_ge.txt",
"zr": "filter_attenuation-length_zr.txt",
# optional; provide file if you enable Fe5
"fe": "filter_attenuation-length_fe.txt",
}
# -----------------------------------------
# Exposure-box filter configuration (4 x 6)
#
# Current hardware (per your message):
# Unit 1 (filter_array_1_x): out, Si400, Ge300, Ti800, Zr20
# Unit 2 (filter_array_2_x): out, Si200, Si3200, Ti400, Cu20
# Unit 3 (filter_array_3_x): out, Si100, Si1600, Ti200, Ti3200, Fe5
# Unit 4 (filter_array_4_x): out, Si50, Si800, Ti100, Ti1600, Ti20
#
# Positions 1..6 = [out, m1, m2, m3, m4, m5]
# Each entry: ((mat1, th1_um), (mat2, th2_um), enabled_bool)
# -----------------------------------------
_FILTERS: List[Tuple[Tuple[str, float], Tuple[str, float], bool]] = [
# Unit 1
(("none", 0.0), ("none", 0.0), True), # out
(("si", 400.0), ("none", 0.0), True), # Si400
(("ge", 300.0), ("none", 0.0), True), # Ge300
(("ti", 800.0), ("none", 0.0), True), # Ti800
(("zr", 20.0), ("none", 0.0), True), # Zr20
(("none", 0.0), ("none", 0.0), False), # unused
# Unit 2
(("none", 0.0), ("none", 0.0), True), # out
(("si", 200.0), ("none", 0.0), True), # Si200
(("si", 3200.0), ("none", 0.0), True), # Si3200
(("ti", 400.0), ("none", 0.0), True), # Ti400
(("cu", 20.0), ("none", 0.0), True), # Cu20
(("none", 0.0), ("none", 0.0), False), # unused
# Unit 3
(("none", 0.0), ("none", 0.0), True), # out
(("si", 100.0), ("none", 0.0), True), # Si100
(("si", 1600.0), ("none", 0.0), True), # Si1600
(("ti", 200.0), ("none", 0.0), True), # Ti200
(("ti", 3200.0), ("none", 0.0), True), # Ti3200
(("fe", 5.0), ("none", 0.0), False), # Fe5 (disabled unless data file provided)
# Unit 4
(("none", 0.0), ("none", 0.0), True), # out
(("si", 50.0), ("none", 0.0), True), # Si50
(("si", 800.0), ("none", 0.0), True), # Si800
(("ti", 100.0), ("none", 0.0), True), # Ti100
(("ti", 1600.0), ("none", 0.0), True), # Ti1600
(("ti", 20.0), ("none", 0.0), True), # Ti20
]
_UNITS = 4
_PER_UNIT = 6
# -----------------------------------------
# Motion mapping: user-scale coordinates
# [out, m1, m2, m3, m4, m5]
# -----------------------------------------
_POSITIONS_USER: List[List[Optional[float]]] = [
# Unit 1 (filter_array_1_x)
[25.0, 17.9, 7.9, -2.3, -12.1, None],
# Unit 2 (filter_array_2_x)
[25.5, 17.6, 7.8, -2.3, -12.3, None],
# Unit 3 (filter_array_3_x)
[25.8, 17.6, 7.8, -2.2, -12.3, -22.3], # Fe5 at -22.3
# Unit 4 (filter_array_4_x)
[25.0, 17.5, 7.5, -2.2, -12.4, -22.2],
]
# Device axis names (adjust if different)
_AXES: List[str] = [
"filter_array_1_x",
"filter_array_2_x",
"filter_array_3_x",
"filter_array_4_x",
]
# -----------------------------
# Construction / Internals
# -----------------------------
def __init__(self, **kwargs):
super().__init__(**kwargs)
# In multiple-inheritance setups our __init__ might be skipped; guard lazily too.
self._attlen_cache: Dict[str, Tuple[List[float], List[float]]] = {}
def _ensure_internal_state(self):
"""Lazy guard for robustness if __init__ wasnt called."""
if not hasattr(self, "_attlen_cache"):
self._attlen_cache = {}
# -----------------------------
# Public API
# -----------------------------
def fil_trans(
self,
transmission: float,
energy_kev: Optional[float] = None,
print_only: bool = True,
) -> Optional[None]:
"""
Set exposure-box filters to achieve a target transmission.
Physics:
- Load per-material attenuation-length tables (energy in eV)
- Linear interpolation to get λ(energy) in µm
- Per-position transmission: product of layer transmissions
- Combination transmission: product across 4 units
- Choose combination whose transmission is closest to target
"""
self._ensure_internal_state()
# --- Validation ---
try:
transmission = float(transmission)
except Exception:
raise ValueError("Transmission must be numeric.")
if not (0.0 < transmission <= 1.0):
raise ValueError("Transmission must be between 0 and 1.")
# --- Energy handling ---
if energy_kev is None:
try:
energy_kev = float(self.client.device_manager.mokev.read())
except Exception as exc:
raise RuntimeError(
"Energy not specified and could not read current beam energy."
) from exc
else:
energy_kev = float(energy_kev)
# --- Summary header ---
print("\nExposure-box filter transmission request")
print("-" * 60)
print(f"Target transmission : {transmission:.6e}")
print(f"Photon energy : {energy_kev:.3f} keV")
print(f"Mode : {'PRINT ONLY' if print_only else 'EXECUTE'}")
print("-" * 60)
# --- Compute best combination ---
best = self._find_best_combination(transmission, energy_kev)
# --- Report (selected combination with per-unit detail) ---
self._print_combination(best, energy_kev, header="Selected combination")
# Nearby: print only code + transmission (no per-unit details)
neighbors = self._neighbors_around_best(transmission, energy_kev, span=5)
if neighbors:
print("\nNearby combinations (by transmission proximity):")
for row in neighbors:
print(
f"{row['transmission']:9.3e}"
)
# --- Dry run prompt: ask to execute now (default 'y') ---
if print_only:
print("\n[DRY RUN] No motion executed yet.")
if self.OMNYTools.yesno(
"Execute motion to the selected filter combination now?",
"y", # default YES
):
self._execute_combination(best, energy_kev)
else:
print("Execution skipped.")
return None
# --- Execute motion directly (if print_only=False) ---
self._execute_combination(best, energy_kev)
return None
# -----------------------------
# Physics helpers
# -----------------------------
def _load_attlen(self, material: str) -> Tuple[List[float], List[float]]:
"""
Load and cache attenuation-length table for a material from package resources.
Returns:
energies_eV (list), attlen_um (list)
"""
self._ensure_internal_state()
material = material.lower()
if material in self._attlen_cache:
return self._attlen_cache[material]
fname = self._MATERIAL_FILES.get(material)
if not fname:
# 'none' or unsupported mapped to empty
self._attlen_cache[material] = ([], [])
return ([], [])
energies_eV: List[float] = []
attlen_um: List[float] = []
# Load from installed package: <this_module>/filter_data/<fname>
res = resources.files(ft_pkg) / "filter_data" / fname
try:
# as_file yields a concrete filesystem path even if package is zipped
with resources.as_file(res) as p:
with p.open("r") as f:
for line in f:
line = line.strip()
if not line:
continue
parts = line.split()
if len(parts) < 2:
continue
energies_eV.append(float(parts[0]))
attlen_um.append(float(parts[1]))
except FileNotFoundError as e:
raise FileNotFoundError(
f"Attenuation data file not found for '{material}': {fname} "
f"(looked in package filter_data)"
) from e
if not energies_eV:
raise ValueError(
f"Attenuation data for material '{material}' is empty or invalid: {fname}"
)
self._attlen_cache[material] = (energies_eV, attlen_um)
return energies_eV, attlen_um
def _attenuation_length_um(self, energy_kev: float, material: str) -> float:
"""
Linear interpolation of attenuation length λ(energy) in µm.
Input energy in keV; tables are in eV.
"""
material = material.lower()
if material == "none":
return float("inf") # No attenuation
energies_eV, attlen_um = self._load_attlen(material)
if not energies_eV:
# unsupported mapped above
raise ValueError(f"Unsupported material or missing data: '{material}'")
e_ev = energy_kev * 1000.0
# Clip to the nearest edge for practicality
if e_ev <= energies_eV[0]:
bec_logger.logger.warning(
f"[cSAXS] energy {energy_kev:.3f} keV below table range for {material}; "
f"clipping to {energies_eV[0]/1000.0:.3f} keV."
)
return attlen_um[0]
if e_ev >= energies_eV[-1]:
bec_logger.logger.warning(
f"[cSAXS] energy {energy_kev:.3f} keV above table range for {material}; "
f"clipping to {energies_eV[-1]/1000.0:.3f} keV."
)
return attlen_um[-1]
# Binary search for interval
lo, hi = 0, len(energies_eV) - 1
while hi - lo > 1:
mid = (lo + hi) // 2
if energies_eV[mid] >= e_ev:
hi = mid
else:
lo = mid
e_lo, e_hi = energies_eV[lo], energies_eV[hi]
lam_lo, lam_hi = attlen_um[lo], attlen_um[hi]
lam = (e_ev - e_lo) / (e_hi - e_lo) * (lam_hi - lam_lo) + lam_lo
return lam
def _position_transmission(
self,
pos_entry: Tuple[Tuple[str, float], Tuple[str, float], bool],
energy_kev: float,
) -> Optional[float]:
"""Transmission for a single position (possibly two layers)."""
(mat1, th1), (mat2, th2), enabled = pos_entry
if not enabled:
return None
T1 = 1.0
T2 = 1.0
if mat1 != "none" and th1 > 0.0:
lam1 = self._attenuation_length_um(energy_kev, mat1)
T1 = math.exp(-th1 / lam1)
if mat2 != "none" and th2 > 0.0:
lam2 = self._attenuation_length_um(energy_kev, mat2)
T2 = math.exp(-th2 / lam2)
return T1 * T2
def _all_combinations(self, energy_kev: float) -> List[dict]:
"""
Enumerate all enabled combinations across 4 units.
Returns a list of dicts sorted by transmission ascending:
{
'code': 'abcd' # positions 1..6 per unit
'indices': [i0, i1, i2, i3], # 0..5
'materials': [ ((m1,t1), (m2,t2)|None ), ... for 4 units ]
'transmission': float
}
"""
# Slice filters per unit
units = [
self._FILTERS[u * self._PER_UNIT : (u + 1) * self._PER_UNIT]
for u in range(self._UNITS)
]
# Precompute per-position transmissions
per_pos_T: List[List[Optional[float]]] = [
[self._position_transmission(pos_entry, energy_kev) for pos_entry in unit]
for unit in units
]
combos: List[dict] = []
for i0 in range(self._PER_UNIT):
T0 = per_pos_T[0][i0]
if T0 is None:
continue
for i1 in range(self._PER_UNIT):
T1 = per_pos_T[1][i1]
if T1 is None:
continue
for i2 in range(self._PER_UNIT):
T2 = per_pos_T[2][i2]
if T2 is None:
continue
for i3 in range(self._PER_UNIT):
T3 = per_pos_T[3][i3]
if T3 is None:
continue
T = T0 * T1 * T2 * T3
indices = [i0, i1, i2, i3]
code = "".join(str(i + 1) for i in indices)
# Collect materials/thickness for reporting
mats = []
for u, idx in enumerate(indices):
(m1, t1), (m2, t2), _ = units[u][idx]
mats.append(((m1, t1), (m2, t2) if (m2 != "none" and t2 > 0.0) else None))
combos.append(
{
"code": code,
"indices": indices,
"materials": mats,
"transmission": T,
}
)
combos.sort(key=lambda c: c["transmission"]) # ascending
return combos
def _find_best_combination(self, target_T: float, energy_kev: float) -> dict:
"""Pick combination with transmission closest to target."""
combos = self._all_combinations(energy_kev)
best = min(combos, key=lambda c: abs(c["transmission"] - target_T))
return best
def _neighbors_around_best(
self, target_T: float, energy_kev: float, span: int = 5
) -> List[dict]:
"""Return a few combinations around the best, by proximity in transmission."""
combos = self._all_combinations(energy_kev)
ranked = sorted(combos, key=lambda c: abs(c["transmission"] - target_T))
return ranked[1 : 1 + span] # exclude best itself
# -----------------------------
# Formatting & motion
# -----------------------------
def _print_combination(self, comb: dict, energy_kev: float, header: Optional[str] = None):
if header:
print(f"\n{header}:")
code = comb["code"]
T = comb["transmission"]
print(f" Filters {code} transmission at {energy_kev:.3f} keV = {T:9.3e}")
# Per-unit detail for the selected combination only
for u, matinfo in enumerate(comb["materials"], start=1):
first, second = matinfo
(m1, t1) = first
pos = int(code[u - 1])
if second is not None:
(m2, t2) = second
print(
f" unit {u}: #{pos} "
f"{t1:4.0f} mu {m1:<4} + {t2:4.0f} mu {m2:<4}"
)
else:
if m1 != "none" and t1 > 0.0:
print(f" unit {u}: #{pos} {t1:4.0f} mu {m1:<4}")
else:
print(f" unit {u}: #{pos} ----- out")
def _execute_combination(self, comb: dict, energy_kev: float):
"""
Execute motion to the indices encoded in 'comb["code"]'.
Mapping:
- code 'abcd' → per-unit index (a,b,c,d) ∈ {1..6}
- positions are looked up from _POSITIONS_USER
- axes are defined in _AXES
Motion uses: umv(dev.filter_array_?_x, <position>)
If 'umv' is not present, falls back to axis.move(<position>).
"""
indices = comb["indices"] # 0-based per unit
targets = []
for unit_idx, pos_idx in enumerate(indices):
pos_list = self._POSITIONS_USER[unit_idx]
target_pos = pos_list[pos_idx]
if target_pos is None:
raise RuntimeError(
f"Unit {unit_idx+1} position {pos_idx+1} has no defined coordinate."
)
targets.append(target_pos)
# Perform motion via 'umv(dev.axis, position)' or fallback to .move()
print("\nExecuting motion to selected combination:")
dev = getattr(self.client, "dev", None)
if dev is None:
# fallback to device_manager if dev is not available
dev = self.client.device_manager
for unit_idx, target in enumerate(targets):
axis_name = self._AXES[unit_idx]
axis_obj = getattr(dev, axis_name, None)
if axis_obj is None:
raise RuntimeError(f"Device axis '{axis_name}' not found (dev).")
print(f" {axis_name}{target:.3f}")
# Try the umv command first
if hasattr(self.client, "umv"):
self.client.umv(axis_obj, target)
else:
# Fallback to direct move
if hasattr(axis_obj, "move"):
axis_obj.move(target)
else:
raise RuntimeError(
f"Axis '{axis_name}' has no 'move' method and client.umv is unavailable."
)
# Verify final positions
print("\nVerifying final positions:")
for unit_idx in range(self._UNITS):
axis_name = self._AXES[unit_idx]
axis_obj = getattr(dev, axis_name)
if hasattr(axis_obj, "read"):
actual = float(axis_obj.read())
print(f" {axis_name} = {actual:.3f}")
else:
print(f" {axis_name}: readback unavailable")
# Optionally: achieved transmission (approx., using the selected combination value)
achieved_T = comb["transmission"]
print(f"\nAchieved transmission (approx.): {achieved_T:9.3e} at {energy_kev:.3f} keV")