diff --git a/ophyd_devices/smaract/smaract_controller.py b/ophyd_devices/smaract/smaract_controller.py index 30f9314..3a65e79 100644 --- a/ophyd_devices/smaract/smaract_controller.py +++ b/ophyd_devices/smaract/smaract_controller.py @@ -9,10 +9,7 @@ import numpy as np from prettytable import PrettyTable from typeguard import typechecked -from ophyd_devices.smaract.smaract_errors import ( - SmaractCommunicationError, - SmaractErrorCode, -) +from ophyd_devices.smaract.smaract_errors import SmaractCommunicationError, SmaractErrorCode from ophyd_devices.utils.controller import Controller, axis_checked, threadlocked logger = logging.getLogger("smaract_controller") @@ -118,11 +115,7 @@ class SmaractController(Controller): @threadlocked def socket_put_and_receive( - self, - val: str, - remove_trailing_chars=True, - check_for_errors=True, - raise_if_not_status=False, + self, val: str, remove_trailing_chars=True, check_for_errors=True, raise_if_not_status=False ) -> str: self.socket_put(val) return_val = "" @@ -195,7 +188,7 @@ class SmaractController(Controller): return bool(int(return_val.split(",")[1])) def all_axes_referenced(self) -> bool: - return all([self.is_axis_referenced(ax) for ax in self._axis if ax is not None]) + return all(self.axis_is_referenced(ax) for ax in self._axis if ax is not None) @retry_once @axis_checked @@ -265,8 +258,7 @@ class SmaractController(Controller): frequency (int): Frequency in Hz that the steps are performed with. The valid range is 1..18,500. Default: 2000. """ self.socket_put_and_receive( - f"MST{axis_Id_numeric},{steps},{amplitude},{frequency}", - raise_if_not_status=True, + f"MST{axis_Id_numeric},{steps},{amplitude},{frequency}", raise_if_not_status=True ) @retry_once @@ -440,14 +432,7 @@ class SmaractController(Controller): def describe(self) -> None: t = PrettyTable() t.title = f"{self.__class__.__name__} on {self.sock.host}:{self.sock.port}" - t.field_names = [ - "Axis", - "Name", - "Connected", - "Referenced", - "Closed Loop Speed", - "Position", - ] + t.field_names = ["Axis", "Name", "Connected", "Referenced", "Closed Loop Speed", "Position"] for ax in range(self._axes_per_controller): axis = self._axis[ax] if axis is not None: @@ -512,7 +497,7 @@ class SmaractController(Controller): def _message_starts_with(self, msg: str, leading_chars: str) -> bool: if msg.startswith(leading_chars): return True - else: - raise SmaractCommunicationError( - f"Expected to receive a return message starting with {leading_chars} but instead received '{msg}'" - ) + raise SmaractCommunicationError( + f"Expected to receive a return message starting with {leading_chars} but instead" + f" received '{msg}'" + ) diff --git a/tests/test_smaract.py b/tests/test_smaract.py index 6c2d733..776b5e4 100644 --- a/tests/test_smaract.py +++ b/tests/test_smaract.py @@ -3,20 +3,17 @@ from utils import SocketMock from ophyd_devices.smaract import SmaractController from ophyd_devices.smaract.smaract_controller import SmaractCommunicationMode -from ophyd_devices.smaract.smaract_errors import ( - SmaractCommunicationError, - SmaractErrorCode, -) +from ophyd_devices.smaract.smaract_errors import SmaractCommunicationError, SmaractErrorCode from ophyd_devices.smaract.smaract_ophyd import SmaractMotor @pytest.mark.parametrize( "axis,position,get_message,return_msg", [ - (0, 50, b":GP0\n", b":P0,50000000"), - (1, 0, b":GP1\n", b":P1,0"), - (0, -50, b":GP0\n", b":P0,-50000000"), - (0, -50.23, b":GP0\n", b":P0,-50230000"), + (0, 50, b":GP0\n", b":P0,50000000\n"), + (1, 0, b":GP1\n", b":P1,0\n"), + (0, -50, b":GP0\n", b":P0,-50000000\n"), + (0, -50.23, b":GP0\n", b":P0,-50230000\n"), ], ) def test_get_position(axis, position, get_message, return_msg): @@ -33,10 +30,10 @@ def test_get_position(axis, position, get_message, return_msg): @pytest.mark.parametrize( "axis,is_referenced,get_message,return_msg,exception", [ - (0, True, b":GPPK0\n", b":PPK0,1", None), - (1, True, b":GPPK1\n", b":PPK1,1", None), - (0, False, b":GPPK0\n", b":PPK0,0", None), - (200, False, b":GPPK0\n", b":PPK0,0", ValueError), + (0, True, b":GPPK0\n", b":PPK0,1\n", None), + (1, True, b":GPPK1\n", b":PPK1,1\n", None), + (0, False, b":GPPK0\n", b":PPK0,0\n", None), + (200, False, b":GPPK0\n", b":PPK0,0\n", ValueError), ], ) def test_axis_is_referenced(axis, is_referenced, get_message, return_msg, exception): @@ -57,7 +54,7 @@ def test_axis_is_referenced(axis, is_referenced, get_message, return_msg, except @pytest.mark.parametrize( "return_msg,exception,raised", [ - (b"false", SmaractCommunicationError, False), + (b"false\n", SmaractCommunicationError, False), (b":E0,1", SmaractErrorCode, True), (b":E,1", SmaractCommunicationError, True), (b":E,-1", SmaractCommunicationError, True), @@ -79,15 +76,11 @@ def test_socket_put_and_receive_raises_exception(return_msg, exception, raised): with pytest.raises(exception): controller.socket_put_and_receive(b"test") else: - assert controller.socket_put_and_receive(b"test") == return_msg.decode() + assert controller.socket_put_and_receive(b"test") == return_msg.split(b"\n")[0].decode() @pytest.mark.parametrize( - "mode,get_message,return_msg", - [ - (0, b":GCM\n", b":CM0"), - (1, b":GCM\n", b":CM1"), - ], + "mode,get_message,return_msg", [(0, b":GCM\n", b":CM0\n"), (1, b":GCM\n", b":CM1\n")] ) def test_communication_mode(mode, get_message, return_msg): SmaractController._reset_controller() @@ -103,16 +96,16 @@ def test_communication_mode(mode, get_message, return_msg): @pytest.mark.parametrize( "is_moving,get_message,return_msg", [ - (0, b":GS0\n", b":S0,0"), - (1, b":GS0\n", b":S0,1"), - (1, b":GS0\n", b":S0,2"), - (0, b":GS0\n", b":S0,3"), - (1, b":GS0\n", b":S0,4"), - (0, b":GS0\n", b":S0,5"), - (0, b":GS0\n", b":S0,6"), - (1, b":GS0\n", b":S0,7"), - (0, b":GS0\n", b":S0,9"), - (0, [b":GS0\n", b":GS0\n"], [b":E0,0", b":S0,9"]), + (0, b":GS0\n", b":S0,0\n"), + (1, b":GS0\n", b":S0,1\n"), + (1, b":GS0\n", b":S0,2\n"), + (0, b":GS0\n", b":S0,3\n"), + (1, b":GS0\n", b":S0,4\n"), + (0, b":GS0\n", b":S0,5\n"), + (0, b":GS0\n", b":S0,6\n"), + (1, b":GS0\n", b":S0,7\n"), + (0, b":GS0\n", b":S0,9\n"), + (0, [b":GS0\n", b":GS0\n"], [b":E0,0\n", b":S0,9"]), ], ) def test_axis_is_moving(is_moving, get_message, return_msg): @@ -131,9 +124,9 @@ def test_axis_is_moving(is_moving, get_message, return_msg): @pytest.mark.parametrize( "sensor_id,axis,get_msg,return_msg", [ - (1, 0, b":GST0\n", b":ST0,1"), - (6, 0, b":GST0\n", b":ST0,6"), - (6, 1, b":GST1\n", b":ST1,6"), + (1, 0, b":GST0\n", b":ST0,1\n"), + (6, 0, b":GST0\n", b":ST0,6\n"), + (6, 1, b":GST1\n", b":ST1,6\n"), ], ) def test_get_sensor_definition(sensor_id, axis, get_msg, return_msg): @@ -191,34 +184,29 @@ def test_move_axis_to_absolute_position(pos, axis, hold_time, get_msg, return_ms ( 50, [b":GPPK0\n", b":MPA0,50000000,1000\n", b":GS0\n", b":GP0\n"], - [b":PPK0,1", b":E0,0", b":S0,0", b":P0,50000000\n"], + [b":PPK0,1\n", b":E0,0\n", b":S0,0\n", b":P0,50000000\n"], ), ( 0, [b":GPPK0\n", b":MPA0,0,1000\n", b":GS0\n", b":GP0\n"], - [b":PPK0,1", b":E0,0", b":S0,0", b":P0,0000000\n"], + [b":PPK0,1\n", b":E0,0\n", b":S0,0\n", b":P0,0000000\n"], ), ( 20.23, [b":GPPK0\n", b":MPA0,20230000,1000\n", b":GS0\n", b":GP0\n"], - [b":PPK0,1", b":E0,0", b":S0,0", b":P0,20230000\n"], + [b":PPK0,1\n", b":E0,0\n", b":S0,0\n", b":P0,20230000\n"], ), ( 20.23, [b":GPPK0\n", b":GPPK0\n", b":MPA0,20230000,1000\n", b":GS0\n", b":GP0\n"], - [b":S0,0", b":PPK0,1", b":E0,0", b":S0,0", b":P0,20230000\n"], + [b":S0,0\n", b":PPK0,1\n", b":E0,0\n", b":S0,0\n", b":P0,20230000\n"], ), ], ) def test_move_axis(pos, get_msg, return_msg): SmaractController._reset_controller() lsmarA = SmaractMotor( - "A", - name="lsmarA", - host="mpc2680.psi.ch", - port=8085, - sign=1, - socket_cls=SocketMock, + "A", name="lsmarA", host="mpc2680.psi.ch", port=8085, sign=1, socket_cls=SocketMock ) lsmarA.controller.on() controller = lsmarA.controller @@ -228,25 +216,11 @@ def test_move_axis(pos, get_msg, return_msg): assert controller.sock.buffer_put == get_msg -@pytest.mark.parametrize( - "num_axes,get_msg,return_msg", - [ - ( - 1, - [b":S0\n"], - [b":E0,0"], - ) - ], -) +@pytest.mark.parametrize("num_axes,get_msg,return_msg", [(1, [b":S0\n"], [b":E0,0"])]) def test_stop_axis(num_axes, get_msg, return_msg): SmaractController._reset_controller() lsmarA = SmaractMotor( - "A", - name="lsmarA", - host="mpc2680.psi.ch", - port=8085, - sign=1, - socket_cls=SocketMock, + "A", name="lsmarA", host="mpc2680.psi.ch", port=8085, sign=1, socket_cls=SocketMock ) lsmarA.stage() controller = lsmarA.controller