diff --git a/csaxs_bec/bec_ipython_client/plugins/omny/omny_alignment_mixin.py b/csaxs_bec/bec_ipython_client/plugins/omny/omny_alignment_mixin.py index 9e851f7..2c5eaf5 100644 --- a/csaxs_bec/bec_ipython_client/plugins/omny/omny_alignment_mixin.py +++ b/csaxs_bec/bec_ipython_client/plugins/omny/omny_alignment_mixin.py @@ -7,6 +7,8 @@ from rich.console import Console from rich.table import Table from typeguard import typechecked +from bec_lib import bec_logger +logger = bec_logger.logger class OMNYAlignmentError(Exception): diff --git a/csaxs_bec/device_configs/omny_config.yaml b/csaxs_bec/device_configs/omny_config.yaml index 5a4de93..42e1f30 100644 --- a/csaxs_bec/device_configs/omny_config.yaml +++ b/csaxs_bec/device_configs/omny_config.yaml @@ -72,7 +72,6 @@ rtx: userParameter: low_signal: 11000 min_signal: 10000 - rt_pid_voltage: -0.06219 rty: description: OMNY rt deviceClass: csaxs_bec.devices.omny.rt.rt_omny_ophyd.RtOMNYMotor diff --git a/csaxs_bec/scans/__init__.py b/csaxs_bec/scans/__init__.py index abcd325..b49c8e9 100644 --- a/csaxs_bec/scans/__init__.py +++ b/csaxs_bec/scans/__init__.py @@ -1,4 +1,5 @@ from .flomni_fermat_scan import FlomniFermatScan +from .omny_fermat_scan import OMNYFermatScan from .LamNIFermatScan import LamNIFermatScan, LamNIMoveToScanCenter from .owis_grid import OwisGrid from .sgalil_grid import SgalilGrid diff --git a/csaxs_bec/scans/omny_fermat_scan.py b/csaxs_bec/scans/omny_fermat_scan.py new file mode 100644 index 0000000..09c8f5f --- /dev/null +++ b/csaxs_bec/scans/omny_fermat_scan.py @@ -0,0 +1,340 @@ +""" +SCAN PLUGINS + +All new scans should be derived from ScanBase. ScanBase provides various methods that can be customized and overriden +but they are executed in a specific order: + +- self.initialize # initialize the class if needed +- self.read_scan_motors # used to retrieve the start position (and the relative position shift if needed) +- self.prepare_positions # prepare the positions for the scan. The preparation is split into multiple sub fuctions: + - self._calculate_positions # calculate the positions + - self._set_positions_offset # apply the previously retrieved scan position shift (if needed) + - self._check_limits # tests to ensure the limits won't be reached +- self.open_scan # send an open_scan message including the scan name, the number of points and the scan motor names +- self.stage # stage all devices for the upcoming acquisiton +- self.run_baseline_readings # read all devices to get a baseline for the upcoming scan +- self.scan_core # run a loop over all position + - self._at_each_point(ind, pos) # called at each position with the current index and the target positions as arguments +- self.finalize # clean up the scan, e.g. move back to the start position; wait everything to finish +- self.unstage # unstage all devices that have been staged before +- self.cleanup # send a close scan message and perform additional cleanups if needed +""" + +import time + +import numpy as np +from bec_lib import bec_logger, messages +from bec_lib.endpoints import MessageEndpoints +from bec_server.scan_server.errors import ScanAbortion +from bec_server.scan_server.scans import SyncFlyScanBase + +logger = bec_logger.logger + + +class OMNYFermatScan(SyncFlyScanBase): + scan_name = "omny_fermat_scan" + scan_report_hint = "table" + scan_type = "fly" + required_kwargs = ["fov_size", "exp_time", "step", "angle"] + arg_input = {} + arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None} + + def __init__( + self, + fovx: float, + fovy: float, + cenx: float, + ceny: float, + exp_time: float, + step: float, + zshift: float, + angle: float = None, + corridor_size: float = 3, + parameter: dict = None, + **kwargs, + ): + """ + An OMNY scan following Fermat's spiral. + + Args: + fovx(float) [um]: Fov in the piezo plane (i.e. piezo range). Max 200 um + fovy(float) [um]: Fov in the piezo plane (i.e. piezo range). Max 100 um + cenx(float) [mm]: center position in x. + ceny(float) [mm]: center position in y. + exp_time(float) [s]: exposure time + step(float) [um]: stepsize + zshift(float) [um]: shift in z + angle(float) [deg]: rotation angle (will rotate first) + corridor_size(float) [um]: corridor size for the corridor optimization. Default 3 um + + Returns: + + Examples: + >>> scans.omny_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01) + """ + + super().__init__(parameter=parameter, **kwargs) + self.axis = [] + self.fovx = fovx + self.fovy = fovy + self.cenx = cenx + self.ceny = ceny + self.exp_time = exp_time + self.step = step + self.zshift = zshift + self.angle = angle + self.optim_trajectory = "corridor" + self.optim_trajectory_corridor = corridor_size + if self.fovy > 100: + raise ScanAbortion("The FOV in y must be smaller than 100 um.") + if self.fovx > 200: + raise ScanAbortion("The FOV in x must be smaller than 200 um.") + if self.zshift > 100: + logger.warning("The zshift is larger than 100 um. It will be limited to 100 um.") + self.zshift = 100 + if self.zshift < -100: + logger.warning("The zshift is smaller than -100 um. It will be limited to -100 um.") + self.zshift = -100 + + def initialize(self): + self.scan_motors = [] + self.update_readout_priority() + + def _optimize_trajectory(self): + self.positions = self.optimize_corridor( + self.positions, corridor_size=self.optim_trajectory_corridor + ) + + @property + def monitor_sync(self): + return "rt_omny" + + def reverse_trajectory(self): + """ + Reverse the trajectory. Every other scan should be reversed to + shorten the movement time. In order to keep the last state, even if the + server is restarted, the state is stored in a global variable in redis. + """ + producer = self.device_manager.producer + msg = producer.get(MessageEndpoints.global_vars("reverse_omny_trajectory")) + if msg: + val = msg.content.get("value", False) + else: + val = False + producer.set( + MessageEndpoints.global_vars("reverse_omny_trajectory"), + messages.VariableMessage(value=(not val)), + ) + return val + + def prepare_positions(self): + self._calculate_positions() + self._optimize_trajectory() + flip_axes = self.reverse_trajectory() + if flip_axes: + self.positions = np.flipud(self.positions) + + self.num_pos = len(self.positions) + self._check_min_positions() + + def _check_min_positions(self): + if self.num_pos < 20: + raise ScanAbortion( + f"The number of positions must exceed 20. Currently: {self.num_pos}." + ) + + def _prepare_setup(self): + yield from self.stubs.send_rpc_and_wait("rtx", "controller.clear_trajectory_generator") + yield from self.omny_rotation(self.angle) + + yield from self.stubs.send_rpc_and_wait("rty", "set", self.positions[0][1]) + + def _prepare_setup_part2(self): + yield from self.stubs.wait(wait_type="move", device="osamroy", wait_group="omny_rotation") + yield from self.stubs.set( + device="rtx", value=self.positions[0][0], wait_group="prepare_setup_part2" + ) + yield from self.stubs.set( + device="rtz", value=self.positions[0][2], wait_group="prepare_setup_part2" + ) + yield from self.stubs.send_rpc_and_wait("rtx", "controller.laser_tracker_on") + yield from self.stubs.wait( + wait_type="move", device=["rtx", "rtz"], wait_group="prepare_setup_part2" + ) + yield from self._transfer_positions_to_omny() + yield from self.omny_osamx_to_scan_center() + + def _get_user_param_safe(self, device, var): + param = dev[device].user_parameter + if not param or param.get(var) is None: + raise ValueError(f"Device {device} has no user parameter definition for {var}.") + return param.get(var) + + def omny_osamx_to_scan_center(self): + # get last setpoint + osamroy_current_setpoint = yield from self.stubs.send_rpc_and_wait( + "osamroy", "user_setpoint.get" + ) + omny_samx_in = self._get_user_param_safe("osamx","in") + if np.fabs(osamroy_current_setpoint-(omny_samx_in+self.cenx/1000)) > 0.025: + logger.info("Moving osamx to scan center") + self.set_device_enabled("osamx", True) + dev.osamx.controller.socket_put_confirmed("axspeed[0]=100") + yield from self.stubs.set(device="osamx", value=self.cenx/1000, wait_group="osamx_mv") + yield from self.stubs.wait(wait_type="move", device="osamx", wait_group="osamx_mv") + self.set_device_enabled("osamx", False) + time.sleep(4) + yield from self.stubs.send_rpc_and_wait("rtx", "controller.laser_tracker_on") + +# Todo: auto align tracking at beginning of each scan. currently this is implemented in the client only +# this part is probably just ok with wait on target, which should also re align the tracking in OMNY! + + + + + def omny_rotation(self, angle): + # get last setpoint (cannot be based on pos get because they will deviate slightly) + osamroy_current_setpoint = yield from self.stubs.send_rpc_and_wait( + "osamroy", "user_setpoint.get" + ) + if angle == osamroy_current_setpoint: + logger.info("No rotation required") + else: + logger.info("Rotating to requested angle") + yield from self.stubs.scan_report_instruction( + { + "readback": { + "RID": self.metadata["RID"], + "devices": ["osamroy"], + "start": [osamroy_current_setpoint], + "end": [angle], + } + } + ) + yield from self.stubs.set(device="osamroy", value=angle, wait_group="omny_rotation") + + def _transfer_positions_to_omny(self): + yield from self.stubs.send_rpc_and_wait( + "rtx", "controller.add_pos_to_scan", self.positions.tolist() + ) + + def _calculate_positions(self): + self.positions = self.get_omny_fermat_spiral_pos( + -np.abs(self.fovx / 2), + np.abs(self.fovx / 2), + -np.abs(self.fovy / 2), + np.abs(self.fovy / 2), + step=self.step, + spiral_type=0, + center=False, + ) + + def get_omny_fermat_spiral_pos( + self, m1_start, m1_stop, m2_start, m2_stop, step=1, spiral_type=0, center=False + ): + """ + Calculate positions for a Fermat spiral scan. + + Args: + m1_start(float): start position in m1 + m1_stop(float): stop position in m1 + m2_start(float): start position in m2 + m2_stop(float): stop position in m2 + step(float): stepsize + spiral_type(int): 0 for traditional Fermat spiral + center(bool): whether to include the center position + + Returns: + positions(array): positions + """ + positions = [] + phi = 2 * np.pi * ((1 + np.sqrt(5)) / 2.0) + spiral_type * np.pi + + start = int(not center) + + length_axis1 = np.abs(m1_stop - m1_start) + length_axis2 = np.abs(m2_stop - m2_start) + n_max = int(length_axis1 * length_axis2 * 3.2 / step / step) + + z_pos = self.zshift + + for ii in range(start, n_max): + radius = step * 0.57 * np.sqrt(ii) + # FOV is restructed below at check pos in range + if abs(radius * np.sin(ii * phi)) > length_axis1 / 2: + continue + if abs(radius * np.cos(ii * phi)) > length_axis2 / 2: + continue + x = radius * np.sin(ii * phi) + y = radius * np.cos(ii * phi) + positions.append([x + self.cenx, y + self.ceny, z_pos]) + left_lower_corner = [ + min(m1_start, m1_stop) + self.cenx, + min(m2_start, m2_stop) + self.ceny, + z_pos, + ] + right_upper_corner = [ + max(m1_start, m1_stop) + self.cenx, + max(m2_start, m2_stop) + self.ceny, + z_pos, + ] + positions.append(left_lower_corner) + positions.append(right_upper_corner) + return np.array(positions) + + def scan_core(self): + # use a device message to receive the scan number and + # scan ID before sending the message to the device server + yield from self.stubs.kickoff(device="rtx") + while True: + yield from self.stubs.read_and_wait(group="primary", wait_group="readout_primary") + status = self.device_manager.producer.get(MessageEndpoints.device_status("rt_scan")) + if status: + status_id = status.content.get("status", 1) + request_id = status.metadata.get("RID") + if status_id == 0 and self.metadata.get("RID") == request_id: + break + if status_id == 2 and self.metadata.get("RID") == request_id: + raise ScanAbortion( + "An error occured during the omny readout:" + f" {status.metadata.get('error')}" + ) + + time.sleep(1) + logger.debug("reading monitors") + # yield from self.device_rpc("rtx", "controller.kickoff") + + def return_to_start(self): + """return to the start position""" + # in omny, we need to move to the start position of the next scan + if isinstance(self.positions, np.ndarray) and len(self.positions[-1]) == 3: + yield from self.stubs.set( + device="rtx", value=self.positions[-1][0], wait_group="scan_motor" + ) + yield from self.stubs.set( + device="rty", value=self.positions[-1][1], wait_group="scan_motor" + ) + yield from self.stubs.set( + device="rtz", value=self.positions[-1][2], wait_group="scan_motor" + ) + + yield from self.stubs.wait( + wait_type="move", device=["rtx", "rty", "rtz"], wait_group="scan_motor" + ) + return + + logger.warning("No positions found to return to start") + + def run(self): + self.initialize() + yield from self.read_scan_motors() + self.prepare_positions() + yield from self._prepare_setup() + yield from self.open_scan() + yield from self.stage() + yield from self.run_baseline_reading() + yield from self._prepare_setup_part2() + yield from self.scan_core() + yield from self.finalize() + yield from self.unstage() + yield from self.cleanup()