diff --git a/csaxs_bec/bec_ipython_client/plugins/flomni/flomni.py b/csaxs_bec/bec_ipython_client/plugins/flomni/flomni.py index 707f44d..72e3bcb 100644 --- a/csaxs_bec/bec_ipython_client/plugins/flomni/flomni.py +++ b/csaxs_bec/bec_ipython_client/plugins/flomni/flomni.py @@ -28,6 +28,7 @@ if builtins.__dict__.get("bec") is not None: dev = builtins.__dict__.get("dev") scans = builtins.__dict__.get("scans") + def umv(*args): return scans.umv(*args, relative=False) @@ -35,12 +36,15 @@ def umv(*args): class FlomniToolsError(Exception): pass + class FlomniInitError(Exception): pass + class FlomniError(Exception): pass + # class FlomniTools: # def yesno(self, message: str, default="none", autoconfirm=0) -> bool: # if autoconfirm and default == "y": @@ -84,13 +88,16 @@ class FlomniInitStagesMixin: else: return - sensor_voltage_target = dev.ftransy.user_parameter.get("sensor_voltage") sensor_voltage = float(dev.ftransy.controller.socket_put_and_receive("MG@AN[1]").strip()) if not np.isclose(sensor_voltage, sensor_voltage_target, 0.25): - print(f"Sensor voltage of the gripper is {sensor_voltage}, while target from config is {sensor_voltage_target}") - print("Verify that the value is acceptable and update config file. Reload config and start again.") + print( + f"Sensor voltage of the gripper is {sensor_voltage}, while target from config is {sensor_voltage_target}" + ) + print( + "Verify that the value is acceptable and update config file. Reload config and start again." + ) return print("Starting to drive ftransy to +y limit") @@ -118,8 +125,9 @@ class FlomniInitStagesMixin: dev.feyex.limits = [-30, -1] print("done") - - if self.OMNYTools.yesno("Init of foptz. Can the stage move to the upstream limit without collision?"): + if self.OMNYTools.yesno( + "Init of foptz. Can the stage move to the upstream limit without collision?" + ): print("OK. continue.") else: return @@ -173,7 +181,9 @@ class FlomniInitStagesMixin: dev.fsamy.limits = [2, 3.1] print("done") - if self.OMNYTools.yesno("Init of tracking stages. Did you remove the outer laser flight tubes?"): + if self.OMNYTools.yesno( + "Init of tracking stages. Did you remove the outer laser flight tubes?" + ): print("OK. continue.") else: print("Stopping.") @@ -206,7 +216,9 @@ class FlomniInitStagesMixin: print("done") print("Initializing UPR stage.") - if self.OMNYTools.yesno("To ensure that the end switches work, please check that they are currently not pushed. Is everything okay?"): + if self.OMNYTools.yesno( + "To ensure that the end switches work, please check that they are currently not pushed. Is everything okay?" + ): print("OK. continue.") else: print("Stopping.") @@ -246,7 +258,9 @@ class FlomniInitStagesMixin: dev.fsamroy.limits = [-5, 365] print("done") - if self.OMNYTools.yesno("Init of foptx. Can the stage move to the positive limit without collision? Attention: tracker flight tube!"): + if self.OMNYTools.yesno( + "Init of foptx. Can the stage move to the positive limit without collision? Attention: tracker flight tube!" + ): print("OK. continue.") else: print("Stopping.") @@ -425,7 +439,7 @@ class FlomniSampleTransferMixin: def ftransfer_flomni_stage_in(self): time.sleep(1) sample_in_position = dev.flomni_samples.is_sample_slot_used(0) - #bool(float(dev.flomni_samples.sample_placed.sample0.get())) + # bool(float(dev.flomni_samples.sample_placed.sample0.get())) if not sample_in_position: raise FlomniError("There is no sample in the sample stage. Aborting.") self.reset_correction() @@ -440,7 +454,10 @@ class FlomniSampleTransferMixin: print("Moving X-ray eye in.") - if self.OMNYTools.yesno("Please confirm that this is ok with the flight tube. This check is to be removed after commissioning", "n"): + if self.OMNYTools.yesno( + "Please confirm that this is ok with the flight tube. This check is to be removed after commissioning", + "n", + ): print("OK. continue.") else: print("Stopping.") @@ -451,7 +468,6 @@ class FlomniSampleTransferMixin: self.foptics_out() self.xrayeye_update_frame() - def laser_tracker_show_all(self): dev.rtx.controller.laser_tracker_show_all() @@ -554,7 +570,6 @@ class FlomniSampleTransferMixin: self.flomnigui_show_cameras() - self.ftransfer_gripper_move(position) self.ftransfer_controller_enable_mount_mode() @@ -594,7 +609,7 @@ class FlomniSampleTransferMixin: self.check_sensor_connected() sample_in_gripper = dev.flomni_samples.is_sample_in_gripper() - #bool(float(dev.flomni_samples.sample_in_gripper.get())) + # bool(float(dev.flomni_samples.sample_in_gripper.get())) if not sample_in_gripper: raise FlomniError("The gripper does not carry a sample.") @@ -646,7 +661,7 @@ class FlomniSampleTransferMixin: def ftransfer_sample_change(self, new_sample_position: int): self.check_tray_in() -# sample_in_gripper = dev.flomni_samples.sample_in_gripper.get() + # sample_in_gripper = dev.flomni_samples.sample_in_gripper.get() sample_in_gripper = dev.flomni_samples.is_sample_in_gripper() if sample_in_gripper: raise FlomniError("There is already a sample in the gripper. Aborting.") @@ -654,28 +669,30 @@ class FlomniSampleTransferMixin: self.check_position_is_valid(new_sample_position) if new_sample_position == 0: - raise FlomniError("The new sample to place cannot be the sample in the sample stage. Aborting.") + raise FlomniError( + "The new sample to place cannot be the sample in the sample stage. Aborting." + ) -# sample_placed = getattr( -# dev.flomni_samples.sample_placed, f"sample{new_sample_position}" -# ).get() + # sample_placed = getattr( + # dev.flomni_samples.sample_placed, f"sample{new_sample_position}" + # ).get() sample_placed = dev.flomni_samples.is_sample_slot_used(new_sample_position) if not sample_placed: raise FlomniError( f"There is currently no sample in position [{new_sample_position}]. Aborting." ) -# sample_in_sample_stage = dev.flomni_samples.sample_placed.sample0.get() + # sample_in_sample_stage = dev.flomni_samples.sample_placed.sample0.get() sample_in_sample_stage = dev.flomni_samples.is_sample_slot_used(0) if sample_in_sample_stage: # find a new home for the sample... empty_slots = [] -# for name, val in dev.flomni_samples.read().items(): -# if "flomni_samples_sample_placed_sample" not in name: -# continue -# if val.get("value") == 0: -# empty_slots.append(int(name.split("flomni_samples_sample_placed_sample")[1])) - for j in range(1,20): + # for name, val in dev.flomni_samples.read().items(): + # if "flomni_samples_sample_placed_sample" not in name: + # continue + # if val.get("value") == 0: + # empty_slots.append(int(name.split("flomni_samples_sample_placed_sample")[1])) + for j in range(1, 20): if not dev.flomni_samples.is_sample_slot_used(j): empty_slots.append(j) if not empty_slots: @@ -769,7 +786,7 @@ class FlomniSampleTransferMixin: def ftransfer_gripper_open(self): sample_in_gripper = dev.flomni_samples.is_sample_in_gripper() - #dev.flomni_samples.sample_in_gripper.get() + # dev.flomni_samples.sample_in_gripper.get() if sample_in_gripper: raise FlomniError( "Cannot open gripper. There is still a sample in the gripper! Aborting." @@ -790,7 +807,10 @@ class FlomniSampleTransferMixin: fsamx_pos = dev.fsamx.readback.get() if position == 0 and fsamx_pos > -160: - if self.OMNYTools.yesno("May the flomni stage be moved out for the sample change? Feedback will be disabled and alignment will be lost!", "y"): + if self.OMNYTools.yesno( + "May the flomni stage be moved out for the sample change? Feedback will be disabled and alignment will be lost!", + "y", + ): print("OK. continue.") self.ftransfer_flomni_stage_out() else: @@ -946,7 +966,7 @@ class FlomniSampleTransferMixin: class FlomniAlignmentMixin: import csaxs_bec - import os + import os from pathlib import Path # Ensure this is a Path object, not a string @@ -956,10 +976,13 @@ class FlomniAlignmentMixin: # Build the absolute path correctly default_correction_file = ( - csaxs_bec_basepath.parent / 'bec_ipython_client' / 'plugins' / 'flomni' / default_correction_file_rel + csaxs_bec_basepath.parent + / "bec_ipython_client" + / "plugins" + / "flomni" + / default_correction_file_rel ).resolve() - def reset_correction(self, use_default_correction=True): """ Reset the correction to the default values. @@ -1031,12 +1054,12 @@ class FlomniAlignmentMixin: else: params = dev.omny_xray_gui.fit_params_x.get() - #amplitude - tomo_alignment_fit[0][0] = params['SineModel_0_amplitude'] - #phase - tomo_alignment_fit[0][1] = params['SineModel_0_shift'] - #offset - tomo_alignment_fit[0][2] = params['LinearModel_1_intercept'] + # amplitude + tomo_alignment_fit[0][0] = params["SineModel_0_amplitude"] + # phase + tomo_alignment_fit[0][1] = params["SineModel_0_shift"] + # offset + tomo_alignment_fit[0][2] = params["LinearModel_1_intercept"] print("applying vertical default values from mirror calibration, not from fit!") tomo_alignment_fit[1][0] = 0 tomo_alignment_fit[1][1] = 0 @@ -1045,7 +1068,6 @@ class FlomniAlignmentMixin: tomo_alignment_fit[1][4] = 0 print("New alignment parameters loaded based on Xray eye alignment GUI:") - print( f"X Amplitude {tomo_alignment_fit[0][0]}, " f"X Phase {tomo_alignment_fit[0][1]}, " @@ -1192,7 +1214,7 @@ class Flomni( FlomniAlignmentMixin, FlomniOpticsMixin, cSAXSBeamlineChecks, - flomniGuiTools + flomniGuiTools, ): def __init__(self, client): super().__init__() @@ -1224,7 +1246,10 @@ class Flomni( def start_x_ray_eye_alignment(self, keep_shutter_open=False): - if self.OMNYTools.yesno("Starting Xrayeye alignment. Deleting any potential existing alignment for this sample.", "y"): + if self.OMNYTools.yesno( + "Starting Xrayeye alignment. Deleting any potential existing alignment for this sample.", + "y", + ): self.align = XrayEyeAlign(self.client, self) try: self.align.align(keep_shutter_open) @@ -1236,7 +1261,7 @@ class Flomni( umv(dev.fsamx, fsamx_in) raise exc - def xrayeye_update_frame(self,keep_shutter_open=False): + def xrayeye_update_frame(self, keep_shutter_open=False): self.align.update_frame(keep_shutter_open) def xrayeye_alignment_start(self, keep_shutter_open=False): @@ -1476,13 +1501,12 @@ class Flomni( dev = builtins.__dict__.get("dev") bec = builtins.__dict__.get("bec") - self.feye_out() tags = ["BEC_alignment_tomo", self.sample_name] self.write_to_scilog( f"Starting alignment scan. First scan number: {bec.queue.next_scan_number}.", tags ) - + start_angle = 0 angle_end = start_angle + 180 @@ -1504,15 +1528,17 @@ class Flomni( error_caught = True else: raise exc - #todo here was if blchk success, then setting to success true + # todo here was if blchk success, then setting to success true successful = True - + end_scan_number = bec.queue.next_scan_number for scan_nr in range(start_scan_number, end_scan_number): self._write_tomo_scan_number(scan_nr, angle, 0) umv(dev.fsamroy, 0) - self.OMNYTools.printgreenbold("\n\nAlignment scan finished. Please run SPEC_ptycho_align and load the new fit.") + self.OMNYTools.printgreenbold( + "\n\nAlignment scan finished. Please run SPEC_ptycho_align and load the new fit." + ) def _write_subtomo_to_scilog(self, subtomo_number): dev = builtins.__dict__.get("dev") @@ -1567,20 +1593,19 @@ class Flomni( elif subtomo_number == 8: start_angle = self.tomo_angle_stepsize / 8.0 * 7 - # _tomo_shift_angles (potential global variable) _tomo_shift_angles = 0 # compute number of projections start = start_angle + _tomo_shift_angles - if subtomo_number % 2: # odd = forward + if subtomo_number % 2: # odd = forward max_allowed_angle = 180.05 + self.tomo_angle_stepsize proposed_end = start + 180 angle_end = min(proposed_end, max_allowed_angle) span = angle_end - start - else: # even = reverse + else: # even = reverse min_allowed_angle = 0 proposed_end = start - 180 angle_end = max(proposed_end, min_allowed_angle) @@ -1589,38 +1614,23 @@ class Flomni( # number of projections needed to maintain step size N = int(span / self.tomo_angle_stepsize) + 1 - angles = np.linspace( - start, - angle_end, - num=N, - endpoint=True, - ) + angles = np.linspace(start, angle_end, num=N, endpoint=True) - if subtomo_number % 2: # odd subtomos → forward direction + if subtomo_number % 2: # odd subtomos → forward direction # clamp end angle to max allowed max_allowed_angle = 180.05 + self.tomo_angle_stepsize proposed_end = start + 180 angle_end = min(proposed_end, max_allowed_angle) - angles = np.linspace( - start, - angle_end, - num=N, - endpoint=True, - ) + angles = np.linspace(start, angle_end, num=N, endpoint=True) - else: # even subtomos → reverse direction + else: # even subtomos → reverse direction # go FROM start_angle down toward 0 min_allowed_angle = 0 proposed_end = start - 180 angle_end = max(proposed_end, min_allowed_angle) - angles = np.linspace( - start, - angle_end, - num=N, - endpoint=True, - ) + angles = np.linspace(start, angle_end, num=N, endpoint=True) for i, angle in enumerate(angles): @@ -1636,9 +1646,9 @@ class Flomni( self._subtomo_offset = 0 else: - if subtomo_number % 2: # odd = forward direction + if subtomo_number % 2: # odd = forward direction self._subtomo_offset = round(sa / step) - else: # even = reverse direction + else: # even = reverse direction self._subtomo_offset = round((180 - sa) / step) # progress index must always increase @@ -1647,14 +1657,15 @@ class Flomni( # existing progress fields self.progress["subtomo_total_projections"] = int(180 / self.tomo_angle_stepsize) - self.progress["projection"] = (subtomo_number - 1) * self.progress["subtomo_total_projections"] + self.progress["subtomo_projection"] + self.progress["projection"] = (subtomo_number - 1) * self.progress[ + "subtomo_total_projections" + ] + self.progress["subtomo_projection"] self.progress["total_projections"] = 180 / self.tomo_angle_stepsize * 8 self.progress["angle"] = angle # finally do the scan at this angle self._tomo_scan_at_angle(angle, subtomo_number) - def _tomo_scan_at_angle(self, angle, subtomo_number): successful = False error_caught = False @@ -1662,7 +1673,7 @@ class Flomni( print(f"Starting flOMNI scan for angle {angle} in subtomo {subtomo_number}") self._print_progress() while not successful: - #self.bl_chk._bl_chk_start() + # self.bl_chk._bl_chk_start() if not self.special_angles: self._current_special_angles = [] if self._current_special_angles: @@ -1697,9 +1708,11 @@ class Flomni( """start a tomo scan""" if not self._check_eye_out_and_optics_in(): - print("Attention: The setup is not in measurement condition.\nXray eye might be IN or the Xray optics OUT.") + print( + "Attention: The setup is not in measurement condition.\nXray eye might be IN or the Xray optics OUT." + ) if self.OMNYTools.yesno("Shall I continue?", "n"): - print("OK") + print("OK") else: print("Stopping.") return @@ -1738,7 +1751,7 @@ class Flomni( for ii in range(subtomo_start, 9): self.sub_tomo_scan(ii, start_angle=start_angle) start_angle = None - + elif self.tomo_type == 2: # Golden ratio tomography previous_subtomo_number = -1 @@ -1834,9 +1847,9 @@ class Flomni( else: raise FlomniError("undefined tomo type") - self.progress['projection'] = self.progress['total_projections'] + self.progress["projection"] = self.progress["total_projections"] self.progress["subtomo_projection"] = self.progress["subtomo_total_projections"] - self._print_progress() + self._print_progress() self.OMNYTools.printgreenbold("Tomoscan finished") def _print_progress(self): @@ -1930,9 +1943,7 @@ class Flomni( ) def _write_tomo_scan_number(self, scan_number: int, angle: float, subtomo_number: int) -> None: - tomo_scan_numbers_file = os.path.expanduser( - "~/data/raw/logs/tomography_scannumbers.txt" - ) + tomo_scan_numbers_file = os.path.expanduser("~/data/raw/logs/tomography_scannumbers.txt") with open(tomo_scan_numbers_file, "a+") as out_file: # pylint: disable=undefined-variable out_file.write( @@ -1943,7 +1954,6 @@ class Flomni( dev.rtx.controller.laser_tracker_check_signalstrength() - scans = builtins.__dict__.get("scans") # additional_correction = self.align.compute_additional_correction(angle) @@ -2033,7 +2043,6 @@ class Flomni( ) print(f"\nSample name: {self.sample_name}\n") - if self.OMNYTools.yesno("Are these parameters correctly set for your scan?", "y"): print("... excellent!") else: @@ -2181,4 +2190,4 @@ if __name__ == "__main__": builtins.__dict__["bec"] = bec builtins.__dict__["umv"] = umv flomni = Flomni(bec) - flomni.start_x_ray_eye_alignment() \ No newline at end of file + flomni.start_x_ray_eye_alignment()