test:test_set_devices

This commit is contained in:
stalbe_j 2023-01-13 14:36:08 +01:00
parent b0bec67db7
commit d13b031a5e

View File

@ -283,6 +283,28 @@ def test_wait_for_device_server():
worker.parent.wait_for_service.assert_called_once_with("DeviceServer") worker.parent.wait_for_service.assert_called_once_with("DeviceServer")
@pytest.mark.parametrize(
"instr",
[
(
BECMessage.DeviceInstructionMessage(
device=["samx"],
action="set",
parameter={"value": 10, "wait_group": "scan_motor", "time": 30},
metadata={"stream": "primary", "DIID": 3, "scanID": "scanID", "RID": "requestID"},
)
),
],
)
def test_set_devices(instr):
worker = get_scan_worker()
worker.device_manager.producer.send = mock.MagicMock()
worker._set_devices(instr)
worker.device_manager.producer.send.assert_called_once_with(
MessageEndpoints.device_instructions(), instr.dumps()
)
@pytest.mark.parametrize( @pytest.mark.parametrize(
"device_status,devices,instr,abort", "device_status,devices,instr,abort",
[ [