mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2025-06-23 19:27:59 +02:00
enforced black
This commit is contained in:
@ -8,7 +8,13 @@ before_script:
|
||||
- pip install -e .
|
||||
# different stages in the pipeline
|
||||
stages:
|
||||
- Formatter
|
||||
- Test
|
||||
formatter:
|
||||
stage: Formatter
|
||||
script:
|
||||
- pip install black
|
||||
- black --check --diff --color --line-length=100 ./
|
||||
pytest:
|
||||
stage: Test
|
||||
script:
|
||||
|
@ -15,6 +15,7 @@ from bec_utils import bec_logger
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class GalilCommunicationError(Exception):
|
||||
pass
|
||||
|
||||
@ -48,7 +49,7 @@ class GalilController(Controller):
|
||||
"galil_show_all",
|
||||
"socket_put_and_receive",
|
||||
"socket_put_confirmed",
|
||||
"lgalil_is_air_off_and_orchestra_enabled"
|
||||
"lgalil_is_air_off_and_orchestra_enabled",
|
||||
]
|
||||
|
||||
def __init__(
|
||||
@ -151,9 +152,9 @@ class GalilController(Controller):
|
||||
return self.socket_put_and_receive(f"XQ#STOP,1")
|
||||
|
||||
def lgalil_is_air_off_and_orchestra_enabled(self) -> bool:
|
||||
rt_not_blocked_by_galil=bool(self.socket_put_and_receive(f"MG@OUT[9]"))
|
||||
air_off=bool(self.socket_put_and_receive(f"MG@OUT[13]"))
|
||||
return (rt_not_blocked_by_galil and air_off)
|
||||
rt_not_blocked_by_galil = bool(self.socket_put_and_receive(f"MG@OUT[9]"))
|
||||
air_off = bool(self.socket_put_and_receive(f"MG@OUT[13]"))
|
||||
return rt_not_blocked_by_galil and air_off
|
||||
|
||||
def axis_is_referenced(self, axis_Id_numeric) -> bool:
|
||||
return bool(float(self.socket_put_and_receive(f"MG axisref[{axis_Id_numeric}]").strip()))
|
||||
|
@ -22,6 +22,7 @@ def channel_checked(fcn):
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
class NPointController(SingletonController):
|
||||
NUM_CHANNELS = 3
|
||||
_read_single_loc_bit = "A0"
|
||||
@ -84,9 +85,7 @@ class NPointController(SingletonController):
|
||||
if not self.socket.is_open:
|
||||
self.socket.open()
|
||||
try:
|
||||
self.socket.connect(
|
||||
self._server_and_port_name[0], self._server_and_port_name[1]
|
||||
)
|
||||
self.socket.connect(self._server_and_port_name[0], self._server_and_port_name[1])
|
||||
except socket.timeout:
|
||||
raise TimeoutError(
|
||||
f"Failed to connect to the specified server and port {self._server_and_port_name}."
|
||||
@ -96,9 +95,7 @@ class NPointController(SingletonController):
|
||||
self.socket.close()
|
||||
time.sleep(0.5)
|
||||
self.socket.open()
|
||||
self.socket.connect(
|
||||
self._server_and_port_name[0], self._server_and_port_name[1]
|
||||
)
|
||||
self.socket.connect(self._server_and_port_name[0], self._server_and_port_name[1])
|
||||
self.connected = True
|
||||
|
||||
@threadlocked
|
||||
|
@ -303,8 +303,8 @@ class RtLamniController(Controller):
|
||||
time.sleep(0.1)
|
||||
self.start_readout()
|
||||
|
||||
def _get_signals_from_table(self, return_table) ->dict:
|
||||
self.average_stdeviations_x_st_fzp += float(return_table[5])
|
||||
def _get_signals_from_table(self, return_table) -> dict:
|
||||
self.average_stdeviations_x_st_fzp += float(return_table[5])
|
||||
self.average_stdeviations_y_st_fzp += float(return_table[8])
|
||||
self.average_lamni_angle += float(return_table[19])
|
||||
signals = {
|
||||
@ -326,9 +326,13 @@ class RtLamniController(Controller):
|
||||
"stdev_cap5": {"value": float(return_table[18])},
|
||||
"average_angle_interf_ST": {"value": float(return_table[19])},
|
||||
"stdev_angle_interf_ST": {"value": float(return_table[20])},
|
||||
"average_stdeviations_x_st_fzp": {"value": self.average_stdeviations_x_st_fzp/(int(return_table[0])+1)},
|
||||
"average_stdeviations_y_st_fzp": {"value": self.average_stdeviations_y_st_fzp/(int(return_table[0])+1)},
|
||||
"average_lamni_angle": {"value": self.average_lamni_angle/(int(return_table[0])+1)},
|
||||
"average_stdeviations_x_st_fzp": {
|
||||
"value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1)
|
||||
},
|
||||
"average_stdeviations_y_st_fzp": {
|
||||
"value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1)
|
||||
},
|
||||
"average_lamni_angle": {"value": self.average_lamni_angle / (int(return_table[0]) + 1)},
|
||||
}
|
||||
return signals
|
||||
|
||||
@ -347,7 +351,12 @@ class RtLamniController(Controller):
|
||||
|
||||
# if not (mode==2 or mode==3):
|
||||
# error
|
||||
self.get_device_manager().producer.set_and_publish(MessageEndpoints.device_status("rt_scan"), BECMessage.DeviceStatusMessage(device="rt_scan", status=1, metadata=self.readout_metadata).dumps())
|
||||
self.get_device_manager().producer.set_and_publish(
|
||||
MessageEndpoints.device_status("rt_scan"),
|
||||
BECMessage.DeviceStatusMessage(
|
||||
device="rt_scan", status=1, metadata=self.readout_metadata
|
||||
).dumps(),
|
||||
)
|
||||
# while scan is running
|
||||
while mode > 0:
|
||||
# logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}")
|
||||
@ -361,7 +370,6 @@ class RtLamniController(Controller):
|
||||
|
||||
read_counter = read_counter + 1
|
||||
|
||||
|
||||
signals = self._get_signals_from_table(return_table)
|
||||
|
||||
self.publish_device_data(signals=signals, pointID=int(return_table[0]))
|
||||
@ -378,8 +386,12 @@ class RtLamniController(Controller):
|
||||
signals = self._get_signals_from_table(return_table)
|
||||
self.publish_device_data(signals=signals, pointID=int(return_table[0]))
|
||||
|
||||
self.get_device_manager().producer.set_and_publish(MessageEndpoints.device_status("rt_scan"), BECMessage.DeviceStatusMessage(device="rt_scan", status=0, metadata=self.readout_metadata).dumps())
|
||||
|
||||
self.get_device_manager().producer.set_and_publish(
|
||||
MessageEndpoints.device_status("rt_scan"),
|
||||
BECMessage.DeviceStatusMessage(
|
||||
device="rt_scan", status=0, metadata=self.readout_metadata
|
||||
).dumps(),
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"LamNI statistics: Average of all standard deviations: x {self.average_stdeviations_x_st_fzp/number_of_samples_to_read}, y {self.average_stdeviations_y_st_fzp/number_of_samples_to_read}, angle {self.average_lamni_angle/number_of_samples_to_read}."
|
||||
|
@ -14,11 +14,10 @@ class SLSOperatorMessages(Device):
|
||||
message5 = Cpt(EpicsSignalRO, f"ACOAU-ACCU:OP-MSG5", auto_monitor=True)
|
||||
message_date5 = Cpt(EpicsSignalRO, f"ACOAU-ACCU:OP-DATE5", auto_monitor=True)
|
||||
|
||||
|
||||
# class SLSOperatorMessages(Device):
|
||||
# pass
|
||||
|
||||
# for i in range(5):
|
||||
# setattr(SLSOperatorMessages, f"message{i}", Cpt(EpicsSignalRO, f"ACOAU-ACCU:OP-MSG{i}", auto_monitor=True))
|
||||
# setattr(SLSOperatorMessages, f"message_date{i}", Cpt(EpicsSignalRO, f"ACOAU-ACCU:OP-DATE{i}", auto_monitor=True))
|
||||
|
||||
|
||||
|
@ -81,11 +81,7 @@ class SmaractSensors:
|
||||
|
||||
|
||||
class SmaractController(Controller):
|
||||
USER_ACCESS = [
|
||||
"socket_put_and_receive",
|
||||
"smaract_show_all",
|
||||
"move_open_loop_steps"
|
||||
]
|
||||
USER_ACCESS = ["socket_put_and_receive", "smaract_show_all", "move_open_loop_steps"]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@ -265,7 +261,7 @@ class SmaractController(Controller):
|
||||
@axis_checked
|
||||
@typechecked
|
||||
def move_open_loop_steps(
|
||||
self, axis_Id_numeric: int, steps: int, amplitude: int=2000, frequency:int=500
|
||||
self, axis_Id_numeric: int, steps: int, amplitude: int = 2000, frequency: int = 500
|
||||
) -> None:
|
||||
"""Move open loop steps
|
||||
|
||||
@ -467,7 +463,6 @@ class SmaractController(Controller):
|
||||
t.add_row([None for t in t.field_names])
|
||||
print(t)
|
||||
|
||||
|
||||
def _check_axis_number(self, axis_Id_numeric: int) -> None:
|
||||
if axis_Id_numeric >= self._Smaract_axis_per_controller:
|
||||
raise ValueError(
|
||||
|
@ -38,7 +38,7 @@ class SmaractSignalRO(SmaractSignalBase):
|
||||
class SmaractReadbackSignal(SmaractSignalRO):
|
||||
@threadlocked
|
||||
def _socket_get(self):
|
||||
return self.controller.get_position(self.parent.axis_Id_numeric)*self.parent.sign
|
||||
return self.controller.get_position(self.parent.axis_Id_numeric) * self.parent.sign
|
||||
|
||||
|
||||
class SmaractSetpointSignal(SmaractSignalBase):
|
||||
|
@ -121,10 +121,7 @@ def test_set_axis_out_of_range():
|
||||
],
|
||||
)
|
||||
def test_hex_list_to_int(in_buffer, byteorder, signed, val):
|
||||
assert (
|
||||
NPointController._hex_list_to_int(in_buffer, byteorder=byteorder, signed=signed)
|
||||
== val
|
||||
)
|
||||
assert NPointController._hex_list_to_int(in_buffer, byteorder=byteorder, signed=signed) == val
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
Reference in New Issue
Block a user