refactor(controller): refactor set_device_enable method from controller to set_device_read_write
This commit is contained in:
@@ -169,10 +169,10 @@ class RtFlomniController(Controller):
|
||||
rtx = self.get_device_manager().devices.rtx
|
||||
rtx.update_user_parameter({"rt_pid_voltage": self.rt_pid_voltage})
|
||||
|
||||
self.set_device_enabled("fsamx", False)
|
||||
self.set_device_enabled("fsamy", False)
|
||||
self.set_device_enabled("foptx", False)
|
||||
self.set_device_enabled("fopty", False)
|
||||
self.set_device_read_write("fsamx", False)
|
||||
self.set_device_read_write("fsamy", False)
|
||||
self.set_device_read_write("foptx", False)
|
||||
self.set_device_read_write("fopty", False)
|
||||
|
||||
def move_samx_to_scan_region(self, fovx: float, cenx: float):
|
||||
time.sleep(0.05)
|
||||
@@ -225,20 +225,20 @@ class RtFlomniController(Controller):
|
||||
print("Feedback is not running; likely an error in the interferometer.")
|
||||
raise RtError("Feedback is not running; likely an error in the interferometer.")
|
||||
|
||||
self.set_device_enabled("fsamx", False)
|
||||
self.set_device_enabled("fsamy", False)
|
||||
self.set_device_enabled("foptx", False)
|
||||
self.set_device_enabled("fopty", False)
|
||||
self.set_device_read_write("fsamx", False)
|
||||
self.set_device_read_write("fsamy", False)
|
||||
self.set_device_read_write("foptx", False)
|
||||
self.set_device_read_write("fopty", False)
|
||||
|
||||
def feedback_disable(self):
|
||||
self.clear_trajectory_generator()
|
||||
self.move_to_zero()
|
||||
self.socket_put("l0")
|
||||
|
||||
self.set_device_enabled("fsamx", True)
|
||||
self.set_device_enabled("fsamy", True)
|
||||
self.set_device_enabled("foptx", True)
|
||||
self.set_device_enabled("fopty", True)
|
||||
self.set_device_read_write("fsamx", True)
|
||||
self.set_device_read_write("fsamy", True)
|
||||
self.set_device_read_write("foptx", True)
|
||||
self.set_device_read_write("fopty", True)
|
||||
|
||||
fsamx = self.get_device_manager().devices.fsamx
|
||||
fsamx.obj.controller.socket_put_confirmed("axspeed[4]=025*stppermm[4]")
|
||||
|
||||
@@ -94,11 +94,11 @@ class RtLamniController(Controller):
|
||||
def feedback_disable(self):
|
||||
self.socket_put("J0")
|
||||
logger.info("LamNI Feedback disabled.")
|
||||
self.set_device_enabled("lsamx", True)
|
||||
self.set_device_enabled("lsamy", True)
|
||||
self.set_device_enabled("loptx", True)
|
||||
self.set_device_enabled("lopty", True)
|
||||
self.set_device_enabled("loptz", True)
|
||||
self.set_device_read_write("lsamx", True)
|
||||
self.set_device_read_write("lsamy", True)
|
||||
self.set_device_read_write("loptx", True)
|
||||
self.set_device_read_write("lopty", True)
|
||||
self.set_device_read_write("loptz", True)
|
||||
|
||||
def is_axis_moving(self, axis_Id) -> bool:
|
||||
# this checks that axis is on target
|
||||
@@ -152,25 +152,25 @@ class RtLamniController(Controller):
|
||||
# set these as closed loop target position
|
||||
self.socket_put(f"pa0,{x_curr:.4f}")
|
||||
self.socket_put(f"pa1,{y_curr:.4f}")
|
||||
self.get_device_manager().devices.rtx.obj.user_setpoint.set_with_feedback_disabled(x_curr)
|
||||
self.get_device_manager().devices.rty.obj.user_setpoint.set_with_feedback_disabled(y_curr)
|
||||
self.dm.devices.rtx.obj.user_setpoint.set_with_feedback_disabled(x_curr)
|
||||
self.dm.devices.rty.obj.user_setpoint.set_with_feedback_disabled(y_curr)
|
||||
self.socket_put("J5")
|
||||
logger.info("LamNI Feedback enabled (without reset).")
|
||||
self.set_device_enabled("lsamx", False)
|
||||
self.set_device_enabled("lsamy", False)
|
||||
self.set_device_enabled("loptx", False)
|
||||
self.set_device_enabled("lopty", False)
|
||||
self.set_device_enabled("loptz", False)
|
||||
self.set_device_read_write("lsamx", False)
|
||||
self.set_device_read_write("lsamy", False)
|
||||
self.set_device_read_write("loptx", False)
|
||||
self.set_device_read_write("lopty", False)
|
||||
self.set_device_read_write("loptz", False)
|
||||
|
||||
@threadlocked
|
||||
def feedback_disable_and_even_reset_lamni_angle_interferometer(self):
|
||||
self.socket_put("J6")
|
||||
logger.info("LamNI Feedback disabled including the angular interferometer.")
|
||||
self.set_device_enabled("lsamx", True)
|
||||
self.set_device_enabled("lsamy", True)
|
||||
self.set_device_enabled("loptx", True)
|
||||
self.set_device_enabled("lopty", True)
|
||||
self.set_device_enabled("loptz", True)
|
||||
self.set_device_read_write("lsamx", True)
|
||||
self.set_device_read_write("lsamy", True)
|
||||
self.set_device_read_write("loptx", True)
|
||||
self.set_device_read_write("lopty", True)
|
||||
self.set_device_read_write("loptz", True)
|
||||
|
||||
@threadlocked
|
||||
def clear_trajectory_generator(self):
|
||||
@@ -407,11 +407,11 @@ class RtLamniController(Controller):
|
||||
(self.socket_put_and_receive("J2")).split(",")[0]
|
||||
)
|
||||
|
||||
self.set_device_enabled("lsamx", False)
|
||||
self.set_device_enabled("lsamy", False)
|
||||
self.set_device_enabled("loptx", False)
|
||||
self.set_device_enabled("lopty", False)
|
||||
self.set_device_enabled("loptz", False)
|
||||
self.set_device_read_write("lsamx", False)
|
||||
self.set_device_read_write("lsamy", False)
|
||||
self.set_device_read_write("loptx", False)
|
||||
self.set_device_read_write("lopty", False)
|
||||
self.set_device_read_write("loptz", False)
|
||||
|
||||
if interferometer_feedback_not_running == 1:
|
||||
logger.error(
|
||||
|
||||
@@ -556,12 +556,12 @@ class RtOMNYController(Controller):
|
||||
|
||||
time.sleep(1.5)
|
||||
|
||||
self.set_device_enabled("osamx", False)
|
||||
self.set_device_enabled("osamy", False)
|
||||
self.set_device_enabled("ofzpx", False)
|
||||
self.set_device_enabled("ofzpy", False)
|
||||
self.set_device_enabled("oosax", False)
|
||||
self.set_device_enabled("oosax", False)
|
||||
self.set_device_read_write("osamx", False)
|
||||
self.set_device_read_write("osamy", False)
|
||||
self.set_device_read_write("ofzpx", False)
|
||||
self.set_device_read_write("ofzpy", False)
|
||||
self.set_device_read_write("oosax", False)
|
||||
self.set_device_read_write("oosax", False)
|
||||
|
||||
print("Feedback is running.")
|
||||
|
||||
@@ -575,12 +575,12 @@ class RtOMNYController(Controller):
|
||||
self.move_to_zero()
|
||||
self.socket_put("J0")
|
||||
|
||||
self.set_device_enabled("osamx", True)
|
||||
self.set_device_enabled("osamy", True)
|
||||
self.set_device_enabled("ofzpx", True)
|
||||
self.set_device_enabled("ofzpy", True)
|
||||
self.set_device_enabled("oosax", True)
|
||||
self.set_device_enabled("oosax", True)
|
||||
self.set_device_read_write("osamx", True)
|
||||
self.set_device_read_write("osamy", True)
|
||||
self.set_device_read_write("ofzpx", True)
|
||||
self.set_device_read_write("ofzpy", True)
|
||||
self.set_device_read_write("oosax", True)
|
||||
self.set_device_read_write("oosax", True)
|
||||
|
||||
print("rt feedback is now disabled.")
|
||||
|
||||
|
||||
@@ -7,10 +7,15 @@ from csaxs_bec.devices.omny.galil.fupr_ophyd import FuprGalilController, FuprGal
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fsamroy():
|
||||
def fsamroy(dm_with_devices):
|
||||
FuprGalilController._reset_controller()
|
||||
fsamroy_motor = FuprGalilMotor(
|
||||
"A", name="fsamroy", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock
|
||||
"A",
|
||||
name="fsamroy",
|
||||
host="mpc2680.psi.ch",
|
||||
port=8081,
|
||||
socket_cls=SocketMock,
|
||||
device_manager=dm_with_devices,
|
||||
)
|
||||
fsamroy_motor.controller.on()
|
||||
assert isinstance(fsamroy_motor.controller, FuprGalilController)
|
||||
|
||||
@@ -8,10 +8,15 @@ from csaxs_bec.devices.omny.galil.lgalil_ophyd import LamniGalilController, Lamn
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def leyey():
|
||||
def leyey(dm_with_devices):
|
||||
LamniGalilController._reset_controller()
|
||||
leyey_motor = LamniGalilMotor(
|
||||
"H", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock
|
||||
"H",
|
||||
name="leyey",
|
||||
host="mpc2680.psi.ch",
|
||||
port=8081,
|
||||
socket_cls=SocketMock,
|
||||
device_manager=dm_with_devices,
|
||||
)
|
||||
leyey_motor.controller.on()
|
||||
yield leyey_motor
|
||||
@@ -20,10 +25,15 @@ def leyey():
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def leyex():
|
||||
def leyex(dm_with_devices):
|
||||
LamniGalilController._reset_controller()
|
||||
leyex_motor = LamniGalilMotor(
|
||||
"A", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock
|
||||
"A",
|
||||
name="leyey",
|
||||
host="mpc2680.psi.ch",
|
||||
port=8081,
|
||||
socket_cls=SocketMock,
|
||||
device_manager=dm_with_devices,
|
||||
)
|
||||
leyex_motor.controller.on()
|
||||
yield leyex_motor
|
||||
|
||||
@@ -7,10 +7,15 @@ from csaxs_bec.devices.omny.galil.fgalil_ophyd import FlomniGalilController, Flo
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def leyey():
|
||||
def leyey(dm_with_devices):
|
||||
FlomniGalilController._reset_controller()
|
||||
leyey_motor = FlomniGalilMotor(
|
||||
"H", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock
|
||||
"H",
|
||||
name="leyey",
|
||||
host="mpc2680.psi.ch",
|
||||
port=8081,
|
||||
socket_cls=SocketMock,
|
||||
device_manager=dm_with_devices,
|
||||
)
|
||||
leyey_motor.controller.on()
|
||||
yield leyey_motor
|
||||
@@ -19,10 +24,15 @@ def leyey():
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def leyex():
|
||||
def leyex(dm_with_devices):
|
||||
FlomniGalilController._reset_controller()
|
||||
leyex_motor = FlomniGalilMotor(
|
||||
"H", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock
|
||||
"H",
|
||||
name="leyey",
|
||||
host="mpc2680.psi.ch",
|
||||
port=8081,
|
||||
socket_cls=SocketMock,
|
||||
device_manager=dm_with_devices,
|
||||
)
|
||||
leyex_motor.controller.on()
|
||||
yield leyex_motor
|
||||
@@ -157,11 +167,16 @@ def test_find_reference(leyex, axis_nr, socket_put_messages, socket_get_messages
|
||||
],
|
||||
)
|
||||
def test_fosaz_light_curtain_is_triggered(
|
||||
axis_Id, socket_put_messages, socket_get_messages, triggered
|
||||
axis_Id, socket_put_messages, socket_get_messages, triggered, dm_with_devices
|
||||
):
|
||||
"""test that the light curtain is triggered"""
|
||||
fosaz = FlomniGalilMotor(
|
||||
axis_Id, name="fosaz", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock
|
||||
axis_Id,
|
||||
name="fosaz",
|
||||
host="mpc2680.psi.ch",
|
||||
port=8081,
|
||||
socket_cls=SocketMock,
|
||||
device_manager=dm_with_devices,
|
||||
)
|
||||
fosaz.controller.on()
|
||||
fosaz.controller.sock.flush_buffer()
|
||||
|
||||
@@ -28,13 +28,18 @@ def controller():
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def npointx():
|
||||
def npointx(dm_with_devices):
|
||||
"""
|
||||
Fixture to create a NPointAxis object.
|
||||
"""
|
||||
controller = mock.MagicMock()
|
||||
npointx = NPointAxis(
|
||||
axis_Id="A", name="npointx", host="localhost", port=1234, socket_cls=controller
|
||||
axis_Id="A",
|
||||
name="npointx",
|
||||
host="localhost",
|
||||
port=1234,
|
||||
socket_cls=controller,
|
||||
device_manager=dm_with_devices,
|
||||
)
|
||||
npointx.controller.on()
|
||||
npointx.controller.sock.reset_mock()
|
||||
@@ -110,13 +115,18 @@ def test_axis_get_in(npointx, axis, msg_in, msg_out):
|
||||
npointx.controller.sock.put.assert_called_once_with(msg_in)
|
||||
|
||||
|
||||
def test_axis_out_of_range(controller):
|
||||
def test_axis_out_of_range(dm_with_devices):
|
||||
"""
|
||||
Test that an error is raised when trying to create an NPointAxis object with an invalid axis ID.
|
||||
"""
|
||||
with pytest.raises(ValueError):
|
||||
npointx = NPointAxis(
|
||||
axis_Id="G", name="npointx", host="localhost", port=1234, socket_cls=mock.MagicMock()
|
||||
axis_Id="G",
|
||||
name="npointx",
|
||||
host="localhost",
|
||||
port=1234,
|
||||
socket_cls=mock.MagicMock(),
|
||||
device_manager=dm_with_devices,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -80,16 +80,16 @@ def test_move_samx_to_scan_region(rt_flomni):
|
||||
|
||||
|
||||
def test_feedback_enable_without_reset(rt_flomni):
|
||||
with mock.patch.object(rt_flomni, "set_device_enabled") as set_device_enabled:
|
||||
with mock.patch.object(rt_flomni, "set_device_read_write") as set_device_read_write:
|
||||
with mock.patch.object(rt_flomni, "feedback_is_running", return_value=True):
|
||||
with mock.patch.object(rt_flomni, "laser_tracker_on") as laser_tracker_on:
|
||||
rt_flomni.feedback_enable_without_reset()
|
||||
laser_tracker_on.assert_called_once()
|
||||
assert mock.call(b"l3\n") in rt_flomni.sock.put.mock_calls
|
||||
assert mock.call("fsamx", False) in set_device_enabled.mock_calls
|
||||
assert mock.call("fsamy", False) in set_device_enabled.mock_calls
|
||||
assert mock.call("foptx", False) in set_device_enabled.mock_calls
|
||||
assert mock.call("fopty", False) in set_device_enabled.mock_calls
|
||||
assert mock.call("fsamx", False) in set_device_read_write.mock_calls
|
||||
assert mock.call("fsamy", False) in set_device_read_write.mock_calls
|
||||
assert mock.call("foptx", False) in set_device_read_write.mock_calls
|
||||
assert mock.call("fopty", False) in set_device_read_write.mock_calls
|
||||
|
||||
|
||||
def test_feedback_enable_without_reset_raises(rt_flomni):
|
||||
|
||||
@@ -10,10 +10,10 @@ from csaxs_bec.devices.smaract.smaract_ophyd import SmaractMotor
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def controller():
|
||||
def controller(dm_with_devices):
|
||||
SmaractController._reset_controller()
|
||||
controller = SmaractController(
|
||||
socket_cls=SocketMock, socket_host="dummy", socket_port=123, device_manager=mock.MagicMock()
|
||||
socket_cls=SocketMock, socket_host="dummy", socket_port=123, device_manager=dm_with_devices
|
||||
)
|
||||
controller.on()
|
||||
controller.sock.flush_buffer()
|
||||
@@ -21,10 +21,16 @@ def controller():
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def lsmarA():
|
||||
def lsmarA(dm_with_devices):
|
||||
SmaractController._reset_controller()
|
||||
motor_a = SmaractMotor(
|
||||
"A", name="lsmarA", host="mpc2680.psi.ch", port=8085, sign=1, socket_cls=SocketMock
|
||||
"A",
|
||||
name="lsmarA",
|
||||
host="mpc2680.psi.ch",
|
||||
port=8085,
|
||||
sign=1,
|
||||
socket_cls=SocketMock,
|
||||
device_manager=dm_with_devices,
|
||||
)
|
||||
motor_a.controller.on()
|
||||
motor_a.controller.sock.flush_buffer()
|
||||
|
||||
Reference in New Issue
Block a user