diff --git a/csaxs_bec/devices/npoint/npoint.py b/csaxs_bec/devices/npoint/npoint.py index 58f9c50..6148aa7 100644 --- a/csaxs_bec/devices/npoint/npoint.py +++ b/csaxs_bec/devices/npoint/npoint.py @@ -412,10 +412,11 @@ class NPointAxis(Device, PositionerBase): sign=1, socket_cls=SocketIO, tolerance: float = 0.05, + device_manager=None, **kwargs, ): self.controller = NPointController( - socket_cls=socket_cls, socket_host=host, socket_port=port + socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager ) self.axis_Id = axis_Id self.sign = sign diff --git a/csaxs_bec/devices/omny/galil/fgalil_ophyd.py b/csaxs_bec/devices/omny/galil/fgalil_ophyd.py index de5a1f0..7a3efac 100644 --- a/csaxs_bec/devices/omny/galil/fgalil_ophyd.py +++ b/csaxs_bec/devices/omny/galil/fgalil_ophyd.py @@ -175,7 +175,7 @@ class FlomniGalilMotor(Device, PositionerBase): **kwargs, ): self.controller = FlomniGalilController( - socket_cls=socket_cls, socket_host=host, socket_port=port + socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager ) self.axis_Id = axis_Id self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric) @@ -212,6 +212,10 @@ class FlomniGalilMotor(Device, PositionerBase): self.low_limit_travel.put(limits[0]) self.high_limit_travel.put(limits[1]) + def wait_for_connection(self, timeout: int = 30, **kwargs) -> None: + """Wait for the device to be connected.""" + self.controller.on(timeout=timeout) + @property def limits(self): return (self.low_limit_travel.get(), self.high_limit_travel.get()) @@ -342,10 +346,10 @@ class FlomniGalilMotor(Device, PositionerBase): Drive an axis to the limit in a specified direction. Args: - direction (str): Direction in which the axis should be driven to the limit. Either 'forward' or 'reverse'. + direction (str): Direction in which the axis should be driven to the limit. Either 'forward' or 'reverse'. """ self.controller.drive_axis_to_limit(self.axis_Id_numeric, direction) - #now force position read to cache + # now force position read to cache val = self.readback.read() self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time()) diff --git a/csaxs_bec/devices/omny/galil/fupr_ophyd.py b/csaxs_bec/devices/omny/galil/fupr_ophyd.py index 23b75db..8c4ac17 100644 --- a/csaxs_bec/devices/omny/galil/fupr_ophyd.py +++ b/csaxs_bec/devices/omny/galil/fupr_ophyd.py @@ -149,7 +149,7 @@ class FuprGalilMotor(Device, PositionerBase): **kwargs, ): self.controller = FuprGalilController( - socket_cls=socket_cls, socket_host=host, socket_port=port + socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager ) self.axis_Id = axis_Id self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric) @@ -185,6 +185,10 @@ class FuprGalilMotor(Device, PositionerBase): self.low_limit_travel.put(limits[0]) self.high_limit_travel.put(limits[1]) + def wait_for_connection(self, timeout: int = 30, **kwargs) -> None: + """Wait for the device to be connected.""" + self.controller.on(timeout=timeout) + @property def limits(self): return (self.low_limit_travel.get(), self.high_limit_travel.get()) diff --git a/csaxs_bec/devices/omny/galil/galil_ophyd.py b/csaxs_bec/devices/omny/galil/galil_ophyd.py index 62317c4..d0c9fa2 100644 --- a/csaxs_bec/devices/omny/galil/galil_ophyd.py +++ b/csaxs_bec/devices/omny/galil/galil_ophyd.py @@ -59,12 +59,12 @@ class GalilController(Controller): "all_axes_referenced", ] - OKBLUE = '\033[94m' - OKCYAN = '\033[96m' - OKGREEN = '\033[92m' - WARNING = '\033[93m' - FAIL = '\033[91m' - ENDC = '\033[0m' + OKBLUE = "\033[94m" + OKCYAN = "\033[96m" + OKGREEN = "\033[92m" + WARNING = "\033[93m" + FAIL = "\033[91m" + ENDC = "\033[0m" @threadlocked def socket_put(self, val: str) -> None: @@ -115,29 +115,29 @@ class GalilController(Controller): def axis_is_referenced(self, axis_Id_numeric) -> bool: return bool(float(self.socket_put_and_receive(f"MG axisref[{axis_Id_numeric}]").strip())) - + def folerr_status(self, axis_Id_numeric) -> bool: return bool(float(self.socket_put_and_receive(f"MG folaxerr[{axis_Id_numeric}]").strip())) def motor_temperature(self, axis_Id_numeric) -> float: - #this is only valid for omny. consider moving to ogalil + # this is only valid for omny. consider moving to ogalil voltage = float(self.socket_put_and_receive(f"MG @AN[{axis_Id_numeric+1}]").strip()) voltage2 = float(self.socket_put_and_receive(f"MG @AN[{axis_Id_numeric+1}]").strip()) if voltage2 < voltage: voltage = voltage2 # convert from [-10,10]V to [0,300]degC - temperature_degC = round((voltage+10.0) / 20.0 * 300.0, 1) + temperature_degC = round((voltage + 10.0) / 20.0 * 300.0, 1) - #the motors of the parking station have a different offset - #the range is reduced, so if at the limit, we show an extreme value + # the motors of the parking station have a different offset + # the range is reduced, so if at the limit, we show an extreme value if self.sock.port == 8082: - #controller 2 + # controller 2 if axis_Id_numeric == 6: - temperature_degC = round((voltage+10.0-11.4) / 20.0 * 300.0, 1) + temperature_degC = round((voltage + 10.0 - 11.4) / 20.0 * 300.0, 1) if voltage > 9.9: temperature_degC = 300 if axis_Id_numeric == 7: - temperature_degC = round((voltage+.0-12) / 20.0 * 300.0, 1) + temperature_degC = round((voltage + 0.0 - 12) / 20.0 * 300.0, 1) if voltage > 9.9: temperature_degC = 300 return temperature_degC @@ -147,16 +147,15 @@ class GalilController(Controller): Check if all axes are referenced. """ return bool(float(self.socket_put_and_receive("MG allaxref").strip())) - - def _omny_get_microstep_position(self,axis_Id): + + def _omny_get_microstep_position(self, axis_Id): return float(self.socket_put_and_receive(f"MG _TD{axis_Id}").strip()) - - - def _omny_get_reference_limit(self,axis_Id): + + def _omny_get_reference_limit(self, axis_Id): get_axis_no = float(self.socket_put_and_receive(f"MG frmmv").strip()) - if(get_axis_no>0): + if get_axis_no > 0: reference_is_before = float(self.socket_put_and_receive(f"MG _FL{axis_Id}").strip()) - elif(get_axis_no<0): + elif get_axis_no < 0: reference_is_before = float(self.socket_put_and_receive(f"MG _BL{axis_Id}").strip()) else: reference_is_before = 0 @@ -187,7 +186,11 @@ class GalilController(Controller): while self.is_axis_moving(None, axis_Id_numeric): time.sleep(0.01) if verbose: - self.get_device_manager().connector.send_client_info(f"Current microstep position {self._omny_get_microstep_position(axis_Id):.0f}", scope="drive axis to limit", show_asap=True) + self.device_manager.connector.send_client_info( + f"Current microstep position {self._omny_get_microstep_position(axis_Id):.0f}", + scope="drive axis to limit", + show_asap=True, + ) time.sleep(0.5) # check if we actually hit the limit @@ -201,13 +204,7 @@ class GalilController(Controller): else: print("Limit reached.") - def get_device_manager(self): - for axis in self._axis: - if hasattr(axis, "device_manager") and axis.device_manager: - return axis.device_manager - raise BECConfigError("Could not access the device_manager") - - def find_reference(self, axis_Id_numeric: int, verbose=0, raise_error = 1) -> None: + def find_reference(self, axis_Id_numeric: int, verbose=0, raise_error=1) -> None: """ Find the reference of an axis. @@ -224,7 +221,11 @@ class GalilController(Controller): while self.is_axis_moving(None, axis_Id_numeric): time.sleep(0.1) if verbose: - self.get_device_manager().connector.send_client_info(f"Current microstep position {self._omny_get_microstep_position(axis_Id):.0f} reference is before {self._omny_get_reference_limit(axis_Id)}", scope="find axis reference", show_asap=True) + self.device_manager.connector.send_client_info( + f"Current microstep position {self._omny_get_microstep_position(axis_Id):.0f} reference is before {self._omny_get_reference_limit(axis_Id)}", + scope="find axis reference", + show_asap=True, + ) time.sleep(0.5) if not self.axis_is_referenced(axis_Id_numeric): @@ -236,7 +237,6 @@ class GalilController(Controller): logger.info(f"Successfully found reference of axis {axis_Id_numeric}.") print(f"Successfully found reference of axis {axis_Id_numeric}.") - def show_running_threads(self) -> None: t = PrettyTable() t.title = f"Threads on {self.sock.host}:{self.sock.port}" @@ -251,7 +251,7 @@ class GalilController(Controller): def is_motor_on(self, axis_Id) -> bool: return not bool(float(self.socket_put_and_receive(f"MG _MO{axis_Id}").strip())) - + def get_motor_limit_switch(self, axis_Id) -> list: """ Get the status of the motor limit switches. @@ -269,14 +269,7 @@ class GalilController(Controller): def describe(self) -> None: t = PrettyTable() t.title = f"{self.__class__.__name__} on {self.sock.host}:{self.sock.port}" - field_names = [ - "Axis", - "Name", - "Referenced", - "Motor On", - "Limits", - "Position", - ] + field_names = ["Axis", "Name", "Referenced", "Motor On", "Limits", "Position"] # in case of OMNY if self.sock.host == "mpc3217.psi.ch": field_names.append("Temperature") @@ -286,7 +279,7 @@ class GalilController(Controller): axis = self._axis[ax] if axis is not None: if self.sock.host == "mpc3217.psi.ch": - #case of omny. possibly consider moving to ogalil + # case of omny. possibly consider moving to ogalil motor_on = self.is_motor_on(axis.axis_Id) if motor_on == True: motor_on = self.WARNING + "ON" + self.ENDC @@ -299,7 +292,7 @@ class GalilController(Controller): else: folerr_status = "False" position = axis.readback.read().get(axis.name).get("value") - position = f'{position:.3f}' + position = f"{position:.3f}" t.add_row( [ f"{axis.axis_Id_numeric}/{axis.axis_Id}", @@ -330,8 +323,6 @@ class GalilController(Controller): self.show_running_threads() self.show_status_other() - - def show_status_other(self) -> None: """ Show additional device-specific status information. @@ -420,7 +411,7 @@ class GalilSetpointSignal(GalilSignalBase): while self.controller.is_thread_active(0): time.sleep(0.1) - #in the case of lamni, consider moving to lgalil + # in the case of lamni, consider moving to lgalil if self.parent.axis_Id_numeric == 2 and self.controller.sock.host == "mpc2680.psi.ch": try: rt = self.parent.device_manager.devices[self.parent.rt] diff --git a/csaxs_bec/devices/omny/galil/lgalil_ophyd.py b/csaxs_bec/devices/omny/galil/lgalil_ophyd.py index 05f12be..4bbd0a9 100644 --- a/csaxs_bec/devices/omny/galil/lgalil_ophyd.py +++ b/csaxs_bec/devices/omny/galil/lgalil_ophyd.py @@ -73,6 +73,7 @@ class LamniGalilController(GalilController): air_off = bool(self.socket_put_and_receive("MG@OUT[13]")) return rt_not_blocked_by_galil and air_off + class LamniGalilReadbackSignal(GalilSignalRO): @retry_once @threadlocked @@ -99,6 +100,7 @@ class LamniGalilReadbackSignal(GalilSignalRO): logger.warning("Failed to set RT value during readback.") return val + class LamniGalilMotor(Device, PositionerBase): USER_ACCESS = ["controller", "drive_axis_to_limit", "find_reference"] readback = Cpt(LamniGalilReadbackSignal, signal_name="readback", kind="hinted") @@ -132,7 +134,7 @@ class LamniGalilMotor(Device, PositionerBase): **kwargs, ): self.controller = LamniGalilController( - socket_cls=socket_cls, socket_host=host, socket_port=port + socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager ) self.axis_Id = axis_Id self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric) @@ -168,6 +170,10 @@ class LamniGalilMotor(Device, PositionerBase): self.low_limit_travel.put(limits[0]) self.high_limit_travel.put(limits[1]) + def wait_for_connection(self, timeout: int = 30, **kwargs) -> None: + """Wait for the device to be connected.""" + self.controller.on(timeout=timeout) + @property def limits(self): return (self.low_limit_travel.get(), self.high_limit_travel.get()) @@ -292,7 +298,7 @@ class LamniGalilMotor(Device, PositionerBase): Find the reference of the axis. """ self.controller.find_reference(self.axis_Id_numeric) - #now force position read to cache + # now force position read to cache val = self.readback.read() self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time()) @@ -301,10 +307,10 @@ class LamniGalilMotor(Device, PositionerBase): Drive an axis to the limit in a specified direction. Args: - direction (str): Direction in which the axis should be driven to the limit. Either 'forward' or 'reverse'. + direction (str): Direction in which the axis should be driven to the limit. Either 'forward' or 'reverse'. """ self.controller.drive_axis_to_limit(self.axis_Id_numeric, direction) - #now force position read to cache + # now force position read to cache val = self.readback.read() self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time()) diff --git a/csaxs_bec/devices/omny/galil/ogalil_ophyd.py b/csaxs_bec/devices/omny/galil/ogalil_ophyd.py index 527ae18..9d3b091 100644 --- a/csaxs_bec/devices/omny/galil/ogalil_ophyd.py +++ b/csaxs_bec/devices/omny/galil/ogalil_ophyd.py @@ -46,7 +46,7 @@ class GalilMotorResolution(GalilSignalRO): @threadlocked def _socket_get(self): if self.controller.sock.port == 8083 and self.parent.axis_Id_numeric == 2: - # rotation stage + # rotation stage return 89565.8666667 else: return 51200 @@ -69,37 +69,43 @@ class OMNYGalilReadbackSignal(GalilSignalRO): current_pos = float(self.controller.socket_put_and_receive(f"TP{self.parent.axis_Id}")) current_pos *= self.parent.sign step_mm = self.parent.motor_resolution.get() - #here we introduce an offset of 25 to the rotation axis - #when setting a position this is taken into account in the controller - #that way we just do tomography from 0 to 180 degrees + # here we introduce an offset of 25 to the rotation axis + # when setting a position this is taken into account in the controller + # that way we just do tomography from 0 to 180 degrees if self.parent.axis_Id_numeric == 2 and self.controller.sock.port == 8083: - return (current_pos / step_mm)+25 + return (current_pos / step_mm) + 25 else: return current_pos / step_mm def read(self): self._metadata["timestamp"] = time.time() val = super().read() - - #if reading rotation stage angle + + # if reading rotation stage angle if self.parent.axis_Id_numeric == 2 and self.controller.sock.port == 8083: current_readback_value = val[self.parent.name]["value"] - #print (f"previous rotation angle {self.previous_rotation_angle}, current readback {current_readback_value}.") + # print (f"previous rotation angle {self.previous_rotation_angle}, current readback {current_readback_value}.") - if np.fabs((self.previous_rotation_angle-current_readback_value)>10): + if np.fabs((self.previous_rotation_angle - current_readback_value) > 10): message = f"Glitch detected in rotation stage. Previous rotation angle {self.previous_rotation_angle}, current readback {current_readback_value}." print(message) - self.parent.device_manager.connector.send_client_info(message, scope="glitch detector", show_asap=True) - + self.parent.device_manager.connector.send_client_info( + message, scope="glitch detector", show_asap=True + ) + val = super().read() current_readback_value = val[self.parent.name]["value"] - if np.fabs((self.previous_rotation_angle-current_readback_value)>10): + if np.fabs((self.previous_rotation_angle - current_readback_value) > 10): message = f"Glitch detected in rotation stage second read. Previous rotation angle {self.previous_rotation_angle}, current readback {current_readback_value}. Disabling the controller." print(message) - self.parent.device_manager.connector.send_client_info(message, scope="glitch detector", show_asap=True) - - self.parent.device_manager.devices["osamroy"].obj.controller.socket_put_confirmed("allaxref=0") + self.parent.device_manager.connector.send_client_info( + message, scope="glitch detector", show_asap=True + ) + + self.parent.device_manager.devices["osamroy"].obj.controller.socket_put_confirmed( + "allaxref=0" + ) self.parent.device_manager.devices["osamroy"].obj.enabled = False return val @@ -108,13 +114,12 @@ class OMNYGalilReadbackSignal(GalilSignalRO): try: rt = self.parent.device_manager.devices["rtx"] if rt.enabled: - rt.obj.controller.set_rotation_angle(val[self.parent.name]["value"]-25+54) + rt.obj.controller.set_rotation_angle(val[self.parent.name]["value"] - 25 + 54) except KeyError: - logger.warning("Failed to set RT value during ogalil readback.") + logger.warning("Failed to set RT value during ogalil readback.") return val - class OMNYGalilController(GalilController): USER_ACCESS = [ "describe", @@ -132,18 +137,18 @@ class OMNYGalilController(GalilController): "_ogalil_folerr_not_ignore", ] - OKBLUE = '\033[94m' - OKCYAN = '\033[96m' - OKGREEN = '\033[92m' - WARNING = '\033[93m' - FAIL = '\033[91m' - ENDC = '\033[0m' - - def on(self) -> None: + OKBLUE = "\033[94m" + OKCYAN = "\033[96m" + OKGREEN = "\033[92m" + WARNING = "\033[93m" + FAIL = "\033[91m" + ENDC = "\033[0m" + + def on(self, timeout: int = 10) -> None: """Open a new socket connection to the controller""" self._ogalil_switchsocket_switch_all_on() time.sleep(0.3) - super().on() + super().on(timeout=timeout) def _ogalil_switchsocket(self, number: int, switch: bool): # number is socket number ranging from 1 to 4 @@ -185,15 +190,16 @@ class OMNYGalilController(GalilController): self.socket_put_confirmed("IgNoFol=1") self.socket_put_confirmed("XQ#STOP,1") - - def _ogalil_set_axis_to_pos_wo_reference_search(self, axis_id_numeric, axis_id, pos_mm, motor_resolution, motor_sign): + def _ogalil_set_axis_to_pos_wo_reference_search( + self, axis_id_numeric, axis_id, pos_mm, motor_resolution, motor_sign + ): self.socket_put_confirmed("IgNoFol=1") # pos_mm = pos_encoder / motor_resolution pos_encoder = pos_mm * motor_resolution * motor_sign - #print(motor_resolution) - + # print(motor_resolution) + self.socket_put_confirmed(f"DE{axis_id}={pos_encoder:.0f}") self.socket_put_confirmed(f"DP{axis_id}=_TP{axis_id}*ratio[{axis_id_numeric:.0f}]") @@ -203,7 +209,6 @@ class OMNYGalilController(GalilController): self._ogalil_folerr_not_ignore() - def _ogalil_folerr_not_ignore(self): self.socket_put_confirmed("IgNoFol=0") @@ -240,7 +245,18 @@ class OMNYGalilController(GalilController): class OMNYGalilMotor(Device, PositionerBase): - USER_ACCESS = ["controller", "find_reference", "omny_osamx_to_scan_center", "drive_axis_to_limit", "_ogalil_folerr_reset_and_ignore", "_ogalil_set_axis_to_pos_wo_reference_search", "get_motor_limit_switch", "axis_is_referenced", "get_motor_temperature", "folerr_status"] + USER_ACCESS = [ + "controller", + "find_reference", + "omny_osamx_to_scan_center", + "drive_axis_to_limit", + "_ogalil_folerr_reset_and_ignore", + "_ogalil_set_axis_to_pos_wo_reference_search", + "get_motor_limit_switch", + "axis_is_referenced", + "get_motor_temperature", + "folerr_status", + ] readback = Cpt(OMNYGalilReadbackSignal, signal_name="readback", kind="hinted") user_setpoint = Cpt(GalilSetpointSignal, signal_name="setpoint") motor_resolution = Cpt(GalilMotorResolution, signal_name="resolution", kind="config") @@ -272,7 +288,7 @@ class OMNYGalilMotor(Device, PositionerBase): **kwargs, ): self.controller = OMNYGalilController( - socket_cls=socket_cls, socket_host=host, socket_port=port + socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager ) self.axis_Id = axis_Id self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric) @@ -308,6 +324,10 @@ class OMNYGalilMotor(Device, PositionerBase): self.low_limit_travel.put(limits[0]) self.high_limit_travel.put(limits[1]) + def wait_for_connection(self, timeout: int = 30, **kwargs) -> None: + """Wait for the device to be connected.""" + self.controller.on(timeout=timeout) + @property def limits(self): return (self.low_limit_travel.get(), self.high_limit_travel.get()) @@ -433,8 +453,10 @@ class OMNYGalilMotor(Device, PositionerBase): def _ogalil_set_axis_to_pos_wo_reference_search(self, pos_mm): motor_resolution = self.motor_resolution.get() - self.controller._ogalil_set_axis_to_pos_wo_reference_search(self.axis_Id_numeric, self.axis_Id, pos_mm, motor_resolution, self.sign) - #now force position read to cache + self.controller._ogalil_set_axis_to_pos_wo_reference_search( + self.axis_Id_numeric, self.axis_Id, pos_mm, motor_resolution, self.sign + ) + # now force position read to cache val = self.readback.read() self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time()) @@ -442,9 +464,9 @@ class OMNYGalilMotor(Device, PositionerBase): """ Find the reference of the axis. """ - verbose=1 + verbose = 1 self.controller.find_reference(self.axis_Id_numeric, verbose, raise_error) - #now force position read to cache + # now force position read to cache val = self.readback.read() self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time()) @@ -453,10 +475,10 @@ class OMNYGalilMotor(Device, PositionerBase): Drive an axis to the limit in a specified direction. Args: - direction (str): Direction in which the axis should be driven to the limit. Either 'forward' or 'reverse'. + direction (str): Direction in which the axis should be driven to the limit. Either 'forward' or 'reverse'. """ self.controller.drive_axis_to_limit(self.axis_Id_numeric, direction, verbose=1) - #now force position read to cache + # now force position read to cache val = self.readback.read() self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time()) @@ -487,29 +509,31 @@ class OMNYGalilMotor(Device, PositionerBase): def omny_osamx_to_scan_center(self, cenx): if self.controller.sock.port == 8082 and self.axis_Id_numeric == 0: # get last setpoint - osamx = self.device_manager.devices["osamx"] - osamx_current_setpoint = osamx.obj.readback.get() - omny_samx_in = self._get_user_param_safe("osamx","in") - if np.fabs(osamx_current_setpoint-(omny_samx_in+cenx/1000)) > 0.025: - message=f"Moving osamx to scan center. new osamx target {omny_samx_in+cenx/1000:.3f}." - logger.info(message) + osamx = self.device_manager.devices["osamx"] + osamx_current_setpoint = osamx.obj.readback.get() + omny_samx_in = self._get_user_param_safe("osamx", "in") + if np.fabs(osamx_current_setpoint - (omny_samx_in + cenx / 1000)) > 0.025: + message = ( + f"Moving osamx to scan center. new osamx target {omny_samx_in+cenx/1000:.3f}." + ) + logger.info(message) - osamx.read_only = False - #osamx.controller.("osamx", "controller.socket_put_confirmed('axspeed[0]=1000')") - osamx.set(omny_samx_in+cenx/1000) - time.sleep(0.1) - while(osamx.motor_is_moving.get()): - time.sleep(0.05) - osamx.read_only = True - time.sleep(2) - rt = self.device_manager.devices["rtx"] - if rt.enabled: - rt.obj.controller.laser_tracker_on() - rt.obj.controller.laser_tracker_check_and_wait_for_signalstrength() + osamx.read_only = False + # osamx.controller.("osamx", "controller.socket_put_confirmed('axspeed[0]=1000')") + osamx.set(omny_samx_in + cenx / 1000) + time.sleep(0.1) + while osamx.motor_is_moving.get(): + time.sleep(0.05) + osamx.read_only = True + time.sleep(2) + rt = self.device_manager.devices["rtx"] + if rt.enabled: + rt.obj.controller.laser_tracker_on() + rt.obj.controller.laser_tracker_check_and_wait_for_signalstrength() def folerr_status(self) -> bool: return self.controller.folerr_status(self.axis_Id_numeric) - + def stop(self, *, success=False): self.controller.stop_all_axes() return super().stop(success=success) diff --git a/csaxs_bec/devices/omny/galil/sgalil_ophyd.py b/csaxs_bec/devices/omny/galil/sgalil_ophyd.py index d88b191..0dfd352 100644 --- a/csaxs_bec/devices/omny/galil/sgalil_ophyd.py +++ b/csaxs_bec/devices/omny/galil/sgalil_ophyd.py @@ -52,33 +52,12 @@ class GalilController(Controller): "fly_grid_scan", "read_encoder_position", ] + _axes_per_controller = 8 - def __init__( - self, - *, - name="GalilController", - kind=None, - parent=None, - socket=None, - attr_name="", - labels=None, - ): - if not hasattr(self, "_initialized") or not self._initialized: - self._galil_axis_per_controller = 8 - self._axis = [None for axis_num in range(self._galil_axis_per_controller)] - super().__init__( - name=name, - socket=socket, - attr_name=attr_name, - parent=parent, - labels=labels, - kind=kind, - ) - - def on(self, controller_num=0) -> None: + def on(self, timeout: int = 10) -> None: """Open a new socket connection to the controller""" if not self.connected: - self.sock.open() + self.sock.open(timeout=timeout) self.connected = True else: logger.info("The connection has already been established.") @@ -165,11 +144,11 @@ class GalilController(Controller): def show_running_threads(self) -> None: t = PrettyTable() t.title = f"Threads on {self.sock.host}:{self.sock.port}" - t.field_names = [str(ax) for ax in range(self._galil_axis_per_controller)] + t.field_names = [str(ax) for ax in range(self._axes_per_controller)] t.add_row( [ "active" if self.is_thread_active(t) else "inactive" - for t in range(self._galil_axis_per_controller) + for t in range(self._axes_per_controller) ] ) print(t) @@ -199,7 +178,7 @@ class GalilController(Controller): "Limits", "Position", ] - for ax in range(self._galil_axis_per_controller): + for ax in range(self._axes_per_controller): axis = self._axis[ax] if axis is not None: t.add_row( @@ -516,7 +495,9 @@ class SGalilMotor(Device, PositionerBase): ): self.axis_Id = axis_Id self.sign = sign - self.controller = GalilController(socket=socket_cls(host=host, port=port)) + self.controller = GalilController( + socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager + ) self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric) self.tolerance = kwargs.pop("tolerance", 0.5) self.device_mapping = kwargs.pop("device_mapping", {}) @@ -549,6 +530,10 @@ class SGalilMotor(Device, PositionerBase): self.low_limit_travel.put(limits[0]) self.high_limit_travel.put(limits[1]) + def wait_for_connection(self, timeout: int = 30, **kwargs) -> None: + """Wait for the device to be connected.""" + self.controller.on(timeout=timeout) + @property def limits(self): return (self.low_limit_travel.get(), self.high_limit_travel.get()) diff --git a/csaxs_bec/devices/omny/rt/rt_flomni_ophyd.py b/csaxs_bec/devices/omny/rt/rt_flomni_ophyd.py index 116ae10..a64a468 100644 --- a/csaxs_bec/devices/omny/rt/rt_flomni_ophyd.py +++ b/csaxs_bec/devices/omny/rt/rt_flomni_ophyd.py @@ -57,6 +57,7 @@ class RtFlomniController(Controller): socket_cls=None, socket_host=None, socket_port=None, + device_manager=None, attr_name="", parent=None, labels=None, @@ -67,6 +68,7 @@ class RtFlomniController(Controller): socket_cls=socket_cls, socket_host=socket_host, socket_port=socket_port, + device_manager=device_manager, attr_name=attr_name, parent=parent, labels=labels, @@ -126,15 +128,15 @@ class RtFlomniController(Controller): while not self.slew_rate_limiters_on_target() or np.abs(self.pid_y()) > 0.1: time.sleep(0.05) - self.get_device_manager().devices.rty.update_user_parameter({"tomo_additional_offsety": 0}) + self.device_manager.devices.rty.update_user_parameter({"tomo_additional_offsety": 0}) self.clear_trajectory_generator() self.laser_tracker_on() # move to 0. FUPR will set the rotation angle during readout - self.get_device_manager().devices.fsamroy.obj.move(0, wait=True) + self.device_manager.devices.fsamroy.obj.move(0, wait=True) - fsamx = self.get_device_manager().devices.fsamx + fsamx = self.device_manager.devices.fsamx fsamx.obj.pid_x_correction = 0 fsamx.obj.controller.socket_put_confirmed("axspeed[4]=0.1*stppermm[4]") @@ -164,18 +166,18 @@ class RtFlomniController(Controller): self.show_cyclic_error_compensation() self.rt_pid_voltage = self.get_pid_x() - rtx = self.get_device_manager().devices.rtx + rtx = self.device_manager.devices.rtx rtx.update_user_parameter({"rt_pid_voltage": self.rt_pid_voltage}) - self.set_device_enabled("fsamx", False) - self.set_device_enabled("fsamy", False) - self.set_device_enabled("foptx", False) - self.set_device_enabled("fopty", False) + self.set_device_read_write("fsamx", False) + self.set_device_read_write("fsamy", False) + self.set_device_read_write("foptx", False) + self.set_device_read_write("fopty", False) def move_samx_to_scan_region(self, fovx: float, cenx: float): time.sleep(0.05) if self.rt_pid_voltage is None: - rtx = self.get_device_manager().devices.rtx + rtx = self.device_manager.devices.rtx self.rt_pid_voltage = rtx.user_parameter.get("rt_pid_voltage") if self.rt_pid_voltage is None: raise RtError( @@ -192,7 +194,7 @@ class RtFlomniController(Controller): break wait_on_exit = True self.socket_put("v0") - fsamx = self.get_device_manager().devices.fsamx + fsamx = self.device_manager.devices.fsamx fsamx.read_only = False fsamx.obj.controller.socket_put_confirmed("axspeed[4]=0.1*stppermm[4]") fsamx.obj.pid_x_correction -= (self.get_pid_x() - expected_voltage) * 0.007 @@ -223,22 +225,22 @@ class RtFlomniController(Controller): print("Feedback is not running; likely an error in the interferometer.") raise RtError("Feedback is not running; likely an error in the interferometer.") - self.set_device_enabled("fsamx", False) - self.set_device_enabled("fsamy", False) - self.set_device_enabled("foptx", False) - self.set_device_enabled("fopty", False) + self.set_device_read_write("fsamx", False) + self.set_device_read_write("fsamy", False) + self.set_device_read_write("foptx", False) + self.set_device_read_write("fopty", False) def feedback_disable(self): self.clear_trajectory_generator() self.move_to_zero() self.socket_put("l0") - self.set_device_enabled("fsamx", True) - self.set_device_enabled("fsamy", True) - self.set_device_enabled("foptx", True) - self.set_device_enabled("fopty", True) + self.set_device_read_write("fsamx", True) + self.set_device_read_write("fsamy", True) + self.set_device_read_write("foptx", True) + self.set_device_read_write("fopty", True) - fsamx = self.get_device_manager().devices.fsamx + fsamx = self.device_manager.devices.fsamx fsamx.obj.controller.socket_put_confirmed("axspeed[4]=025*stppermm[4]") print("rt feedback is now disalbed.") @@ -289,12 +291,8 @@ class RtFlomniController(Controller): self.socket_put("T1") time.sleep(0.5) - self.get_device_manager().devices.ftrackz.obj.controller.socket_put_confirmed( - "trackyct=0" - ) - self.get_device_manager().devices.ftrackz.obj.controller.socket_put_confirmed( - "trackzct=0" - ) + self.device_manager.devices.ftrackz.obj.controller.socket_put_confirmed("trackyct=0") + self.device_manager.devices.ftrackz.obj.controller.socket_put_confirmed("trackzct=0") self.laser_tracker_wait_on_target() logger.info("Laser tracker running!") @@ -341,7 +339,7 @@ class RtFlomniController(Controller): } def laser_tracker_galil_enable(self): - ftrackz_con = self.get_device_manager().devices.ftrackz.obj.controller + ftrackz_con = self.device_manager.devices.ftrackz.obj.controller ftrackz_con.socket_put_confirmed("tracken=1") ftrackz_con.socket_put_confirmed("trackyct=0") ftrackz_con.socket_put_confirmed("trackzct=0") @@ -389,7 +387,7 @@ class RtFlomniController(Controller): self.laser_tracker_wait_on_target() signal = self.read_ssi_interferometer(1) - rtx = self.get_device_manager().devices.rtx + rtx = self.device_manager.devices.rtx min_signal = rtx.user_parameter.get("min_signal") low_signal = rtx.user_parameter.get("low_signal") print(f"low signal: {low_signal}") @@ -478,12 +476,6 @@ class RtFlomniController(Controller): current_position_in_scan = int(float(return_table[2])) return (mode, number_of_positions_planned, current_position_in_scan) - def get_device_manager(self): - for axis in self._axis: - if hasattr(axis, "device_manager") and axis.device_manager: - return axis.device_manager - raise BECConfigError("Could not access the device_manager") - def read_positions_from_sampler(self): # this was for reading after the scan completed number_of_samples_to_read = 1 # self.get_scan_status()[1] #number of valid samples, will be updated upon first data read @@ -498,7 +490,7 @@ class RtFlomniController(Controller): # if not (mode==2 or mode==3): # error - self.get_device_manager().connector.set( + self.device_manager.connector.set( MessageEndpoints.device_status("rt_scan"), messages.DeviceStatusMessage( device="rt_scan", status=1, metadata=self.readout_metadata @@ -533,7 +525,7 @@ class RtFlomniController(Controller): signals = self._get_signals_from_table(return_table) self.publish_device_data(signals=signals, point_id=int(return_table[0])) - self.get_device_manager().connector.set( + self.device_manager.connector.set( MessageEndpoints.device_status("rt_scan"), messages.DeviceStatusMessage( device="rt_scan", status=0, metadata=self.readout_metadata @@ -547,7 +539,7 @@ class RtFlomniController(Controller): ) def publish_device_data(self, signals, point_id): - self.get_device_manager().connector.set_and_publish( + self.device_manager.connector.set_and_publish( MessageEndpoints.device_read("rt_flomni"), messages.DeviceMessage( signals=signals, metadata={"point_id": point_id, **self.readout_metadata} @@ -658,7 +650,7 @@ class RtFlomniMotor(Device, PositionerBase): self.axis_Id = axis_Id self.sign = sign self.controller = RtFlomniController( - socket_cls=socket_cls, socket_host=host, socket_port=port + socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager ) self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric) self.device_manager = device_manager @@ -686,6 +678,10 @@ class RtFlomniMotor(Device, PositionerBase): self.low_limit_travel.put(limits[0]) self.high_limit_travel.put(limits[1]) + def wait_for_connection(self, timeout: int = 30, **kwargs) -> None: + """Wait for the device to be connected.""" + self.controller.on(timeout=timeout) + @property def limits(self): return (self.low_limit_travel.get(), self.high_limit_travel.get()) @@ -813,7 +809,7 @@ class RtFlomniMotor(Device, PositionerBase): if __name__ == "__main__": rtcontroller = RtFlomniController( - socket_cls=SocketIO, socket_host="mpc2844.psi.ch", socket_port=2222 + socket_cls=SocketIO, socket_host="mpc2844.psi.ch", socket_port=2222, device_manager=None ) rtcontroller.on() rtcontroller.laser_tracker_on() diff --git a/csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py b/csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py index 1d50e6e..2f6c693 100644 --- a/csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py +++ b/csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py @@ -71,6 +71,7 @@ class RtLamniController(Controller): socket_cls=None, socket_host=None, socket_port=None, + device_manager=None, attr_name="", parent=None, labels=None, @@ -81,6 +82,7 @@ class RtLamniController(Controller): socket_cls=socket_cls, socket_host=socket_host, socket_port=socket_port, + device_manager=device_manager, attr_name=attr_name, parent=parent, labels=labels, @@ -92,11 +94,11 @@ class RtLamniController(Controller): def feedback_disable(self): self.socket_put("J0") logger.info("LamNI Feedback disabled.") - self.set_device_enabled("lsamx", True) - self.set_device_enabled("lsamy", True) - self.set_device_enabled("loptx", True) - self.set_device_enabled("lopty", True) - self.set_device_enabled("loptz", True) + self.set_device_read_write("lsamx", True) + self.set_device_read_write("lsamy", True) + self.set_device_read_write("loptx", True) + self.set_device_read_write("lopty", True) + self.set_device_read_write("loptz", True) def is_axis_moving(self, axis_Id) -> bool: # this checks that axis is on target @@ -150,25 +152,25 @@ class RtLamniController(Controller): # set these as closed loop target position self.socket_put(f"pa0,{x_curr:.4f}") self.socket_put(f"pa1,{y_curr:.4f}") - self.get_device_manager().devices.rtx.obj.user_setpoint.set_with_feedback_disabled(x_curr) - self.get_device_manager().devices.rty.obj.user_setpoint.set_with_feedback_disabled(y_curr) + self.device_manager.devices.rtx.obj.user_setpoint.set_with_feedback_disabled(x_curr) + self.device_manager.devices.rty.obj.user_setpoint.set_with_feedback_disabled(y_curr) self.socket_put("J5") logger.info("LamNI Feedback enabled (without reset).") - self.set_device_enabled("lsamx", False) - self.set_device_enabled("lsamy", False) - self.set_device_enabled("loptx", False) - self.set_device_enabled("lopty", False) - self.set_device_enabled("loptz", False) + self.set_device_read_write("lsamx", False) + self.set_device_read_write("lsamy", False) + self.set_device_read_write("loptx", False) + self.set_device_read_write("lopty", False) + self.set_device_read_write("loptz", False) @threadlocked def feedback_disable_and_even_reset_lamni_angle_interferometer(self): self.socket_put("J6") logger.info("LamNI Feedback disabled including the angular interferometer.") - self.set_device_enabled("lsamx", True) - self.set_device_enabled("lsamy", True) - self.set_device_enabled("loptx", True) - self.set_device_enabled("lopty", True) - self.set_device_enabled("loptz", True) + self.set_device_read_write("lsamx", True) + self.set_device_read_write("lsamy", True) + self.set_device_read_write("loptx", True) + self.set_device_read_write("lopty", True) + self.set_device_read_write("loptz", True) @threadlocked def clear_trajectory_generator(self): @@ -284,7 +286,7 @@ class RtLamniController(Controller): # if not (mode==2 or mode==3): # error - self.get_device_manager().connector.set( + self.device_manager.connector.set( MessageEndpoints.device_status("rt_scan"), messages.DeviceStatusMessage( device="rt_scan", status=1, metadata=self.readout_metadata @@ -319,7 +321,7 @@ class RtLamniController(Controller): signals = self._get_signals_from_table(return_table) self.publish_device_data(signals=signals, point_id=int(return_table[0])) - self.get_device_manager().connector.set( + self.device_manager.connector.set( MessageEndpoints.device_status("rt_scan"), messages.DeviceStatusMessage( device="rt_scan", status=0, metadata=self.readout_metadata @@ -331,7 +333,7 @@ class RtLamniController(Controller): ) def publish_device_data(self, signals, point_id): - self.get_device_manager().connector.set_and_publish( + self.device_manager.connector.set_and_publish( MessageEndpoints.device_read("rt_lamni"), messages.DeviceMessage( signals=signals, metadata={"point_id": point_id, **self.readout_metadata} @@ -366,10 +368,10 @@ class RtLamniController(Controller): ) # we set all three outputs of the traj. gen. although in LamNI case only 0,1 are used self.clear_trajectory_generator() - self.get_device_manager().devices.lsamrot.obj.move(0, wait=True) + self.device_manager.devices.lsamrot.obj.move(0, wait=True) galil_controller_rt_status = ( - self.get_device_manager().devices.lsamx.obj.controller.lgalil_is_air_off_and_orchestra_enabled() + self.device_manager.devices.lsamx.obj.controller.lgalil_is_air_off_and_orchestra_enabled() ) if galil_controller_rt_status == 0: @@ -382,16 +384,16 @@ class RtLamniController(Controller): time.sleep(0.03) - lsamx_user_params = self.get_device_manager().devices.lsamx.user_parameter + lsamx_user_params = self.device_manager.devices.lsamx.user_parameter if lsamx_user_params is None or lsamx_user_params.get("center") is None: raise RuntimeError("lsamx center is not defined") - lsamy_user_params = self.get_device_manager().devices.lsamy.user_parameter + lsamy_user_params = self.device_manager.devices.lsamy.user_parameter if lsamy_user_params is None or lsamy_user_params.get("center") is None: raise RuntimeError("lsamy center is not defined") lsamx_center = lsamx_user_params.get("center") lsamy_center = lsamy_user_params.get("center") - self.get_device_manager().devices.lsamx.obj.move(lsamx_center, wait=True) - self.get_device_manager().devices.lsamy.obj.move(lsamy_center, wait=True) + self.device_manager.devices.lsamx.obj.move(lsamx_center, wait=True) + self.device_manager.devices.lsamy.obj.move(lsamy_center, wait=True) self.socket_put("J1") _waitforfeedbackctr = 0 @@ -405,11 +407,11 @@ class RtLamniController(Controller): (self.socket_put_and_receive("J2")).split(",")[0] ) - self.set_device_enabled("lsamx", False) - self.set_device_enabled("lsamy", False) - self.set_device_enabled("loptx", False) - self.set_device_enabled("lopty", False) - self.set_device_enabled("loptz", False) + self.set_device_read_write("lsamx", False) + self.set_device_read_write("lsamy", False) + self.set_device_read_write("loptx", False) + self.set_device_read_write("lopty", False) + self.set_device_read_write("loptz", False) if interferometer_feedback_not_running == 1: logger.error( @@ -559,7 +561,7 @@ class RtLamniMotor(Device, PositionerBase): self.axis_Id = axis_Id self.sign = sign self.controller = RtLamniController( - socket_cls=socket_cls, socket_host=host, socket_port=port + socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager ) self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric) self.device_manager = device_manager @@ -586,6 +588,10 @@ class RtLamniMotor(Device, PositionerBase): self.low_limit_travel.put(limits[0]) self.high_limit_travel.put(limits[1]) + def wait_for_connection(self, timeout: int = 30, **kwargs) -> None: + """Wait for the device to be connected.""" + self.controller.on(timeout=timeout) + @property def limits(self): return (self.low_limit_travel.get(), self.high_limit_travel.get()) diff --git a/csaxs_bec/devices/omny/rt/rt_omny_ophyd.py b/csaxs_bec/devices/omny/rt/rt_omny_ophyd.py index 5160f9f..fd10993 100644 --- a/csaxs_bec/devices/omny/rt/rt_omny_ophyd.py +++ b/csaxs_bec/devices/omny/rt/rt_omny_ophyd.py @@ -1,8 +1,8 @@ +import builtins +import socket import threading import time from typing import List -import builtins -import socket import numpy as np from bec_lib import bec_logger, messages @@ -34,12 +34,15 @@ from csaxs_bec.devices.omny.rt.rt_ophyd import ( logger = bec_logger.logger + class RtOMNY_mirror_switchbox_Error(Exception): pass + class RtOMNY_Error(Exception): pass + class RtOMNYController(Controller): _axes_per_controller = 3 red = "\x1b[91m" @@ -87,6 +90,7 @@ class RtOMNYController(Controller): socket_cls=None, socket_host=None, socket_port=None, + device_manager=None, attr_name="", parent=None, labels=None, @@ -97,6 +101,7 @@ class RtOMNYController(Controller): socket_cls=socket_cls, socket_host=socket_host, socket_port=socket_port, + device_manager=device_manager, attr_name=attr_name, parent=parent, labels=labels, @@ -234,7 +239,7 @@ class RtOMNYController(Controller): "opt_amplitude1_neg": 3000, "opt_amplitude2_pos": 3000, "opt_amplitude2_neg": 3000, - } + }, } # def is_axis_moving(self, axis_Id) -> bool: @@ -261,42 +266,60 @@ class RtOMNYController(Controller): threading.Thread(target=send_positions, args=(self, positions), daemon=True).start() - def get_mirror_parameters(self,channel): + def get_mirror_parameters(self, channel): return self.mirror_parameters[channel] - def laser_tracker_check_and_wait_for_signalstrength(self): - self.get_device_manager().connector.send_client_info("Checking laser tracker...", scope="", show_asap=True) + self.device_manager.connector.send_client_info( + "Checking laser tracker...", scope="", show_asap=True + ) if not self.laser_tracker_check_enabled(): - print("laser_tracker_check_and_wait_for_signalstrength: The laser tracker is not even enabled.") + print( + "laser_tracker_check_and_wait_for_signalstrength: The laser tracker is not even enabled." + ) return - #first check on target + # first check on target self.laser_tracker_wait_on_target() - #when on target, check interferometer signal - signal = self._omny_interferometer_get_signalsample("ssi_4",0.1) - rtx = self.get_device_manager().devices.rtx + # when on target, check interferometer signal + signal = self._omny_interferometer_get_signalsample("ssi_4", 0.1) + rtx = self.device_manager.devices.rtx min_signal = rtx.user_parameter.get("min_signal") low_signal = rtx.user_parameter.get("low_signal") wait_counter = 0 - while signal < min_signal and wait_counter<10: - self.get_device_manager().connector.send_client_info(f"The signal of the tracker {signal} is below the minimum required signal of {min_signal}. Waiting...", scope="laser_tracker_check_and_wait_for_signalstrength", show_asap=True) + while signal < min_signal and wait_counter < 10: + self.device_manager.connector.send_client_info( + f"The signal of the tracker {signal} is below the minimum required signal of {min_signal}. Waiting...", + scope="laser_tracker_check_and_wait_for_signalstrength", + show_asap=True, + ) - wait_counter+=1 + wait_counter += 1 time.sleep(0.2) - signal = self._omny_interferometer_get_signalsample("ssi_4",0.1) + signal = self._omny_interferometer_get_signalsample("ssi_4", 0.1) if signal < low_signal: - self.get_device_manager().connector.send_client_info(f"\x1b[91mThe signal of the tracker {signal} is below the low limit of {low_signal}. Auto readjustment...\x1b[0m", scope="laser_tracker_check_and_wait_for_signalstrength", show_asap=True) + self.device_manager.connector.send_client_info( + f"\x1b[91mThe signal of the tracker {signal} is below the low limit of {low_signal}. Auto readjustment...\x1b[0m", + scope="laser_tracker_check_and_wait_for_signalstrength", + show_asap=True, + ) - self.omny_interferometer_align_tracking() - self.get_device_manager().connector.send_client_info("Checking laser tracker completed.", scope="", show_asap=True) + self.omny_interferometer_align_tracking() + self.device_manager.connector.send_client_info( + "Checking laser tracker completed.", scope="", show_asap=True + ) def omny_interferometer_align_tracking(self): - mirror_channel=6 - signal = self._omny_interferometer_get_signalsample(self.mirror_parameters[mirror_channel]["opt_signalchannel"], self.mirror_parameters[mirror_channel]["opt_averaging_time"]) + mirror_channel = 6 + signal = self._omny_interferometer_get_signalsample( + self.mirror_parameters[mirror_channel]["opt_signalchannel"], + self.mirror_parameters[mirror_channel]["opt_averaging_time"], + ) if signal > self.mirror_parameters[mirror_channel]["opt_signal_stop"]: - print(f"Interferometer signal of axis {self.mirror_parameters[mirror_channel]['opt_mirrorname']} is good, no alignment needed.") + print( + f"Interferometer signal of axis {self.mirror_parameters[mirror_channel]['opt_mirrorname']} is good, no alignment needed." + ) else: self._omny_interferometer_switch_channel(mirror_channel) time.sleep(0.1) @@ -307,16 +330,19 @@ class RtOMNYController(Controller): self._omny_interferometer_switch_alloff() self.show_signal_strength_interferometer() - mirror_channel=-1 - - + mirror_channel = -1 def omny_interferometer_align_incoupling_angle(self): - mirror_channel=1 - signal = self._omny_interferometer_get_signalsample(self.mirror_parameters[mirror_channel]["opt_signalchannel"], self.mirror_parameters[mirror_channel]["opt_averaging_time"]) + mirror_channel = 1 + signal = self._omny_interferometer_get_signalsample( + self.mirror_parameters[mirror_channel]["opt_signalchannel"], + self.mirror_parameters[mirror_channel]["opt_averaging_time"], + ) if signal > self.mirror_parameters[mirror_channel]["opt_signal_stop"]: - print(f"Interferometer signal of axis {self.mirror_parameters[mirror_channel]['opt_mirrorname']} is good, no alignment needed.") + print( + f"Interferometer signal of axis {self.mirror_parameters[mirror_channel]['opt_mirrorname']} is good, no alignment needed." + ) else: self._omny_interferometer_switch_channel(mirror_channel) time.sleep(0.1) @@ -327,19 +353,18 @@ class RtOMNYController(Controller): self._omny_interferometer_switch_alloff() self.show_signal_strength_interferometer() - mirror_channel=-1 - + mirror_channel = -1 def _omny_interferometer_openloop_steps(self, channel, steps, amplitude): - if channel not in range(3,5): + if channel not in range(3, 5): raise RtOMNY_Error(f"invalid channel number {channel}.") if amplitude > 4090: amplitude = 4090 elif amplitude < 10: amplitude = 10 - - oshield = self.get_device_manager().devices.oshield + + oshield = self.device_manager.devices.oshield oshield.obj.controller.move_open_loop_steps( channel, steps, amplitude=amplitude, frequency=500 @@ -351,7 +376,7 @@ class RtOMNYController(Controller): def _omny_interferometer_optimize(self, mirror_channel, channel): if mirror_channel == -1: raise RtOMNY_Error("no mirror channel selected") - #mirror channel is mirror number and channel is smaract channel, i.e. axis of the mirror + # mirror channel is mirror number and channel is smaract channel, i.e. axis of the mirror if channel == 3: steps_pos = self.mirror_parameters[mirror_channel]["opt_steps1_pos"] steps_neg = self.mirror_parameters[mirror_channel]["opt_steps1_neg"] @@ -365,67 +390,80 @@ class RtOMNYController(Controller): else: raise RtOMNY_Error(f"invalid channel number {channel}.") - previous_signal = self._omny_interferometer_get_signalsample(self.mirror_parameters[mirror_channel]["opt_signalchannel"], self.mirror_parameters[mirror_channel]["opt_averaging_time"]) + previous_signal = self._omny_interferometer_get_signalsample( + self.mirror_parameters[mirror_channel]["opt_signalchannel"], + self.mirror_parameters[mirror_channel]["opt_averaging_time"], + ) - min_begin = self.mirror_parameters[mirror_channel]["opt_signal_min_begin"] + min_begin = self.mirror_parameters[mirror_channel]["opt_signal_min_begin"] if previous_signal < min_begin: - #raise OMNY_rt_clientError("error1") #(f"Minimum signal of axis {self.mirror_parameters[mirror_channel]["opt_mirrorname"]} to start alignment not present.") + # raise OMNY_rt_clientError("error1") #(f"Minimum signal of axis {self.mirror_parameters[mirror_channel]["opt_mirrorname"]} to start alignment not present.") print(f"\rMinimum signal for auto alignment {min_begin} not reached.") return elif previous_signal > self.mirror_parameters[mirror_channel]["opt_signal_stop"]: - print(f"\rInterferometer signal of axis is good") # {self.mirror_parameters[mirror_channel]["opt_mirrorname"]} is good.") - return + print( + f"\rInterferometer signal of axis is good" + ) # {self.mirror_parameters[mirror_channel]["opt_mirrorname"]} is good.") + return else: direction = 1 - cycle_counter=0 - cycle_max=20 - reversal_counter=0 - reversal_max=4 - self.mirror_amplitutde_increase=0 + cycle_counter = 0 + cycle_max = 20 + reversal_counter = 0 + reversal_max = 4 + self.mirror_amplitutde_increase = 0 - current_sample = self._omny_interferometer_get_signalsample(self.mirror_parameters[mirror_channel]["opt_signalchannel"], self.mirror_parameters[mirror_channel]["opt_averaging_time"]) - max=current_sample + current_sample = self._omny_interferometer_get_signalsample( + self.mirror_parameters[mirror_channel]["opt_signalchannel"], + self.mirror_parameters[mirror_channel]["opt_averaging_time"], + ) + max = current_sample - while current_sample < self.mirror_parameters[mirror_channel]["opt_signal_stop"] and cycle_counter0: + if direction > 0: self._omny_interferometer_openloop_steps(channel, steps_pos, opt_amplitude_pos) verbose_str = f"channel {channel}, steps {steps_pos}" else: self._omny_interferometer_openloop_steps(channel, -steps_neg, opt_amplitude_neg) verbose_str = f"auto action {channel}, steps {-steps_pos}" - #print(f"Aligning axis ") #{self.mirror_parameters[mirror_channel]["opt_mirrorname"]}. Target: {self.mirror_parameters[mirror_channel]["opt_signal_stop"]}, current {current_sample}") + # print(f"Aligning axis ") #{self.mirror_parameters[mirror_channel]["opt_mirrorname"]}. Target: {self.mirror_parameters[mirror_channel]["opt_signal_stop"]}, current {current_sample}") - current_sample = self._omny_interferometer_get_signalsample(self.mirror_parameters[mirror_channel]["opt_signalchannel"], self.mirror_parameters[mirror_channel]["opt_averaging_time"]) + current_sample = self._omny_interferometer_get_signalsample( + self.mirror_parameters[mirror_channel]["opt_signalchannel"], + self.mirror_parameters[mirror_channel]["opt_averaging_time"], + ) opt_mirrorname = self.mirror_parameters[mirror_channel]["opt_mirrorname"] info_str = f"\rAuto aligning Channel {mirror_channel}, {opt_mirrorname}, Current signal: {current_sample:.0f}" - message=info_str +" (q)uit \r" - self.get_device_manager().connector.send_client_info(message, scope="_omny_interferometer_optimize", show_asap=True) - - - - if previous_signal>current_sample: - if direction<0: - steps_pos=int(steps_pos/2) - steps_neg=int(steps_neg/2) - if steps_pos<1: - steps_pos=1 - if steps_neg<1: - steps_neg=1 - direction=direction*(-1) - reversal_counter+=1 - previous_signal=current_sample - cycle_counter+=1 - - print(f"\r\nFinished aligning channel {channel} of mirror {mirror_channel}\n\r") # {self.mirror_parameters[mirror_channel]["opt_mirrorname"]}. Target: {self.mirror_parameters[mirror_channel]["opt_signal_stop"]}, current {current_sample}") - - + message = info_str + " (q)uit \r" + self.device_manager.connector.send_client_info( + message, scope="_omny_interferometer_optimize", show_asap=True + ) + if previous_signal > current_sample: + if direction < 0: + steps_pos = int(steps_pos / 2) + steps_neg = int(steps_neg / 2) + if steps_pos < 1: + steps_pos = 1 + if steps_neg < 1: + steps_neg = 1 + direction = direction * (-1) + reversal_counter += 1 + previous_signal = current_sample + cycle_counter += 1 + print( + f"\r\nFinished aligning channel {channel} of mirror {mirror_channel}\n\r" + ) # {self.mirror_parameters[mirror_channel]["opt_mirrorname"]}. Target: {self.mirror_parameters[mirror_channel]["opt_signal_stop"]}, current {current_sample}") def move_to_zero(self): self.socket_put("pa0,0") @@ -457,7 +495,7 @@ class RtOMNYController(Controller): if ret == 1: return True return False - + def feedback_is_running(self) -> bool: self.feedback_get_status_and_ssi() interferometer_feedback_not_running = int(self.ssi["feedback_error"]) @@ -466,7 +504,9 @@ class RtOMNYController(Controller): return True def feedback_enable_with_reset(self): - self.get_device_manager().connector.send_client_info(f"Enabling the feedback...", scope="", show_asap=True) + self.device_manager.connector.send_client_info( + f"Enabling the feedback...", scope="", show_asap=True + ) self.socket_put("J0") # disable feedback time.sleep(0.01) @@ -485,14 +525,16 @@ class RtOMNYController(Controller): self.laser_tracker_on() time.sleep(0.01) - osamroy = self.get_device_manager().devices.osamroy + osamroy = self.device_manager.devices.osamroy # the following read will also upate the angle in rt during readout readback = osamroy.obj.readback.get() - if (np.fabs(readback) > 0.1): - self.get_device_manager().connector.send_client_info(f"Rotating to zero", scope="feedback enable", show_asap=True) + if np.fabs(readback) > 0.1: + self.device_manager.connector.send_client_info( + f"Rotating to zero", scope="feedback enable", show_asap=True + ) osamroy.obj.move(0, wait=True) - osamx = self.get_device_manager().devices.osamx + osamx = self.device_manager.devices.osamx osamx_in = osamx.user_parameter.get("in") if not np.isclose(osamx.obj.readback.get(), osamx_in, atol=0.01): @@ -514,16 +556,15 @@ class RtOMNYController(Controller): time.sleep(1.5) - self.set_device_enabled("osamx", False) - self.set_device_enabled("osamy", False) - self.set_device_enabled("ofzpx", False) - self.set_device_enabled("ofzpy", False) - self.set_device_enabled("oosax", False) - self.set_device_enabled("oosax", False) + self.set_device_read_write("osamx", False) + self.set_device_read_write("osamy", False) + self.set_device_read_write("ofzpx", False) + self.set_device_read_write("ofzpy", False) + self.set_device_read_write("oosax", False) + self.set_device_read_write("oosax", False) print("Feedback is running.") - @threadlocked def clear_trajectory_generator(self): self.socket_put("sc") @@ -534,16 +575,15 @@ class RtOMNYController(Controller): self.move_to_zero() self.socket_put("J0") - self.set_device_enabled("osamx", True) - self.set_device_enabled("osamy", True) - self.set_device_enabled("ofzpx", True) - self.set_device_enabled("ofzpy", True) - self.set_device_enabled("oosax", True) - self.set_device_enabled("oosax", True) + self.set_device_read_write("osamx", True) + self.set_device_read_write("osamy", True) + self.set_device_read_write("ofzpx", True) + self.set_device_read_write("ofzpy", True) + self.set_device_read_write("oosax", True) + self.set_device_read_write("oosax", True) print("rt feedback is now disabled.") - def set_rotation_angle(self, val: float) -> None: self.socket_put(f"a{val/180*np.pi}") @@ -578,12 +618,13 @@ class RtOMNYController(Controller): "enabled_z": bool(tracker_values[10]), } - def laser_tracker_on(self): - #update variables and enable if not yet active + # update variables and enable if not yet active if not self.laser_tracker_check_enabled(): logger.info("Enabling the laser tracker. Please wait...") - self.get_device_manager().connector.send_client_info(f"Enabling the laser tracker. Please wait...", scope="", show_asap=True) + self.device_manager.connector.send_client_info( + f"Enabling the laser tracker. Please wait...", scope="", show_asap=True + ) tracker_intensity = self.tracker_info["tracker_intensity"] if ( @@ -598,18 +639,13 @@ class RtOMNYController(Controller): self.socket_put("T1") time.sleep(0.5) - self.get_device_manager().devices.otracky.obj.controller.socket_put_confirmed( - "trackyct=0" - ) - self.get_device_manager().devices.otracky.obj.controller.socket_put_confirmed( - "trackzct=0" - ) + self.device_manager.devices.otracky.obj.controller.socket_put_confirmed("trackyct=0") + self.device_manager.devices.otracky.obj.controller.socket_put_confirmed("trackzct=0") self.laser_tracker_wait_on_target() logger.info("Laser tracker running!") print("Laser tracker running!") - def laser_tracker_check_enabled(self) -> bool: self.laser_update_tracker_info() if self.tracker_info["enabled_z"] and self.tracker_info["enabled_y"]: @@ -628,11 +664,10 @@ class RtOMNYController(Controller): return True return False - def laser_tracker_wait_on_target(self): max_repeat = 15 count = 0 - while not self.laser_tracker_check_on_target() and count dict: self.average_stdeviations_x_st_fzp += float(return_table[16]) self.average_stdeviations_y_st_fzp += float(return_table[18]) @@ -831,7 +862,6 @@ class RtOMNYController(Controller): "stdev_x_st_fzp": {"value": float(return_table[16])}, "average_y_st_fzp": {"value": float(return_table[17])}, "stdev_y_st_fzp": {"value": float(return_table[18])}, - "average_stdeviations_x_st_fzp": { "value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1) }, @@ -840,7 +870,7 @@ class RtOMNYController(Controller): }, } return signals - + @threadlocked def start_scan(self): if not self.feedback_is_running(): @@ -862,7 +892,6 @@ class RtOMNYController(Controller): # start a point-by-point scan (for cont scan in flomni it would be "sa") self.socket_put_and_receive("sd") - @retry_once @threadlocked def get_scan_status(self): @@ -881,13 +910,6 @@ class RtOMNYController(Controller): current_position_in_scan = int(float(return_table[2])) return (mode, number_of_positions_planned, current_position_in_scan) - - def get_device_manager(self): - for axis in self._axis: - if hasattr(axis, "device_manager") and axis.device_manager: - return axis.device_manager - raise BECConfigError("Could not access the device_manager") - def read_positions_from_sampler(self): # this was for reading after the scan completed number_of_samples_to_read = 1 # self.get_scan_status()[1] #number of valid samples, will be updated upon first data read @@ -901,7 +923,7 @@ class RtOMNYController(Controller): # if not (mode==2 or mode==3): # error - self.get_device_manager().connector.set( + self.device_manager.connector.set( MessageEndpoints.device_status("rt_scan"), messages.DeviceStatusMessage( device="rt_scan", status=1, metadata=self.readout_metadata @@ -936,7 +958,7 @@ class RtOMNYController(Controller): signals = self._get_signals_from_table(return_table) self.publish_device_data(signals=signals, point_id=int(return_table[0])) - self.get_device_manager().connector.set( + self.device_manager.connector.set( MessageEndpoints.device_status("rt_scan"), messages.DeviceStatusMessage( device="rt_scan", status=0, metadata=self.readout_metadata @@ -949,15 +971,16 @@ class RtOMNYController(Controller): f" {self.average_stdeviations_y_st_fzp/read_counter*1000:.1f}." ) - self.get_device_manager().connector.send_client_info( + self.device_manager.connector.send_client_info( "OMNY statistics: Average of all standard deviations [nm]: x" f" {self.average_stdeviations_x_st_fzp/read_counter*1000:.1f}, y" f" {self.average_stdeviations_y_st_fzp/read_counter*1000:.1f}.", - scope="", show_asap=True) - + scope="", + show_asap=True, + ) def publish_device_data(self, signals, point_id): - self.get_device_manager().connector.set_and_publish( + self.device_manager.connector.set_and_publish( MessageEndpoints.device_read("rt_omny"), messages.DeviceMessage( signals=signals, metadata={"point_id": point_id, **self.readout_metadata} @@ -1068,7 +1091,7 @@ class RtOMNYMotor(Device, PositionerBase): self.axis_Id = axis_Id self.sign = sign self.controller = RtOMNYController( - socket_cls=socket_cls, socket_host=host, socket_port=port + socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager ) self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric) self.device_manager = device_manager @@ -1096,6 +1119,10 @@ class RtOMNYMotor(Device, PositionerBase): self.low_limit_travel.put(limits[0]) self.high_limit_travel.put(limits[1]) + def wait_for_connection(self, timeout: int = 30, **kwargs) -> None: + """Wait for the device to be connected.""" + self.controller.on(timeout=timeout) + @property def limits(self): return (self.low_limit_travel.get(), self.high_limit_travel.get()) @@ -1182,7 +1209,6 @@ class RtOMNYMotor(Device, PositionerBase): return status - @property def axis_Id(self): return self._axis_Id_alpha @@ -1227,7 +1253,7 @@ class RtOMNYMotor(Device, PositionerBase): if __name__ == "__main__": rtcontroller = RtOMNYController( - socket_cls=SocketIO, socket_host="mpc2844.psi.ch", socket_port=2222 + socket_cls=SocketIO, socket_host="mpc2844.psi.ch", socket_port=2222, device_manager=None ) rtcontroller.on() rtcontroller.laser_tracker_on() diff --git a/csaxs_bec/devices/smaract/smaract_controller.py b/csaxs_bec/devices/smaract/smaract_controller.py index 772b678..8b2e513 100644 --- a/csaxs_bec/devices/smaract/smaract_controller.py +++ b/csaxs_bec/devices/smaract/smaract_controller.py @@ -93,6 +93,7 @@ class SmaractController(Controller): socket_cls=None, socket_host=None, socket_port=None, + device_manager=None, attr_name="", labels=None, ): @@ -102,6 +103,7 @@ class SmaractController(Controller): socket_cls=socket_cls, socket_host=socket_host, socket_port=socket_port, + device_manager=device_manager, attr_name=attr_name, parent=parent, labels=labels, diff --git a/csaxs_bec/devices/smaract/smaract_ophyd.py b/csaxs_bec/devices/smaract/smaract_ophyd.py index ff088ac..0e810c2 100644 --- a/csaxs_bec/devices/smaract/smaract_ophyd.py +++ b/csaxs_bec/devices/smaract/smaract_ophyd.py @@ -123,10 +123,11 @@ class SmaractMotor(Device, PositionerBase): limits=None, sign=1, socket_cls=SocketIO, + device_manager=None, **kwargs, ): self.controller = SmaractController( - socket_cls=socket_cls, socket_host=host, socket_port=port + socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager ) self.axis_Id = axis_Id self.sign = sign diff --git a/tests/tests_devices/test_fupr_ophyd.py b/tests/tests_devices/test_fupr_ophyd.py index 615e088..2683a14 100644 --- a/tests/tests_devices/test_fupr_ophyd.py +++ b/tests/tests_devices/test_fupr_ophyd.py @@ -7,10 +7,15 @@ from csaxs_bec.devices.omny.galil.fupr_ophyd import FuprGalilController, FuprGal @pytest.fixture -def fsamroy(): +def fsamroy(dm_with_devices): FuprGalilController._reset_controller() fsamroy_motor = FuprGalilMotor( - "A", name="fsamroy", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock + "A", + name="fsamroy", + host="mpc2680.psi.ch", + port=8081, + socket_cls=SocketMock, + device_manager=dm_with_devices, ) fsamroy_motor.controller.on() assert isinstance(fsamroy_motor.controller, FuprGalilController) diff --git a/tests/tests_devices/test_galil.py b/tests/tests_devices/test_galil.py index b76854f..6ae437b 100644 --- a/tests/tests_devices/test_galil.py +++ b/tests/tests_devices/test_galil.py @@ -8,10 +8,15 @@ from csaxs_bec.devices.omny.galil.lgalil_ophyd import LamniGalilController, Lamn @pytest.fixture(scope="function") -def leyey(): +def leyey(dm_with_devices): LamniGalilController._reset_controller() leyey_motor = LamniGalilMotor( - "H", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock + "H", + name="leyey", + host="mpc2680.psi.ch", + port=8081, + socket_cls=SocketMock, + device_manager=dm_with_devices, ) leyey_motor.controller.on() yield leyey_motor @@ -20,10 +25,15 @@ def leyey(): @pytest.fixture(scope="function") -def leyex(): +def leyex(dm_with_devices): LamniGalilController._reset_controller() leyex_motor = LamniGalilMotor( - "A", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock + "A", + name="leyey", + host="mpc2680.psi.ch", + port=8081, + socket_cls=SocketMock, + device_manager=dm_with_devices, ) leyex_motor.controller.on() yield leyex_motor diff --git a/tests/tests_devices/test_galil_flomni.py b/tests/tests_devices/test_galil_flomni.py index fe24030..f3dcbc2 100644 --- a/tests/tests_devices/test_galil_flomni.py +++ b/tests/tests_devices/test_galil_flomni.py @@ -7,10 +7,15 @@ from csaxs_bec.devices.omny.galil.fgalil_ophyd import FlomniGalilController, Flo @pytest.fixture(scope="function") -def leyey(): +def leyey(dm_with_devices): FlomniGalilController._reset_controller() leyey_motor = FlomniGalilMotor( - "H", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock + "H", + name="leyey", + host="mpc2680.psi.ch", + port=8081, + socket_cls=SocketMock, + device_manager=dm_with_devices, ) leyey_motor.controller.on() yield leyey_motor @@ -19,10 +24,15 @@ def leyey(): @pytest.fixture(scope="function") -def leyex(): +def leyex(dm_with_devices): FlomniGalilController._reset_controller() leyex_motor = FlomniGalilMotor( - "H", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock + "H", + name="leyey", + host="mpc2680.psi.ch", + port=8081, + socket_cls=SocketMock, + device_manager=dm_with_devices, ) leyex_motor.controller.on() yield leyex_motor @@ -157,11 +167,16 @@ def test_find_reference(leyex, axis_nr, socket_put_messages, socket_get_messages ], ) def test_fosaz_light_curtain_is_triggered( - axis_Id, socket_put_messages, socket_get_messages, triggered + axis_Id, socket_put_messages, socket_get_messages, triggered, dm_with_devices ): """test that the light curtain is triggered""" fosaz = FlomniGalilMotor( - axis_Id, name="fosaz", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock + axis_Id, + name="fosaz", + host="mpc2680.psi.ch", + port=8081, + socket_cls=SocketMock, + device_manager=dm_with_devices, ) fosaz.controller.on() fosaz.controller.sock.flush_buffer() diff --git a/tests/tests_devices/test_npoint_piezo.py b/tests/tests_devices/test_npoint_piezo.py index a8157d7..85e6e54 100644 --- a/tests/tests_devices/test_npoint_piezo.py +++ b/tests/tests_devices/test_npoint_piezo.py @@ -16,7 +16,10 @@ def controller(): """ with mock.patch("ophyd_devices.utils.socket.SocketIO") as socket_cls: controller = NPointController( - socket_cls=socket_cls, socket_host="localhost", socket_port=1234 + socket_cls=socket_cls, + socket_host="localhost", + socket_port=1234, + device_manager=mock.MagicMock(), ) controller.on() controller.sock.reset_mock() @@ -25,13 +28,18 @@ def controller(): @pytest.fixture -def npointx(): +def npointx(dm_with_devices): """ Fixture to create a NPointAxis object. """ controller = mock.MagicMock() npointx = NPointAxis( - axis_Id="A", name="npointx", host="localhost", port=1234, socket_cls=controller + axis_Id="A", + name="npointx", + host="localhost", + port=1234, + socket_cls=controller, + device_manager=dm_with_devices, ) npointx.controller.on() npointx.controller.sock.reset_mock() @@ -107,13 +115,18 @@ def test_axis_get_in(npointx, axis, msg_in, msg_out): npointx.controller.sock.put.assert_called_once_with(msg_in) -def test_axis_out_of_range(controller): +def test_axis_out_of_range(dm_with_devices): """ Test that an error is raised when trying to create an NPointAxis object with an invalid axis ID. """ with pytest.raises(ValueError): npointx = NPointAxis( - axis_Id="G", name="npointx", host="localhost", port=1234, socket_cls=mock.MagicMock() + axis_Id="G", + name="npointx", + host="localhost", + port=1234, + socket_cls=mock.MagicMock(), + device_manager=dm_with_devices, ) diff --git a/tests/tests_devices/test_rt_flomni.py b/tests/tests_devices/test_rt_flomni.py index e6f7ae8..811d657 100644 --- a/tests/tests_devices/test_rt_flomni.py +++ b/tests/tests_devices/test_rt_flomni.py @@ -11,26 +11,29 @@ from csaxs_bec.devices.omny.rt.rt_ophyd import RtError def rt_flomni(): RtFlomniController._reset_controller() rt_flomni = RtFlomniController( - name="rt_flomni", socket_cls=SocketMock, socket_host="localhost", socket_port=8081 + name="rt_flomni", + socket_cls=SocketMock, + socket_host="localhost", + socket_port=8081, + device_manager=mock.MagicMock(), ) - with mock.patch.object(rt_flomni, "get_device_manager"): - with mock.patch.object(rt_flomni, "sock"): - rtx = mock.MagicMock(spec=RtFlomniMotor) - rtx.name = "rtx" - rtx.axis_Id = "A" - rtx.axis_Id_numeric = 0 - rty = mock.MagicMock(spec=RtFlomniMotor) - rty.name = "rty" - rty.axis_Id = "B" - rty.axis_Id_numeric = 1 - rtz = mock.MagicMock(spec=RtFlomniMotor) - rtz.name = "rtz" - rtz.axis_Id = "C" - rtz.axis_Id_numeric = 2 - rt_flomni.set_axis(axis=rtx, axis_nr=0) - rt_flomni.set_axis(axis=rty, axis_nr=1) - rt_flomni.set_axis(axis=rtz, axis_nr=2) - yield rt_flomni + with mock.patch.object(rt_flomni, "sock"): + rtx = mock.MagicMock(spec=RtFlomniMotor) + rtx.name = "rtx" + rtx.axis_Id = "A" + rtx.axis_Id_numeric = 0 + rty = mock.MagicMock(spec=RtFlomniMotor) + rty.name = "rty" + rty.axis_Id = "B" + rty.axis_Id_numeric = 1 + rtz = mock.MagicMock(spec=RtFlomniMotor) + rtz.name = "rtz" + rtz.axis_Id = "C" + rtz.axis_Id_numeric = 2 + rt_flomni.set_axis(axis=rtx, axis_nr=0) + rt_flomni.set_axis(axis=rty, axis_nr=1) + rt_flomni.set_axis(axis=rtz, axis_nr=2) + yield rt_flomni RtFlomniController._reset_controller() @@ -52,7 +55,7 @@ def test_rt_flomni_feedback_is_running(rt_flomni, return_value, is_running): def test_feedback_enable_with_reset(rt_flomni): - device_manager = rt_flomni.get_device_manager() + device_manager = rt_flomni.device_manager device_manager.devices.fsamx.user_parameter.get.return_value = 0.05 device_manager.devices.fsamx.obj.readback.get.return_value = 0.05 @@ -68,7 +71,7 @@ def test_feedback_enable_with_reset(rt_flomni): def test_move_samx_to_scan_region(rt_flomni): - device_manager = rt_flomni.get_device_manager() + device_manager = rt_flomni.device_manager device_manager.devices.rtx.user_parameter.get.return_value = 1 rt_flomni.move_samx_to_scan_region(20, 2) assert mock.call(b"v0\n") not in rt_flomni.sock.put.mock_calls @@ -76,16 +79,16 @@ def test_move_samx_to_scan_region(rt_flomni): def test_feedback_enable_without_reset(rt_flomni): - with mock.patch.object(rt_flomni, "set_device_enabled") as set_device_enabled: + with mock.patch.object(rt_flomni, "set_device_read_write") as set_device_read_write: with mock.patch.object(rt_flomni, "feedback_is_running", return_value=True): with mock.patch.object(rt_flomni, "laser_tracker_on") as laser_tracker_on: rt_flomni.feedback_enable_without_reset() laser_tracker_on.assert_called_once() assert mock.call(b"l3\n") in rt_flomni.sock.put.mock_calls - assert mock.call("fsamx", False) in set_device_enabled.mock_calls - assert mock.call("fsamy", False) in set_device_enabled.mock_calls - assert mock.call("foptx", False) in set_device_enabled.mock_calls - assert mock.call("fopty", False) in set_device_enabled.mock_calls + assert mock.call("fsamx", False) in set_device_read_write.mock_calls + assert mock.call("fsamy", False) in set_device_read_write.mock_calls + assert mock.call("foptx", False) in set_device_read_write.mock_calls + assert mock.call("fopty", False) in set_device_read_write.mock_calls def test_feedback_enable_without_reset_raises(rt_flomni): diff --git a/tests/tests_devices/test_smaract.py b/tests/tests_devices/test_smaract.py index f4ef4de..3e77ca0 100644 --- a/tests/tests_devices/test_smaract.py +++ b/tests/tests_devices/test_smaract.py @@ -10,19 +10,27 @@ from csaxs_bec.devices.smaract.smaract_ophyd import SmaractMotor @pytest.fixture -def controller(): +def controller(dm_with_devices): SmaractController._reset_controller() - controller = SmaractController(socket_cls=SocketMock, socket_host="dummy", socket_port=123) + controller = SmaractController( + socket_cls=SocketMock, socket_host="dummy", socket_port=123, device_manager=dm_with_devices + ) controller.on() controller.sock.flush_buffer() yield controller @pytest.fixture -def lsmarA(): +def lsmarA(dm_with_devices): SmaractController._reset_controller() motor_a = SmaractMotor( - "A", name="lsmarA", host="mpc2680.psi.ch", port=8085, sign=1, socket_cls=SocketMock + "A", + name="lsmarA", + host="mpc2680.psi.ch", + port=8085, + sign=1, + socket_cls=SocketMock, + device_manager=dm_with_devices, ) motor_a.controller.on() motor_a.controller.sock.flush_buffer()