mods after discussing at beamline

This commit is contained in:
x12sa
2026-01-16 13:15:24 +01:00
committed by appel_c
parent cba891e5f1
commit de2f58f170
@@ -51,96 +51,240 @@ class cSAXSInitSmaractStages:
def __init__(self, client) -> None:
self.client = client
def smaract_reference_stages(self):
#Todo make possible to ref all stages or just those not referenced yet. also init motion only of those newly referenced
def smaract_reference_stages(self, force=False, devices_to_reference=None):
"""
Initialize all Smaract stages by setting speed, finding the reference mark, and sleeping.
"""
for dev_name, axis_letter in self.devices.items():
ch = self.AXIS_MAP[axis_letter] # Get the channel number for the axis
d = getattr(dev, dev_name) # Access the device instance using `getattr`
try:
# Set the speed (this is an example, adjust as needed)
print(f"Setting speed for {dev_name} (axis {ch})...")
d.controller.set_closed_loop_move_speed(ch, 1) # Example speed value (1)
# Find the reference mark for the axis
print(f"Finding reference mark for {dev_name} (axis {ch})...")
d.controller.find_reference_mark(ch, 0, 1000, 1) # Example reference find parameters
# Sleep after the operation
print(f"Sleeping for 0.1s after initializing {dev_name} (axis {ch})...")
time.sleep(0.1)
except AttributeError:
print(f"Device {dev_name} does not have a controller or method to initialize.")
except Exception as e:
print(f"Error initializing device {dev_name} (axis {ch}): {e}")
self.smaract_all_components_to_initial_position()
def smaract_check_all_referenced(self):
"""
Check if all axes of the devices are referenced.
"""
for dev_name, axis_letter in self.devices.items():
ch = self.AXIS_MAP[axis_letter] # Get the channel number for the axis
d = getattr(dev, dev_name) # Access the device instance using `getattr`
try:
# Check if the axis is referenced on the device controller
if not d.controller.axis_is_referenced(ch):
print(f"Device {dev_name} (axis {ch}) is NOT referenced.")
else:
print(f"Device {dev_name} (axis {ch}) is referenced.")
except AttributeError:
print(f"Device {dev_name} does not have a controller or axis_is_referenced method.")
except Exception as e:
print(f"Error checking device {dev_name} (axis {ch}): {e}")
def smaract_all_components_to_initial_position(
self,
skip_devices=None,
):
#todo just move those that are referenced, with option to select individual ones
"""
Move all SmarAct-based components to their configured init_position.
Reference SmarAct stages.
Parameters
----------
skip_devices : iterable of str, optional
Device names to skip (e.g. ["fast_shutter_n1_x"]).
force : bool, optional
If True, re-reference ALL selected stages.
If False (default), only reference stages that are currently NOT referenced.
devices_to_reference : iterable of str, optional
If provided, only these devices will be considered for referencing.
If None (default), all stages in `self.devices` are candidates.
Behavior
--------
- If devices_to_reference is given → restrict referencing to those.
- If force=False → skip devices already referenced.
- If force=True → re-reference selected devices always.
- Only newly referenced devices are passed to
smaract_components_to_initial_position() afterwards.
Examples
--------
Reference only stages that are NOT referenced yet (default)
csaxs.smaract_reference_stages()
Force re-reference of all stages
csaxs.smaract_reference_stages(force=True)
Reference only specific stages
csaxs.smaract_reference_stages(
devices_to_reference=["sl3trxi", "sl3trxo", "xbpm3x"]
)
Reference selected stages and force re-referencing
csaxs.smaract_reference_stages(
devices_to_reference=["sl4trxi", "sl4trxo"],
force=True
)
Reference a single device
csaxs.smaract_reference_stages(
devices_to_reference="xbimtrx"
)
Reference only the selected devices (skip already-referenced ones)
csaxs.smaract_reference_stages(
devices_to_reference=["sl3trxi", "sl4trxo", "sl5trxt"]
)
Check referencing status of all stages
csaxs.smaract_check_all_referenced()
"""
skip_devices = set(skip_devices or [])
# Normalize selection
if isinstance(devices_to_reference, str):
devices_to_reference = [devices_to_reference]
# First confirmation: intent
if not self.OMNYTools.yesno(
"Do you want to move all SmarAct-based components to their "
"configured initial position?",
"y",
):
return
selection = (
set(devices_to_reference)
if devices_to_reference is not None
else set(self.devices.keys())
)
unknown = selection - set(self.devices.keys())
if unknown:
print(f"Unknown devices requested and ignored: {sorted(unknown)}")
selection = selection.intersection(self.devices.keys())
newly_referenced = []
already_referenced = []
failed = []
print("\nStarting SmarAct referencing...\n")
for dev_name in sorted(selection):
axis_letter = self.devices[dev_name]
ch = self.AXIS_MAP[axis_letter]
try:
d = getattr(dev, dev_name)
except AttributeError:
print(f"{dev_name}: device not accessible, skipping.")
failed.append(dev_name)
continue
try:
is_ref = d.controller.axis_is_referenced(ch)
# Skip if already referenced and not forcing
if is_ref and not force:
print(f"{dev_name}: already referenced, skipping.")
already_referenced.append(dev_name)
continue
# Perform referencing
print(f"{dev_name}: referencing axis {ch}...")
d.controller.set_closed_loop_move_speed(ch, 1)
d.controller.find_reference_mark(ch, 0, 1000, 1)
time.sleep(0.1)
# Verify
if d.controller.axis_is_referenced(ch):
print(f"{dev_name}: successfully referenced.")
newly_referenced.append(dev_name)
else:
print(f"{dev_name}: referencing FAILED.")
failed.append(dev_name)
except Exception as e:
print(f"Error referencing {dev_name} (axis {ch}): {e}")
failed.append(dev_name)
# --- Summary ---
print("\n--- Referencing summary ---")
print(f"Newly referenced: {newly_referenced}")
print(f"Already referenced (kept): {already_referenced}")
print(f"Failed: {failed}")
print("-----------------------------------------\n")
# --- Move newly referenced only ---
if newly_referenced:
print("Moving newly referenced stages to initial positions...")
self.smaract_components_to_initial_position(
devices_to_move=newly_referenced
)
else:
print("No newly referenced stages.")
def smaract_check_all_referenced(self):
"""
Check if all SmarAct axes are referenced.
"""
for dev_name, axis_letter in self.devices.items():
ch = self.AXIS_MAP[axis_letter]
try:
d = getattr(dev, dev_name)
if d.controller.axis_is_referenced(ch):
print(f"{dev_name} (axis {ch}) is referenced.")
else:
print(f"{dev_name} (axis {ch}) is NOT referenced.")
except AttributeError:
print(f"{dev_name}: device not accessible or unsupported.")
except Exception as e:
print(f"Error checking {dev_name} (axis {ch}): {e}")
def smaract_components_to_initial_position(
self,
devices_to_move=None,
):
"""
Move selected (or all) SmarAct-based components to their configured init_position.
Parameters
----------
devices_to_move : iterable of str, optional
Specific device names to move (e.g. ["xbpm3x", "sl3trxi"]).
If None, all known SmarAct devices defined in `self.devices` are considered.
Behavior
--------
- Only axes that are referenced will be moved.
- Unreferenced axes are skipped with a WARNING; the operation continues.
- Devices missing `init_position` are skipped and listed.
- At the end, a summary warns if some stages could not be moved because they were not referenced.
"""
# Normalize selection
if isinstance(devices_to_move, str):
devices_to_move = [devices_to_move]
selection = set(devices_to_move) if devices_to_move else None
planned_moves = []
not_referenced = []
missing_params = []
inaccessible_devices = []
unknown_requested = []
# If a selection is provided, pre-check for unknown device names
if selection is not None:
known = set(self.devices.keys())
unknown_requested = sorted(list(selection - known))
if unknown_requested:
bec_logger.logger.warning(
"[cSAXS] The following requested devices are unknown and will be ignored: "
+ ", ".join(unknown_requested)
)
# First confirmation: intent
scope_desc = (
"all SmarAct-based components"
if selection is None
else "the selected SmarAct-based components"
)
if not self.OMNYTools.yesno(
f"Do you want to move {scope_desc} to the init position as defined in the config file?",
"y",
):
return
# --- Pre-check phase ---
for dev_name, axis_letter in self.devices.items():
if dev_name in skip_devices:
bec_logger.logger.info(
f"[cSAXS] Skipping device {dev_name} (user request)."
)
# If a selection is provided, only consider those in the selection
if selection is not None and dev_name not in selection:
continue
try:
d = getattr(dev, dev_name)
ch = self.AXIS_MAP[axis_letter]
except AttributeError:
bec_logger.logger.warning(
f"[cSAXS] Device {dev_name} not accessible, skipping."
)
inaccessible_devices.append(dev_name)
continue
# Resolve channel
ch = self.AXIS_MAP.get(axis_letter, None)
if ch is None:
bec_logger.logger.warning(
f"[cSAXS] Axis map has no entry for letter '{axis_letter}' "
f"(device {dev_name}); skipping."
)
continue
try:
# Reference check
if not d.controller.axis_is_referenced(ch):
not_referenced.append(dev_name)
@@ -158,31 +302,38 @@ class cSAXSInitSmaractStages:
planned_moves.append((dev_name, init_pos))
except AttributeError:
bec_logger.logger.warning(
f"[cSAXS] Device {dev_name} not accessible, skipping."
)
except Exception as exc:
bec_logger.logger.error(
f"[cSAXS] Error during pre-check for {dev_name}: {exc}"
)
# --- Hard stop conditions ---
if not_referenced:
bec_logger.logger.error(
"[cSAXS] The following devices are NOT referenced:\n"
+ ", ".join(not_referenced)
)
bec_logger.logger.error(
"[cSAXS] Aborting motion. Please reference axes first. \nOr skip the axes by e.g. \nsmaract_all_components_to_initial_position(skip_devices=[\"fast_shutter_n1_x\",\"fast_shutter_o1_x\"])"
)
return
if not planned_moves:
bec_logger.logger.warning(
"[cSAXS] No valid initial positions found. Nothing to do."
)
# Nothing to move—still summarize why.
header = "\nNo motions planned. Summary of issues:"
lines = []
if not_referenced:
lines.append(
" - Not referenced: " + ", ".join(sorted(not_referenced))
)
if missing_params:
lines.append(
" - Missing init_position: " + ", ".join(sorted(missing_params))
)
if inaccessible_devices:
lines.append(
" - Not accessible: " + ", ".join(sorted(inaccessible_devices))
)
if unknown_requested:
lines.append(
" - Unknown requested: " + ", ".join(sorted(unknown_requested))
)
if not lines:
lines.append(" - (No eligible devices or nothing to do.)")
print(header)
for line in lines:
print(line)
bec_logger.logger.warning("[cSAXS] Nothing to do.")
return
# --- Summary table ---
@@ -194,18 +345,28 @@ class cSAXSInitSmaractStages:
print(f"{dev_name:<35} {init_pos:>20}")
print("-" * 60)
# Notes / diagnostics
if selection is not None:
print("\nNote: Only the following devices were requested to move:")
print(", ".join(sorted(selection)))
if unknown_requested:
print("\nNote: The following requested devices are unknown and were ignored:")
print(", ".join(unknown_requested))
if not_referenced:
print(
"\nNote: The following devices are NOT referenced and will be skipped:"
)
print(", ".join(sorted(not_referenced)))
if missing_params:
print(
"\nNote: The following devices have no init_position defined "
"and will be skipped:"
"\nNote: The following devices have no init_position defined and will be skipped:"
)
print(", ".join(missing_params))
if skip_devices:
print(", ".join(sorted(missing_params)))
if inaccessible_devices:
print(
"\nNote: The following devices were explicitly skipped:"
"\nNote: The following devices were not accessible and will be skipped:"
)
print(", ".join(sorted(skip_devices)))
print(", ".join(sorted(inaccessible_devices)))
# Second confirmation: execution
if not self.OMNYTools.yesno(
@@ -217,31 +378,46 @@ class cSAXSInitSmaractStages:
)
return
# --- Execution phase ---
# --- Execution phase (SIMULTANEOUS MOTION) ---
# Build a flat argument list: [dev1, pos1, dev2, pos2, ...]
move_args = []
for dev_name, init_pos in planned_moves:
try:
d = getattr(dev, dev_name)
move_args.append(d)
move_args.append(init_pos)
bec_logger.logger.info(
f"[cSAXS] Moving {dev_name} to init_position = {init_pos}"
f"[cSAXS] Preparing move: {dev_name} -> {init_pos}"
)
umv(d, init_pos)
except Exception as exc:
bec_logger.logger.error(
f"[cSAXS] Failed to move {dev_name}: {exc}"
f"[cSAXS] Could not access {dev_name}: {exc}"
)
if not move_args:
bec_logger.logger.warning(
"[cSAXS] No valid devices left for simultaneous motion."
)
return
# Now trigger simultaneous move
try:
bec_logger.logger.info(
f"[cSAXS] Starting simultaneous motion of {len(planned_moves)} devices."
)
umv(*move_args) # <-- simultaneous move
except Exception as exc:
bec_logger.logger.error(
f"[cSAXS] Simultaneous motion failed: {exc}"
)
return
bec_logger.logger.info(
"[cSAXS] SmarAct motion to initial positions completed."
"[cSAXS] Simultaneous SmarAct motion to initial positions completed."
)
# example usage
# csaxs.smaract_all_components_to_initial_position(
# skip_devices=[
# "fast_shutter_n1_x",
# "fast_shutter_o1_x",
# ]
# )
class cSAXSSmaract: