initial version of fil_trans
This commit is contained in:
@@ -0,0 +1,500 @@
|
||||
|
||||
"""
|
||||
cSAXS exposure-box filter transmission utilities.
|
||||
|
||||
Implements fil_trans physics:
|
||||
- Per-material attenuation-length tables loaded from package 'filter_data/'.
|
||||
- Linear interpolation of attenuation length vs. energy (eV).
|
||||
- Transmission T = exp(-t / lambda), t in micrometers.
|
||||
- Enumeration of all enabled combinations across 4 units x 6 positions.
|
||||
- Selection of the combination with transmission closest to a target.
|
||||
|
||||
Motion is executed using provided position tables for each filter_array_*_x stage.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
from importlib import resources
|
||||
# Resolve the filter_data/ folder via importlib.resources
|
||||
import csaxs_bec.bec_ipython_client.plugins.cSAXS.filter_transmission as ft_pkg
|
||||
|
||||
from bec_lib import bec_logger
|
||||
|
||||
|
||||
class cSAXSFilterTransmission:
|
||||
"""
|
||||
Mixin providing the fil_trans command.
|
||||
Assumes self.client and self.OMNYTools exist.
|
||||
|
||||
Example:
|
||||
csaxs.fil_trans(0.10, energy_kev=6.2) # dry-run first, then asks to execute
|
||||
"""
|
||||
|
||||
# -----------------------------
|
||||
# Material naming & file mapping
|
||||
# -----------------------------
|
||||
_MATERIAL_FILES: Dict[str, Optional[str]] = {
|
||||
"none": None,
|
||||
"diam": "filter_attenuation-length_diamond.txt",
|
||||
"al": "filter_attenuation-length_al.txt",
|
||||
"si": "filter_attenuation-length_si.txt",
|
||||
"ti": "filter_attenuation-length_ti.txt",
|
||||
"cu": "filter_attenuation-length_cu.txt",
|
||||
"ge": "filter_attenuation-length_ge.txt",
|
||||
"zr": "filter_attenuation-length_zr.txt",
|
||||
# optional; provide file if you enable Fe5
|
||||
"fe": "filter_attenuation-length_fe.txt",
|
||||
}
|
||||
|
||||
# -----------------------------------------
|
||||
# Exposure-box filter configuration (4 x 6)
|
||||
#
|
||||
# Current hardware (per your message):
|
||||
# Unit 1 (filter_array_1_x): out, Si400, Ge300, Ti800, Zr20
|
||||
# Unit 2 (filter_array_2_x): out, Si200, Si3200, Ti400, Cu20
|
||||
# Unit 3 (filter_array_3_x): out, Si100, Si1600, Ti200, Ti3200, Fe5
|
||||
# Unit 4 (filter_array_4_x): out, Si50, Si800, Ti100, Ti1600, Ti20
|
||||
#
|
||||
# Positions 1..6 = [out, m1, m2, m3, m4, m5]
|
||||
# Each entry: ((mat1, th1_um), (mat2, th2_um), enabled_bool)
|
||||
# -----------------------------------------
|
||||
_FILTERS: List[Tuple[Tuple[str, float], Tuple[str, float], bool]] = [
|
||||
# Unit 1
|
||||
(("none", 0.0), ("none", 0.0), True), # out
|
||||
(("si", 400.0), ("none", 0.0), True), # Si400
|
||||
(("ge", 300.0), ("none", 0.0), True), # Ge300
|
||||
(("ti", 800.0), ("none", 0.0), True), # Ti800
|
||||
(("zr", 20.0), ("none", 0.0), True), # Zr20
|
||||
(("none", 0.0), ("none", 0.0), False), # unused
|
||||
|
||||
# Unit 2
|
||||
(("none", 0.0), ("none", 0.0), True), # out
|
||||
(("si", 200.0), ("none", 0.0), True), # Si200
|
||||
(("si", 3200.0), ("none", 0.0), True), # Si3200
|
||||
(("ti", 400.0), ("none", 0.0), True), # Ti400
|
||||
(("cu", 20.0), ("none", 0.0), True), # Cu20
|
||||
(("none", 0.0), ("none", 0.0), False), # unused
|
||||
|
||||
# Unit 3
|
||||
(("none", 0.0), ("none", 0.0), True), # out
|
||||
(("si", 100.0), ("none", 0.0), True), # Si100
|
||||
(("si", 1600.0), ("none", 0.0), True), # Si1600
|
||||
(("ti", 200.0), ("none", 0.0), True), # Ti200
|
||||
(("ti", 3200.0), ("none", 0.0), True), # Ti3200
|
||||
(("fe", 5.0), ("none", 0.0), False), # Fe5 (disabled unless data file provided)
|
||||
|
||||
# Unit 4
|
||||
(("none", 0.0), ("none", 0.0), True), # out
|
||||
(("si", 50.0), ("none", 0.0), True), # Si50
|
||||
(("si", 800.0), ("none", 0.0), True), # Si800
|
||||
(("ti", 100.0), ("none", 0.0), True), # Ti100
|
||||
(("ti", 1600.0), ("none", 0.0), True), # Ti1600
|
||||
(("ti", 20.0), ("none", 0.0), True), # Ti20
|
||||
]
|
||||
|
||||
_UNITS = 4
|
||||
_PER_UNIT = 6
|
||||
|
||||
# -----------------------------------------
|
||||
# Motion mapping: user-scale coordinates
|
||||
# [out, m1, m2, m3, m4, m5]
|
||||
# -----------------------------------------
|
||||
_POSITIONS_USER: List[List[Optional[float]]] = [
|
||||
# Unit 1 (filter_array_1_x)
|
||||
[25.0, 17.9, 7.9, -2.3, -12.1, None],
|
||||
# Unit 2 (filter_array_2_x)
|
||||
[25.5, 17.6, 7.8, -2.3, -12.3, None],
|
||||
# Unit 3 (filter_array_3_x)
|
||||
[25.8, 17.6, 7.8, -2.2, -12.3, -22.3], # Fe5 at -22.3
|
||||
# Unit 4 (filter_array_4_x)
|
||||
[25.0, 17.5, 7.5, -2.2, -12.4, -22.2],
|
||||
]
|
||||
|
||||
# Device axis names (adjust if different)
|
||||
_AXES: List[str] = [
|
||||
"filter_array_1_x",
|
||||
"filter_array_2_x",
|
||||
"filter_array_3_x",
|
||||
"filter_array_4_x",
|
||||
]
|
||||
|
||||
# -----------------------------
|
||||
# Construction / Internals
|
||||
# -----------------------------
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
# In multiple-inheritance setups our __init__ might be skipped; guard lazily too.
|
||||
self._attlen_cache: Dict[str, Tuple[List[float], List[float]]] = {}
|
||||
|
||||
def _ensure_internal_state(self):
|
||||
"""Lazy guard for robustness if __init__ wasn’t called."""
|
||||
if not hasattr(self, "_attlen_cache"):
|
||||
self._attlen_cache = {}
|
||||
|
||||
# -----------------------------
|
||||
# Public API
|
||||
# -----------------------------
|
||||
def fil_trans(
|
||||
self,
|
||||
transmission: float,
|
||||
energy_kev: Optional[float] = None,
|
||||
print_only: bool = True,
|
||||
) -> Optional[None]:
|
||||
"""
|
||||
Set exposure-box filters to achieve a target transmission.
|
||||
|
||||
Physics:
|
||||
- Load per-material attenuation-length tables (energy in eV)
|
||||
- Linear interpolation to get λ(energy) in µm
|
||||
- Per-position transmission: product of layer transmissions
|
||||
- Combination transmission: product across 4 units
|
||||
- Choose combination whose transmission is closest to target
|
||||
"""
|
||||
self._ensure_internal_state()
|
||||
|
||||
# --- Validation ---
|
||||
try:
|
||||
transmission = float(transmission)
|
||||
except Exception:
|
||||
raise ValueError("Transmission must be numeric.")
|
||||
|
||||
if not (0.0 < transmission <= 1.0):
|
||||
raise ValueError("Transmission must be between 0 and 1.")
|
||||
|
||||
# --- Energy handling ---
|
||||
if energy_kev is None:
|
||||
try:
|
||||
energy_kev = float(self.client.device_manager.mokev.read())
|
||||
except Exception as exc:
|
||||
raise RuntimeError(
|
||||
"Energy not specified and could not read current beam energy."
|
||||
) from exc
|
||||
else:
|
||||
energy_kev = float(energy_kev)
|
||||
|
||||
# --- Summary header ---
|
||||
print("\nExposure-box filter transmission request")
|
||||
print("-" * 60)
|
||||
print(f"Target transmission : {transmission:.6e}")
|
||||
print(f"Photon energy : {energy_kev:.3f} keV")
|
||||
print(f"Mode : {'PRINT ONLY' if print_only else 'EXECUTE'}")
|
||||
print("-" * 60)
|
||||
|
||||
# --- Compute best combination ---
|
||||
best = self._find_best_combination(transmission, energy_kev)
|
||||
|
||||
# --- Report (selected combination with per-unit detail) ---
|
||||
self._print_combination(best, energy_kev, header="Selected combination")
|
||||
|
||||
# Nearby: print only code + transmission (no per-unit details)
|
||||
neighbors = self._neighbors_around_best(transmission, energy_kev, span=5)
|
||||
if neighbors:
|
||||
print("\nNearby combinations (by transmission proximity):")
|
||||
for row in neighbors:
|
||||
print(
|
||||
f"{row['transmission']:9.3e}"
|
||||
)
|
||||
|
||||
# --- Dry run prompt: ask to execute now (default 'y') ---
|
||||
if print_only:
|
||||
print("\n[DRY RUN] No motion executed yet.")
|
||||
if self.OMNYTools.yesno(
|
||||
"Execute motion to the selected filter combination now?",
|
||||
"y", # default YES
|
||||
):
|
||||
self._execute_combination(best, energy_kev)
|
||||
else:
|
||||
print("Execution skipped.")
|
||||
return None
|
||||
|
||||
|
||||
# --- Execute motion directly (if print_only=False) ---
|
||||
self._execute_combination(best, energy_kev)
|
||||
return None
|
||||
|
||||
# -----------------------------
|
||||
# Physics helpers
|
||||
# -----------------------------
|
||||
def _load_attlen(self, material: str) -> Tuple[List[float], List[float]]:
|
||||
"""
|
||||
Load and cache attenuation-length table for a material from package resources.
|
||||
|
||||
Returns:
|
||||
energies_eV (list), attlen_um (list)
|
||||
"""
|
||||
self._ensure_internal_state()
|
||||
|
||||
material = material.lower()
|
||||
if material in self._attlen_cache:
|
||||
return self._attlen_cache[material]
|
||||
|
||||
fname = self._MATERIAL_FILES.get(material)
|
||||
if not fname:
|
||||
# 'none' or unsupported mapped to empty
|
||||
self._attlen_cache[material] = ([], [])
|
||||
return ([], [])
|
||||
|
||||
energies_eV: List[float] = []
|
||||
attlen_um: List[float] = []
|
||||
|
||||
# Load from installed package: <this_module>/filter_data/<fname>
|
||||
res = resources.files(ft_pkg) / "filter_data" / fname
|
||||
try:
|
||||
# as_file yields a concrete filesystem path even if package is zipped
|
||||
with resources.as_file(res) as p:
|
||||
with p.open("r") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
parts = line.split()
|
||||
if len(parts) < 2:
|
||||
continue
|
||||
energies_eV.append(float(parts[0]))
|
||||
attlen_um.append(float(parts[1]))
|
||||
except FileNotFoundError as e:
|
||||
raise FileNotFoundError(
|
||||
f"Attenuation data file not found for '{material}': {fname} "
|
||||
f"(looked in package filter_data)"
|
||||
) from e
|
||||
|
||||
if not energies_eV:
|
||||
raise ValueError(
|
||||
f"Attenuation data for material '{material}' is empty or invalid: {fname}"
|
||||
)
|
||||
|
||||
self._attlen_cache[material] = (energies_eV, attlen_um)
|
||||
return energies_eV, attlen_um
|
||||
|
||||
def _attenuation_length_um(self, energy_kev: float, material: str) -> float:
|
||||
"""
|
||||
Linear interpolation of attenuation length λ(energy) in µm.
|
||||
Input energy in keV; tables are in eV.
|
||||
"""
|
||||
material = material.lower()
|
||||
if material == "none":
|
||||
return float("inf") # No attenuation
|
||||
|
||||
energies_eV, attlen_um = self._load_attlen(material)
|
||||
if not energies_eV:
|
||||
# unsupported mapped above
|
||||
raise ValueError(f"Unsupported material or missing data: '{material}'")
|
||||
|
||||
e_ev = energy_kev * 1000.0
|
||||
|
||||
# Clip to the nearest edge for practicality
|
||||
if e_ev <= energies_eV[0]:
|
||||
bec_logger.logger.warning(
|
||||
f"[cSAXS] energy {energy_kev:.3f} keV below table range for {material}; "
|
||||
f"clipping to {energies_eV[0]/1000.0:.3f} keV."
|
||||
)
|
||||
return attlen_um[0]
|
||||
if e_ev >= energies_eV[-1]:
|
||||
bec_logger.logger.warning(
|
||||
f"[cSAXS] energy {energy_kev:.3f} keV above table range for {material}; "
|
||||
f"clipping to {energies_eV[-1]/1000.0:.3f} keV."
|
||||
)
|
||||
return attlen_um[-1]
|
||||
|
||||
# Binary search for interval
|
||||
lo, hi = 0, len(energies_eV) - 1
|
||||
while hi - lo > 1:
|
||||
mid = (lo + hi) // 2
|
||||
if energies_eV[mid] >= e_ev:
|
||||
hi = mid
|
||||
else:
|
||||
lo = mid
|
||||
|
||||
e_lo, e_hi = energies_eV[lo], energies_eV[hi]
|
||||
lam_lo, lam_hi = attlen_um[lo], attlen_um[hi]
|
||||
lam = (e_ev - e_lo) / (e_hi - e_lo) * (lam_hi - lam_lo) + lam_lo
|
||||
return lam
|
||||
|
||||
def _position_transmission(
|
||||
self,
|
||||
pos_entry: Tuple[Tuple[str, float], Tuple[str, float], bool],
|
||||
energy_kev: float,
|
||||
) -> Optional[float]:
|
||||
"""Transmission for a single position (possibly two layers)."""
|
||||
(mat1, th1), (mat2, th2), enabled = pos_entry
|
||||
if not enabled:
|
||||
return None
|
||||
T1 = 1.0
|
||||
T2 = 1.0
|
||||
if mat1 != "none" and th1 > 0.0:
|
||||
lam1 = self._attenuation_length_um(energy_kev, mat1)
|
||||
T1 = math.exp(-th1 / lam1)
|
||||
if mat2 != "none" and th2 > 0.0:
|
||||
lam2 = self._attenuation_length_um(energy_kev, mat2)
|
||||
T2 = math.exp(-th2 / lam2)
|
||||
return T1 * T2
|
||||
|
||||
def _all_combinations(self, energy_kev: float) -> List[dict]:
|
||||
"""
|
||||
Enumerate all enabled combinations across 4 units.
|
||||
Returns a list of dicts sorted by transmission ascending:
|
||||
{
|
||||
'code': 'abcd' # positions 1..6 per unit
|
||||
'indices': [i0, i1, i2, i3], # 0..5
|
||||
'materials': [ ((m1,t1), (m2,t2)|None ), ... for 4 units ]
|
||||
'transmission': float
|
||||
}
|
||||
"""
|
||||
# Slice filters per unit
|
||||
units = [
|
||||
self._FILTERS[u * self._PER_UNIT : (u + 1) * self._PER_UNIT]
|
||||
for u in range(self._UNITS)
|
||||
]
|
||||
|
||||
# Precompute per-position transmissions
|
||||
per_pos_T: List[List[Optional[float]]] = [
|
||||
[self._position_transmission(pos_entry, energy_kev) for pos_entry in unit]
|
||||
for unit in units
|
||||
]
|
||||
|
||||
combos: List[dict] = []
|
||||
for i0 in range(self._PER_UNIT):
|
||||
T0 = per_pos_T[0][i0]
|
||||
if T0 is None:
|
||||
continue
|
||||
for i1 in range(self._PER_UNIT):
|
||||
T1 = per_pos_T[1][i1]
|
||||
if T1 is None:
|
||||
continue
|
||||
for i2 in range(self._PER_UNIT):
|
||||
T2 = per_pos_T[2][i2]
|
||||
if T2 is None:
|
||||
continue
|
||||
for i3 in range(self._PER_UNIT):
|
||||
T3 = per_pos_T[3][i3]
|
||||
if T3 is None:
|
||||
continue
|
||||
|
||||
T = T0 * T1 * T2 * T3
|
||||
indices = [i0, i1, i2, i3]
|
||||
code = "".join(str(i + 1) for i in indices)
|
||||
|
||||
# Collect materials/thickness for reporting
|
||||
mats = []
|
||||
for u, idx in enumerate(indices):
|
||||
(m1, t1), (m2, t2), _ = units[u][idx]
|
||||
mats.append(((m1, t1), (m2, t2) if (m2 != "none" and t2 > 0.0) else None))
|
||||
|
||||
combos.append(
|
||||
{
|
||||
"code": code,
|
||||
"indices": indices,
|
||||
"materials": mats,
|
||||
"transmission": T,
|
||||
}
|
||||
)
|
||||
|
||||
combos.sort(key=lambda c: c["transmission"]) # ascending
|
||||
return combos
|
||||
|
||||
def _find_best_combination(self, target_T: float, energy_kev: float) -> dict:
|
||||
"""Pick combination with transmission closest to target."""
|
||||
combos = self._all_combinations(energy_kev)
|
||||
best = min(combos, key=lambda c: abs(c["transmission"] - target_T))
|
||||
return best
|
||||
|
||||
def _neighbors_around_best(
|
||||
self, target_T: float, energy_kev: float, span: int = 5
|
||||
) -> List[dict]:
|
||||
"""Return a few combinations around the best, by proximity in transmission."""
|
||||
combos = self._all_combinations(energy_kev)
|
||||
ranked = sorted(combos, key=lambda c: abs(c["transmission"] - target_T))
|
||||
return ranked[1 : 1 + span] # exclude best itself
|
||||
|
||||
# -----------------------------
|
||||
# Formatting & motion
|
||||
# -----------------------------
|
||||
def _print_combination(self, comb: dict, energy_kev: float, header: Optional[str] = None):
|
||||
if header:
|
||||
print(f"\n{header}:")
|
||||
code = comb["code"]
|
||||
T = comb["transmission"]
|
||||
print(f" Filters {code} transmission at {energy_kev:.3f} keV = {T:9.3e}")
|
||||
# Per-unit detail for the selected combination only
|
||||
for u, matinfo in enumerate(comb["materials"], start=1):
|
||||
first, second = matinfo
|
||||
(m1, t1) = first
|
||||
pos = int(code[u - 1])
|
||||
if second is not None:
|
||||
(m2, t2) = second
|
||||
print(
|
||||
f" unit {u}: #{pos} "
|
||||
f"{t1:4.0f} mu {m1:<4} + {t2:4.0f} mu {m2:<4}"
|
||||
)
|
||||
else:
|
||||
if m1 != "none" and t1 > 0.0:
|
||||
print(f" unit {u}: #{pos} {t1:4.0f} mu {m1:<4}")
|
||||
else:
|
||||
print(f" unit {u}: #{pos} ----- out")
|
||||
|
||||
def _execute_combination(self, comb: dict, energy_kev: float):
|
||||
"""
|
||||
Execute motion to the indices encoded in 'comb["code"]'.
|
||||
|
||||
Mapping:
|
||||
- code 'abcd' → per-unit index (a,b,c,d) ∈ {1..6}
|
||||
- positions are looked up from _POSITIONS_USER
|
||||
- axes are defined in _AXES
|
||||
|
||||
Motion uses: umv(dev.filter_array_?_x, <position>)
|
||||
If 'umv' is not present, falls back to axis.move(<position>).
|
||||
"""
|
||||
indices = comb["indices"] # 0-based per unit
|
||||
targets = []
|
||||
for unit_idx, pos_idx in enumerate(indices):
|
||||
pos_list = self._POSITIONS_USER[unit_idx]
|
||||
target_pos = pos_list[pos_idx]
|
||||
if target_pos is None:
|
||||
raise RuntimeError(
|
||||
f"Unit {unit_idx+1} position {pos_idx+1} has no defined coordinate."
|
||||
)
|
||||
targets.append(target_pos)
|
||||
|
||||
# Perform motion via 'umv(dev.axis, position)' or fallback to .move()
|
||||
print("\nExecuting motion to selected combination:")
|
||||
dev = getattr(self.client, "dev", None)
|
||||
if dev is None:
|
||||
# fallback to device_manager if dev is not available
|
||||
dev = self.client.device_manager
|
||||
|
||||
for unit_idx, target in enumerate(targets):
|
||||
axis_name = self._AXES[unit_idx]
|
||||
axis_obj = getattr(dev, axis_name, None)
|
||||
if axis_obj is None:
|
||||
raise RuntimeError(f"Device axis '{axis_name}' not found (dev).")
|
||||
print(f" {axis_name} → {target:.3f}")
|
||||
|
||||
# Try the umv command first
|
||||
if hasattr(self.client, "umv"):
|
||||
self.client.umv(axis_obj, target)
|
||||
else:
|
||||
# Fallback to direct move
|
||||
if hasattr(axis_obj, "move"):
|
||||
axis_obj.move(target)
|
||||
else:
|
||||
raise RuntimeError(
|
||||
f"Axis '{axis_name}' has no 'move' method and client.umv is unavailable."
|
||||
)
|
||||
|
||||
# Verify final positions
|
||||
print("\nVerifying final positions:")
|
||||
for unit_idx in range(self._UNITS):
|
||||
axis_name = self._AXES[unit_idx]
|
||||
axis_obj = getattr(dev, axis_name)
|
||||
if hasattr(axis_obj, "read"):
|
||||
actual = float(axis_obj.read())
|
||||
print(f" {axis_name} = {actual:.3f}")
|
||||
else:
|
||||
print(f" {axis_name}: readback unavailable")
|
||||
|
||||
# Optionally: achieved transmission (approx., using the selected combination value)
|
||||
achieved_T = comb["transmission"]
|
||||
print(f"\nAchieved transmission (approx.): {achieved_T:9.3e} at {energy_kev:.3f} keV")
|
||||
Reference in New Issue
Block a user