From de2f58f17067133b601e01bff2aecc77133d8b72 Mon Sep 17 00:00:00 2001 From: x12sa Date: Fri, 16 Jan 2026 13:15:24 +0100 Subject: [PATCH] mods after discussing at beamline --- .../plugins/cSAXS/smaract.py | 396 +++++++++++++----- 1 file changed, 286 insertions(+), 110 deletions(-) diff --git a/csaxs_bec/bec_ipython_client/plugins/cSAXS/smaract.py b/csaxs_bec/bec_ipython_client/plugins/cSAXS/smaract.py index 15ca7fc..553149f 100644 --- a/csaxs_bec/bec_ipython_client/plugins/cSAXS/smaract.py +++ b/csaxs_bec/bec_ipython_client/plugins/cSAXS/smaract.py @@ -51,96 +51,240 @@ class cSAXSInitSmaractStages: def __init__(self, client) -> None: self.client = client - def smaract_reference_stages(self): - #Todo make possible to ref all stages or just those not referenced yet. also init motion only of those newly referenced + + def smaract_reference_stages(self, force=False, devices_to_reference=None): """ - Initialize all Smaract stages by setting speed, finding the reference mark, and sleeping. - """ - for dev_name, axis_letter in self.devices.items(): - ch = self.AXIS_MAP[axis_letter] # Get the channel number for the axis - d = getattr(dev, dev_name) # Access the device instance using `getattr` - - try: - # Set the speed (this is an example, adjust as needed) - print(f"Setting speed for {dev_name} (axis {ch})...") - d.controller.set_closed_loop_move_speed(ch, 1) # Example speed value (1) - - # Find the reference mark for the axis - print(f"Finding reference mark for {dev_name} (axis {ch})...") - d.controller.find_reference_mark(ch, 0, 1000, 1) # Example reference find parameters - - # Sleep after the operation - print(f"Sleeping for 0.1s after initializing {dev_name} (axis {ch})...") - time.sleep(0.1) - - except AttributeError: - print(f"Device {dev_name} does not have a controller or method to initialize.") - except Exception as e: - print(f"Error initializing device {dev_name} (axis {ch}): {e}") - - self.smaract_all_components_to_initial_position() - - def smaract_check_all_referenced(self): - """ - Check if all axes of the devices are referenced. - """ - for dev_name, axis_letter in self.devices.items(): - ch = self.AXIS_MAP[axis_letter] # Get the channel number for the axis - d = getattr(dev, dev_name) # Access the device instance using `getattr` - - try: - # Check if the axis is referenced on the device controller - if not d.controller.axis_is_referenced(ch): - print(f"Device {dev_name} (axis {ch}) is NOT referenced.") - else: - print(f"Device {dev_name} (axis {ch}) is referenced.") - - except AttributeError: - print(f"Device {dev_name} does not have a controller or axis_is_referenced method.") - except Exception as e: - print(f"Error checking device {dev_name} (axis {ch}): {e}") - - def smaract_all_components_to_initial_position( - self, - skip_devices=None, - ): - #todo just move those that are referenced, with option to select individual ones - """ - Move all SmarAct-based components to their configured init_position. + Reference SmarAct stages. Parameters ---------- - skip_devices : iterable of str, optional - Device names to skip (e.g. ["fast_shutter_n1_x"]). + force : bool, optional + If True, re-reference ALL selected stages. + If False (default), only reference stages that are currently NOT referenced. + + devices_to_reference : iterable of str, optional + If provided, only these devices will be considered for referencing. + If None (default), all stages in `self.devices` are candidates. + + Behavior + -------- + - If devices_to_reference is given → restrict referencing to those. + - If force=False → skip devices already referenced. + - If force=True → re-reference selected devices always. + - Only newly referenced devices are passed to + smaract_components_to_initial_position() afterwards. + + + Examples + -------- + + Reference only stages that are NOT referenced yet (default) + csaxs.smaract_reference_stages() + + Force re-reference of all stages + csaxs.smaract_reference_stages(force=True) + + Reference only specific stages + csaxs.smaract_reference_stages( + devices_to_reference=["sl3trxi", "sl3trxo", "xbpm3x"] + ) + + Reference selected stages and force re-referencing + csaxs.smaract_reference_stages( + devices_to_reference=["sl4trxi", "sl4trxo"], + force=True + ) + + Reference a single device + csaxs.smaract_reference_stages( + devices_to_reference="xbimtrx" + ) + + Reference only the selected devices (skip already-referenced ones) + csaxs.smaract_reference_stages( + devices_to_reference=["sl3trxi", "sl4trxo", "sl5trxt"] + ) + + Check referencing status of all stages + csaxs.smaract_check_all_referenced() + + + """ - skip_devices = set(skip_devices or []) + # Normalize selection + if isinstance(devices_to_reference, str): + devices_to_reference = [devices_to_reference] - # First confirmation: intent - if not self.OMNYTools.yesno( - "Do you want to move all SmarAct-based components to their " - "configured initial position?", - "y", - ): - return + selection = ( + set(devices_to_reference) + if devices_to_reference is not None + else set(self.devices.keys()) + ) + + unknown = selection - set(self.devices.keys()) + if unknown: + print(f"Unknown devices requested and ignored: {sorted(unknown)}") + selection = selection.intersection(self.devices.keys()) + + newly_referenced = [] + already_referenced = [] + failed = [] + + print("\nStarting SmarAct referencing...\n") + + for dev_name in sorted(selection): + axis_letter = self.devices[dev_name] + ch = self.AXIS_MAP[axis_letter] + + try: + d = getattr(dev, dev_name) + except AttributeError: + print(f"{dev_name}: device not accessible, skipping.") + failed.append(dev_name) + continue + + try: + is_ref = d.controller.axis_is_referenced(ch) + + # Skip if already referenced and not forcing + if is_ref and not force: + print(f"{dev_name}: already referenced, skipping.") + already_referenced.append(dev_name) + continue + + # Perform referencing + print(f"{dev_name}: referencing axis {ch}...") + d.controller.set_closed_loop_move_speed(ch, 1) + d.controller.find_reference_mark(ch, 0, 1000, 1) + time.sleep(0.1) + + # Verify + if d.controller.axis_is_referenced(ch): + print(f"{dev_name}: successfully referenced.") + newly_referenced.append(dev_name) + else: + print(f"{dev_name}: referencing FAILED.") + failed.append(dev_name) + + except Exception as e: + print(f"Error referencing {dev_name} (axis {ch}): {e}") + failed.append(dev_name) + + # --- Summary --- + print("\n--- Referencing summary ---") + print(f"Newly referenced: {newly_referenced}") + print(f"Already referenced (kept): {already_referenced}") + print(f"Failed: {failed}") + print("-----------------------------------------\n") + + # --- Move newly referenced only --- + if newly_referenced: + print("Moving newly referenced stages to initial positions...") + self.smaract_components_to_initial_position( + devices_to_move=newly_referenced + ) + else: + print("No newly referenced stages.") + + + def smaract_check_all_referenced(self): + """ + Check if all SmarAct axes are referenced. + """ + for dev_name, axis_letter in self.devices.items(): + ch = self.AXIS_MAP[axis_letter] + + try: + d = getattr(dev, dev_name) + if d.controller.axis_is_referenced(ch): + print(f"{dev_name} (axis {ch}) is referenced.") + else: + print(f"{dev_name} (axis {ch}) is NOT referenced.") + except AttributeError: + print(f"{dev_name}: device not accessible or unsupported.") + except Exception as e: + print(f"Error checking {dev_name} (axis {ch}): {e}") + + + def smaract_components_to_initial_position( + self, + devices_to_move=None, + ): + """ + Move selected (or all) SmarAct-based components to their configured init_position. + + Parameters + ---------- + devices_to_move : iterable of str, optional + Specific device names to move (e.g. ["xbpm3x", "sl3trxi"]). + If None, all known SmarAct devices defined in `self.devices` are considered. + + Behavior + -------- + - Only axes that are referenced will be moved. + - Unreferenced axes are skipped with a WARNING; the operation continues. + - Devices missing `init_position` are skipped and listed. + - At the end, a summary warns if some stages could not be moved because they were not referenced. + """ + # Normalize selection + if isinstance(devices_to_move, str): + devices_to_move = [devices_to_move] + selection = set(devices_to_move) if devices_to_move else None planned_moves = [] not_referenced = [] missing_params = [] + inaccessible_devices = [] + unknown_requested = [] + + # If a selection is provided, pre-check for unknown device names + if selection is not None: + known = set(self.devices.keys()) + unknown_requested = sorted(list(selection - known)) + if unknown_requested: + bec_logger.logger.warning( + "[cSAXS] The following requested devices are unknown and will be ignored: " + + ", ".join(unknown_requested) + ) + + # First confirmation: intent + scope_desc = ( + "all SmarAct-based components" + if selection is None + else "the selected SmarAct-based components" + ) + if not self.OMNYTools.yesno( + f"Do you want to move {scope_desc} to the init position as defined in the config file?", + "y", + ): + return # --- Pre-check phase --- for dev_name, axis_letter in self.devices.items(): - - if dev_name in skip_devices: - bec_logger.logger.info( - f"[cSAXS] Skipping device {dev_name} (user request)." - ) + # If a selection is provided, only consider those in the selection + if selection is not None and dev_name not in selection: continue try: d = getattr(dev, dev_name) - ch = self.AXIS_MAP[axis_letter] + except AttributeError: + bec_logger.logger.warning( + f"[cSAXS] Device {dev_name} not accessible, skipping." + ) + inaccessible_devices.append(dev_name) + continue + # Resolve channel + ch = self.AXIS_MAP.get(axis_letter, None) + if ch is None: + bec_logger.logger.warning( + f"[cSAXS] Axis map has no entry for letter '{axis_letter}' " + f"(device {dev_name}); skipping." + ) + continue + + try: # Reference check if not d.controller.axis_is_referenced(ch): not_referenced.append(dev_name) @@ -158,31 +302,38 @@ class cSAXSInitSmaractStages: planned_moves.append((dev_name, init_pos)) - except AttributeError: - bec_logger.logger.warning( - f"[cSAXS] Device {dev_name} not accessible, skipping." - ) - except Exception as exc: bec_logger.logger.error( f"[cSAXS] Error during pre-check for {dev_name}: {exc}" ) - # --- Hard stop conditions --- - if not_referenced: - bec_logger.logger.error( - "[cSAXS] The following devices are NOT referenced:\n" - + ", ".join(not_referenced) - ) - bec_logger.logger.error( - "[cSAXS] Aborting motion. Please reference axes first. \nOr skip the axes by e.g. \nsmaract_all_components_to_initial_position(skip_devices=[\"fast_shutter_n1_x\",\"fast_shutter_o1_x\"])" - ) - return - if not planned_moves: - bec_logger.logger.warning( - "[cSAXS] No valid initial positions found. Nothing to do." - ) + # Nothing to move—still summarize why. + header = "\nNo motions planned. Summary of issues:" + lines = [] + if not_referenced: + lines.append( + " - Not referenced: " + ", ".join(sorted(not_referenced)) + ) + if missing_params: + lines.append( + " - Missing init_position: " + ", ".join(sorted(missing_params)) + ) + if inaccessible_devices: + lines.append( + " - Not accessible: " + ", ".join(sorted(inaccessible_devices)) + ) + if unknown_requested: + lines.append( + " - Unknown requested: " + ", ".join(sorted(unknown_requested)) + ) + if not lines: + lines.append(" - (No eligible devices or nothing to do.)") + + print(header) + for line in lines: + print(line) + bec_logger.logger.warning("[cSAXS] Nothing to do.") return # --- Summary table --- @@ -194,18 +345,28 @@ class cSAXSInitSmaractStages: print(f"{dev_name:<35} {init_pos:>20}") print("-" * 60) + # Notes / diagnostics + if selection is not None: + print("\nNote: Only the following devices were requested to move:") + print(", ".join(sorted(selection))) + if unknown_requested: + print("\nNote: The following requested devices are unknown and were ignored:") + print(", ".join(unknown_requested)) + if not_referenced: + print( + "\nNote: The following devices are NOT referenced and will be skipped:" + ) + print(", ".join(sorted(not_referenced))) if missing_params: print( - "\nNote: The following devices have no init_position defined " - "and will be skipped:" + "\nNote: The following devices have no init_position defined and will be skipped:" ) - print(", ".join(missing_params)) - - if skip_devices: + print(", ".join(sorted(missing_params))) + if inaccessible_devices: print( - "\nNote: The following devices were explicitly skipped:" + "\nNote: The following devices were not accessible and will be skipped:" ) - print(", ".join(sorted(skip_devices))) + print(", ".join(sorted(inaccessible_devices))) # Second confirmation: execution if not self.OMNYTools.yesno( @@ -217,31 +378,46 @@ class cSAXSInitSmaractStages: ) return - # --- Execution phase --- + + # --- Execution phase (SIMULTANEOUS MOTION) --- + # Build a flat argument list: [dev1, pos1, dev2, pos2, ...] + move_args = [] for dev_name, init_pos in planned_moves: try: d = getattr(dev, dev_name) + move_args.append(d) + move_args.append(init_pos) bec_logger.logger.info( - f"[cSAXS] Moving {dev_name} to init_position = {init_pos}" + f"[cSAXS] Preparing move: {dev_name} -> {init_pos}" ) - umv(d, init_pos) - except Exception as exc: bec_logger.logger.error( - f"[cSAXS] Failed to move {dev_name}: {exc}" + f"[cSAXS] Could not access {dev_name}: {exc}" ) + if not move_args: + bec_logger.logger.warning( + "[cSAXS] No valid devices left for simultaneous motion." + ) + return + + # Now trigger simultaneous move + try: + bec_logger.logger.info( + f"[cSAXS] Starting simultaneous motion of {len(planned_moves)} devices." + ) + umv(*move_args) # <-- simultaneous move + except Exception as exc: + bec_logger.logger.error( + f"[cSAXS] Simultaneous motion failed: {exc}" + ) + return + bec_logger.logger.info( - "[cSAXS] SmarAct motion to initial positions completed." + "[cSAXS] Simultaneous SmarAct motion to initial positions completed." ) - # example usage - # csaxs.smaract_all_components_to_initial_position( - # skip_devices=[ - # "fast_shutter_n1_x", - # "fast_shutter_o1_x", - # ] - # ) + class cSAXSSmaract: