From ac234edd56b7b0f29d8f284c704cdc1848ea0b38 Mon Sep 17 00:00:00 2001 From: Klaus Wakonig Date: Tue, 6 Sep 2022 16:22:05 +0200 Subject: [PATCH] enforced black --- .gitlab-ci.yml | 6 ++++ ophyd_devices/galil/galil_ophyd.py | 11 ++++--- ophyd_devices/npoint/npoint_ophyd.py | 9 ++---- ophyd_devices/rt_lamni/rt_lamni_ophyd.py | 36 ++++++++++++++------- ophyd_devices/sls_devices/sls_devices.py | 3 +- ophyd_devices/smaract/smaract_controller.py | 9 ++---- ophyd_devices/smaract/smaract_ophyd.py | 2 +- ophyd_devices/utils/controller.py | 2 +- tests/test_npoint_piezo.py | 5 +-- 9 files changed, 45 insertions(+), 38 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index adfdcca..7d40575 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -8,7 +8,13 @@ before_script: - pip install -e . # different stages in the pipeline stages: + - Formatter - Test +formatter: + stage: Formatter + script: + - pip install black + - black --check --diff --color --line-length=100 ./ pytest: stage: Test script: diff --git a/ophyd_devices/galil/galil_ophyd.py b/ophyd_devices/galil/galil_ophyd.py index 4c3db22..e1a5714 100644 --- a/ophyd_devices/galil/galil_ophyd.py +++ b/ophyd_devices/galil/galil_ophyd.py @@ -15,6 +15,7 @@ from bec_utils import bec_logger logger = bec_logger.logger + class GalilCommunicationError(Exception): pass @@ -48,7 +49,7 @@ class GalilController(Controller): "galil_show_all", "socket_put_and_receive", "socket_put_confirmed", - "lgalil_is_air_off_and_orchestra_enabled" + "lgalil_is_air_off_and_orchestra_enabled", ] def __init__( @@ -151,9 +152,9 @@ class GalilController(Controller): return self.socket_put_and_receive(f"XQ#STOP,1") def lgalil_is_air_off_and_orchestra_enabled(self) -> bool: - rt_not_blocked_by_galil=bool(self.socket_put_and_receive(f"MG@OUT[9]")) - air_off=bool(self.socket_put_and_receive(f"MG@OUT[13]")) - return (rt_not_blocked_by_galil and air_off) + rt_not_blocked_by_galil = bool(self.socket_put_and_receive(f"MG@OUT[9]")) + air_off = bool(self.socket_put_and_receive(f"MG@OUT[13]")) + return rt_not_blocked_by_galil and air_off def axis_is_referenced(self, axis_Id_numeric) -> bool: return bool(float(self.socket_put_and_receive(f"MG axisref[{axis_Id_numeric}]").strip())) @@ -291,7 +292,7 @@ class GalilSetpointSignal(GalilSignalBase): if axes_referenced: while self.controller.is_thread_active(0): time.sleep(0.1) - + if self.parent.axis_Id_numeric == 2: angle_status = self.parent.device_manager.devices[ self.parent.rt diff --git a/ophyd_devices/npoint/npoint_ophyd.py b/ophyd_devices/npoint/npoint_ophyd.py index 87618a9..c6eb6b5 100644 --- a/ophyd_devices/npoint/npoint_ophyd.py +++ b/ophyd_devices/npoint/npoint_ophyd.py @@ -22,6 +22,7 @@ def channel_checked(fcn): return wrapper + class NPointController(SingletonController): NUM_CHANNELS = 3 _read_single_loc_bit = "A0" @@ -84,9 +85,7 @@ class NPointController(SingletonController): if not self.socket.is_open: self.socket.open() try: - self.socket.connect( - self._server_and_port_name[0], self._server_and_port_name[1] - ) + self.socket.connect(self._server_and_port_name[0], self._server_and_port_name[1]) except socket.timeout: raise TimeoutError( f"Failed to connect to the specified server and port {self._server_and_port_name}." @@ -96,9 +95,7 @@ class NPointController(SingletonController): self.socket.close() time.sleep(0.5) self.socket.open() - self.socket.connect( - self._server_and_port_name[0], self._server_and_port_name[1] - ) + self.socket.connect(self._server_and_port_name[0], self._server_and_port_name[1]) self.connected = True @threadlocked diff --git a/ophyd_devices/rt_lamni/rt_lamni_ophyd.py b/ophyd_devices/rt_lamni/rt_lamni_ophyd.py index d216945..1030888 100644 --- a/ophyd_devices/rt_lamni/rt_lamni_ophyd.py +++ b/ophyd_devices/rt_lamni/rt_lamni_ophyd.py @@ -155,7 +155,7 @@ class RtLamniController(Controller): @threadlocked def stop_all_axes(self): self.socket_put("sc") - + @threadlocked def feedback_disable(self): self.socket_put("J0") @@ -303,8 +303,8 @@ class RtLamniController(Controller): time.sleep(0.1) self.start_readout() - def _get_signals_from_table(self, return_table) ->dict: - self.average_stdeviations_x_st_fzp += float(return_table[5]) + def _get_signals_from_table(self, return_table) -> dict: + self.average_stdeviations_x_st_fzp += float(return_table[5]) self.average_stdeviations_y_st_fzp += float(return_table[8]) self.average_lamni_angle += float(return_table[19]) signals = { @@ -326,9 +326,13 @@ class RtLamniController(Controller): "stdev_cap5": {"value": float(return_table[18])}, "average_angle_interf_ST": {"value": float(return_table[19])}, "stdev_angle_interf_ST": {"value": float(return_table[20])}, - "average_stdeviations_x_st_fzp": {"value": self.average_stdeviations_x_st_fzp/(int(return_table[0])+1)}, - "average_stdeviations_y_st_fzp": {"value": self.average_stdeviations_y_st_fzp/(int(return_table[0])+1)}, - "average_lamni_angle": {"value": self.average_lamni_angle/(int(return_table[0])+1)}, + "average_stdeviations_x_st_fzp": { + "value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1) + }, + "average_stdeviations_y_st_fzp": { + "value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1) + }, + "average_lamni_angle": {"value": self.average_lamni_angle / (int(return_table[0]) + 1)}, } return signals @@ -347,7 +351,12 @@ class RtLamniController(Controller): # if not (mode==2 or mode==3): # error - self.get_device_manager().producer.set_and_publish(MessageEndpoints.device_status("rt_scan"), BECMessage.DeviceStatusMessage(device="rt_scan", status=1, metadata=self.readout_metadata).dumps()) + self.get_device_manager().producer.set_and_publish( + MessageEndpoints.device_status("rt_scan"), + BECMessage.DeviceStatusMessage( + device="rt_scan", status=1, metadata=self.readout_metadata + ).dumps(), + ) # while scan is running while mode > 0: # logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}") @@ -361,9 +370,8 @@ class RtLamniController(Controller): read_counter = read_counter + 1 - signals = self._get_signals_from_table(return_table) - + self.publish_device_data(signals=signals, pointID=int(return_table[0])) time.sleep(0.05) @@ -375,11 +383,15 @@ class RtLamniController(Controller): # logger.info(f"{return_table}") read_counter = read_counter + 1 - signals = self._get_signals_from_table(return_table) + signals = self._get_signals_from_table(return_table) self.publish_device_data(signals=signals, pointID=int(return_table[0])) - self.get_device_manager().producer.set_and_publish(MessageEndpoints.device_status("rt_scan"), BECMessage.DeviceStatusMessage(device="rt_scan", status=0, metadata=self.readout_metadata).dumps()) - + self.get_device_manager().producer.set_and_publish( + MessageEndpoints.device_status("rt_scan"), + BECMessage.DeviceStatusMessage( + device="rt_scan", status=0, metadata=self.readout_metadata + ).dumps(), + ) logger.info( f"LamNI statistics: Average of all standard deviations: x {self.average_stdeviations_x_st_fzp/number_of_samples_to_read}, y {self.average_stdeviations_y_st_fzp/number_of_samples_to_read}, angle {self.average_lamni_angle/number_of_samples_to_read}." diff --git a/ophyd_devices/sls_devices/sls_devices.py b/ophyd_devices/sls_devices/sls_devices.py index 498eba1..930884e 100644 --- a/ophyd_devices/sls_devices/sls_devices.py +++ b/ophyd_devices/sls_devices/sls_devices.py @@ -14,11 +14,10 @@ class SLSOperatorMessages(Device): message5 = Cpt(EpicsSignalRO, f"ACOAU-ACCU:OP-MSG5", auto_monitor=True) message_date5 = Cpt(EpicsSignalRO, f"ACOAU-ACCU:OP-DATE5", auto_monitor=True) + # class SLSOperatorMessages(Device): # pass # for i in range(5): # setattr(SLSOperatorMessages, f"message{i}", Cpt(EpicsSignalRO, f"ACOAU-ACCU:OP-MSG{i}", auto_monitor=True)) # setattr(SLSOperatorMessages, f"message_date{i}", Cpt(EpicsSignalRO, f"ACOAU-ACCU:OP-DATE{i}", auto_monitor=True)) - - diff --git a/ophyd_devices/smaract/smaract_controller.py b/ophyd_devices/smaract/smaract_controller.py index b2e836c..0265003 100644 --- a/ophyd_devices/smaract/smaract_controller.py +++ b/ophyd_devices/smaract/smaract_controller.py @@ -81,11 +81,7 @@ class SmaractSensors: class SmaractController(Controller): - USER_ACCESS = [ - "socket_put_and_receive", - "smaract_show_all", - "move_open_loop_steps" - ] + USER_ACCESS = ["socket_put_and_receive", "smaract_show_all", "move_open_loop_steps"] def __init__( self, @@ -265,7 +261,7 @@ class SmaractController(Controller): @axis_checked @typechecked def move_open_loop_steps( - self, axis_Id_numeric: int, steps: int, amplitude: int=2000, frequency:int=500 + self, axis_Id_numeric: int, steps: int, amplitude: int = 2000, frequency: int = 500 ) -> None: """Move open loop steps @@ -467,7 +463,6 @@ class SmaractController(Controller): t.add_row([None for t in t.field_names]) print(t) - def _check_axis_number(self, axis_Id_numeric: int) -> None: if axis_Id_numeric >= self._Smaract_axis_per_controller: raise ValueError( diff --git a/ophyd_devices/smaract/smaract_ophyd.py b/ophyd_devices/smaract/smaract_ophyd.py index 860dbd7..81af6cb 100644 --- a/ophyd_devices/smaract/smaract_ophyd.py +++ b/ophyd_devices/smaract/smaract_ophyd.py @@ -38,7 +38,7 @@ class SmaractSignalRO(SmaractSignalBase): class SmaractReadbackSignal(SmaractSignalRO): @threadlocked def _socket_get(self): - return self.controller.get_position(self.parent.axis_Id_numeric)*self.parent.sign + return self.controller.get_position(self.parent.axis_Id_numeric) * self.parent.sign class SmaractSetpointSignal(SmaractSignalBase): diff --git a/ophyd_devices/utils/controller.py b/ophyd_devices/utils/controller.py index 657ac42..da5ecfb 100644 --- a/ophyd_devices/utils/controller.py +++ b/ophyd_devices/utils/controller.py @@ -11,7 +11,7 @@ def threadlocked(fcn): @functools.wraps(fcn) def wrapper(self, *args, **kwargs): - lock = self._lock if hasattr(self, "_lock") else self.controller._lock + lock = self._lock if hasattr(self, "_lock") else self.controller._lock with lock: return fcn(self, *args, **kwargs) diff --git a/tests/test_npoint_piezo.py b/tests/test_npoint_piezo.py index dfb8fbf..782d563 100644 --- a/tests/test_npoint_piezo.py +++ b/tests/test_npoint_piezo.py @@ -121,10 +121,7 @@ def test_set_axis_out_of_range(): ], ) def test_hex_list_to_int(in_buffer, byteorder, signed, val): - assert ( - NPointController._hex_list_to_int(in_buffer, byteorder=byteorder, signed=signed) - == val - ) + assert NPointController._hex_list_to_int(in_buffer, byteorder=byteorder, signed=signed) == val @pytest.mark.parametrize(