first fixes commissioning with hardware
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m16s

This commit is contained in:
x12sa
2026-01-15 14:31:43 +01:00
parent 4752011a1f
commit d65d6b5955
3 changed files with 70 additions and 72 deletions

View File

@@ -33,7 +33,7 @@ class cSAXS(
# this is the csaxs master file that imports all routines from csaxs
# can be imported in the bec client by
#
# run in bec from folder /sls/x12sa/config/bec/production/csaxs_bec
# from csaxs_bec.bec_ipython_client.plugins.cSAXS.cSAXS import cSAXS
# csaxs = cSAXS(bec)
#

View File

@@ -26,6 +26,14 @@ import csaxs_bec.bec_ipython_client.plugins.cSAXS.filter_transmission as ft_pkg
from bec_lib import bec_logger
import builtins
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
class cSAXSFilterTransmission:
"""
@@ -140,23 +148,28 @@ class cSAXSFilterTransmission:
# -----------------------------
# Public API
# -----------------------------
def fil_trans_select(
self,
transmission: float,
transmission: Optional[float] = None,
energy_kev: Optional[float] = None,
print_only: bool = True,
) -> Optional[None]:
"""
Set exposure-box filters to achieve a target transmission.
Physics:
- Load per-material attenuation-length tables (energy in eV)
- Linear interpolation to get λ(energy) in µm
- Per-position transmission: product of layer transmissions
- Combination transmission: product across 4 units
- Choose combination whose transmission is closest to target
If called without 'transmission', prints usage and current status.
"""
self._ensure_internal_state()
if transmission is None:
print("\nUsage:")
print(" csaxs.fil_trans_select(<transmission>, energy_kev=<energy>, print_only=True)")
print("Example:")
print(" csaxs.fil_trans_select(0.10, energy_kev=6.2)")
print("\nCurrent filter transmission:")
self._fil_trans_report(energy_kev=energy_kev)
return None
# --- Validation ---
try:
@@ -170,7 +183,7 @@ class cSAXSFilterTransmission:
# --- Energy handling ---
if energy_kev is None:
try:
energy_kev = float(self.client.device_manager.mokev.read())
energy_kev = float(dev.mokev.read()) # using global dev
except Exception as exc:
raise RuntimeError(
"Energy not specified and could not read current beam energy."
@@ -189,19 +202,17 @@ class cSAXSFilterTransmission:
# --- Compute best combination ---
best = self._find_best_combination(transmission, energy_kev)
# --- Report (selected combination with per-unit detail) ---
# --- Report selected combination ---
self._print_combination(best, energy_kev, header="Selected combination")
# Nearby: print only code + transmission (no per-unit details)
# Nearby: print only code + transmission
neighbors = self._neighbors_around_best(transmission, energy_kev, span=5)
if neighbors:
print("\nNearby combinations (by transmission proximity):")
for row in neighbors:
print(
f"{row['transmission']:9.3e}"
)
print(f"{row['transmission']:9.3e}")
# --- Dry run prompt: ask to execute now (default 'y') ---
# --- Dry run prompt ---
if print_only:
print("\n[DRY RUN] No motion executed yet.")
if self.OMNYTools.yesno(
@@ -213,11 +224,11 @@ class cSAXSFilterTransmission:
print("Execution skipped.")
return None
# --- Execute motion directly (if print_only=False) ---
# --- Execute motion directly ---
self._execute_combination(best, energy_kev)
return None
# -----------------------------
# Physics helpers
# -----------------------------
@@ -438,20 +449,24 @@ class cSAXSFilterTransmission:
else:
print(f" unit {u}: #{pos} ----- out")
def _execute_combination(self, comb: dict, energy_kev: float):
"""
Execute motion to the indices encoded in 'comb["code"]'.
Mapping:
- code 'abcd' → per-unit index (a,b,c,d) ∈ {1..6}
- positions are looked up from _POSITIONS_USER
- axes are defined in _AXES
- code 'abcd' → per-unit index (a,b,c,d) ∈ {1..6}
- positions are looked up from _POSITIONS_USER
- axes are defined in _AXES
Motion uses: umv(dev.filter_array_?_x, <position>)
If 'umv' is not present, falls back to axis.move(<position>).
Motion uses: umv(dev.axis, position, dev.axis2, position2, ...)
"""
indices = comb["indices"] # 0-based per unit
targets = []
move_args = [] # Collect (device, position) pairs
print("\nExecuting combined motion:")
for unit_idx, pos_idx in enumerate(indices):
pos_list = self._POSITIONS_USER[unit_idx]
target_pos = pos_list[pos_idx]
@@ -459,76 +474,59 @@ class cSAXSFilterTransmission:
raise RuntimeError(
f"Unit {unit_idx+1} position {pos_idx+1} has no defined coordinate."
)
targets.append(target_pos)
# Perform motion via 'umv(dev.axis, position)' or fallback to .move()
print("\nExecuting motion to selected combination:")
dev = getattr(self.client, "dev", None)
if dev is None:
# fallback to device_manager if dev is not available
dev = self.client.device_manager
for unit_idx, target in enumerate(targets):
axis_name = self._AXES[unit_idx]
axis_obj = getattr(dev, axis_name, None)
if axis_obj is None:
raise RuntimeError(f"Device axis '{axis_name}' not found (dev).")
print(f" {axis_name}{target:.3f}")
axis_obj = getattr(dev, axis_name)
print(f" {axis_name}{target_pos:.3f}")
move_args.extend([axis_obj, target_pos])
# Try the umv command first
if hasattr(self.client, "umv"):
self.client.umv(axis_obj, target)
else:
# Fallback to direct move
if hasattr(axis_obj, "move"):
axis_obj.move(target)
else:
raise RuntimeError(
f"Axis '{axis_name}' has no 'move' method and client.umv is unavailable."
)
umv(*move_args)
# Verify final positions
print("\nVerifying final positions:")
for unit_idx in range(self._UNITS):
axis_name = self._AXES[unit_idx]
axis_obj = getattr(dev, axis_name)
if hasattr(axis_obj, "read"):
actual = float(axis_obj.read())
try:
actual = float(axis_obj.readback.get())
print(f" {axis_name} = {actual:.3f}")
else:
except Exception:
print(f" {axis_name}: readback unavailable")
# Optionally: achieved transmission (approx., using the selected combination value)
achieved_T = comb["transmission"]
print(f"\nAchieved transmission (approx.): {achieved_T:9.3e} at {energy_kev:.3f} keV")
def fil_trans_report(self, tol: float = 0.1) -> None:
def _fil_trans_report(self, tol: float = 0.1, energy_kev: Optional[float] = None) -> None:
"""
Report the currently active exposurebox filter combination.
Determines stage positions via dev.<axis>.readback.get()
with a tolerance window of ±tol relative to nominal positions.
Parameters
----------
tol : float
Tolerance for matching positions.
energy_kev : float, optional
Photon energy in keV. If None, tries to read from dev.mokev or defaults to 6.2 keV.
"""
self._ensure_internal_state()
# Resolve device object
dev = getattr(self.client, "dev", None)
# Ensure global dev is available
if dev is None:
dev = self.client.device_manager
print("ERROR: Global 'dev' object not found.")
return
# Try getting current energy
try:
energy_kev = float(self.client.device_manager.mokev.read())
except Exception:
print("WARNING: Could not read current beam energy. Assuming 6.2 keV.")
energy_kev = 6.2
# Determine energy
if energy_kev is None:
try:
energy_kev = float(dev.mokev.read())
except Exception:
print("WARNING: Could not read current beam energy. Assuming 6.2 keV.")
energy_kev = 6.2
print("\nCurrent filter transmission report")
print("-" * 60)
print(f"Photon energy : {energy_kev:.3f} keV")
print(f"Tolerance : ±{tol:.3f}")
print("-" * 60)
indices = [] # 05 per unit
@@ -538,7 +536,6 @@ class cSAXSFilterTransmission:
print(f"ERROR: Device axis '{axis_name}' not found.")
return
# Read readback
try:
rb = float(axis_obj.readback.get())
except Exception:
@@ -561,11 +558,11 @@ class cSAXSFilterTransmission:
indices.append(best_idx)
# Build combination dictionary like internally used ones
# Build combination code
code = "".join(str(i + 1) for i in indices)
print(f"\nMatched filter code: {code}")
print(f"Matched filter code: {code}")
# Build full combination record for transmission calculation
# Compute transmission and materials
units = [
self._FILTERS[u * self._PER_UNIT : (u + 1) * self._PER_UNIT]
for u in range(self._UNITS)
@@ -588,7 +585,8 @@ class cSAXSFilterTransmission:
materials.append(((m1, t1), None))
# Print detailed report
print(f"Total transmission: {total_T:.6e}\n")
self.OMNYTools.printgreenbold(f"Total transmission: {total_T:.6e}")
print("-" * 60)
for u, matinfo in enumerate(materials, start=1):
(m1, t1), second = matinfo

View File

@@ -173,7 +173,7 @@ class cSAXSInitSmaractStages:
+ ", ".join(not_referenced)
)
bec_logger.logger.error(
"[cSAXS] Aborting motion. Please reference axes first."
"[cSAXS] Aborting motion. Please reference axes first. \nOr skip the axes by e.g. \nsmaract_all_components_to_initial_position(skip_devices=[\"fast_shutter_n1_x\",\"fast_shutter_o1_x\"]"
)
return