mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2025-06-12 14:27:14 +02:00
test: adapt tests to consider returned DeviceStatus for on_trigger/complete
This commit is contained in:
@ -72,13 +72,12 @@ def test_unstage(detector_base):
|
|||||||
rtr = detector_base.unstage()
|
rtr = detector_base.unstage()
|
||||||
assert isinstance(rtr, list)
|
assert isinstance(rtr, list)
|
||||||
assert mock_check_scan_id.call_count == 1
|
assert mock_check_scan_id.call_count == 1
|
||||||
mock_on_unstage.assert_not_called()
|
assert mock_on_unstage.call_count == 1
|
||||||
detector_base.stopped = False
|
detector_base.stopped = False
|
||||||
rtr = detector_base.unstage()
|
rtr = detector_base.unstage()
|
||||||
assert isinstance(rtr, list)
|
assert isinstance(rtr, list)
|
||||||
assert mock_check_scan_id.call_count == 2
|
assert mock_check_scan_id.call_count == 2
|
||||||
assert detector_base.stopped is False
|
assert mock_on_unstage.call_count == 2
|
||||||
mock_on_unstage.assert_called_once()
|
|
||||||
|
|
||||||
|
|
||||||
def test_complete(detector_base):
|
def test_complete(detector_base):
|
||||||
|
@ -13,6 +13,7 @@ from bec_lib import messages
|
|||||||
from bec_lib.endpoints import MessageEndpoints
|
from bec_lib.endpoints import MessageEndpoints
|
||||||
from bec_server.device_server.tests.utils import DMMock
|
from bec_server.device_server.tests.utils import DMMock
|
||||||
from ophyd import Device, Signal
|
from ophyd import Device, Signal
|
||||||
|
from ophyd.status import wait as status_wait
|
||||||
|
|
||||||
from ophyd_devices.interfaces.protocols.bec_protocols import (
|
from ophyd_devices.interfaces.protocols.bec_protocols import (
|
||||||
BECDeviceProtocol,
|
BECDeviceProtocol,
|
||||||
@ -456,10 +457,14 @@ def test_cam_stage_h5writer(camera):
|
|||||||
def test_cam_complete(camera):
|
def test_cam_complete(camera):
|
||||||
"""Test the complete method of SimCamera."""
|
"""Test the complete method of SimCamera."""
|
||||||
with mock.patch.object(camera, "h5_writer") as mock_h5_writer:
|
with mock.patch.object(camera, "h5_writer") as mock_h5_writer:
|
||||||
camera.complete()
|
status = camera.complete()
|
||||||
|
status_wait(status)
|
||||||
|
assert status.done is True
|
||||||
|
assert status.success is True
|
||||||
assert mock_h5_writer.write_data.call_count == 0
|
assert mock_h5_writer.write_data.call_count == 0
|
||||||
camera.write_to_disk.put(True)
|
camera.write_to_disk.put(True)
|
||||||
camera.complete()
|
status = camera.complete()
|
||||||
|
status_wait(status)
|
||||||
assert mock_h5_writer.write_data.call_count == 1
|
assert mock_h5_writer.write_data.call_count == 1
|
||||||
|
|
||||||
|
|
||||||
@ -467,12 +472,17 @@ def test_cam_trigger(camera):
|
|||||||
"""Test the trigger method of SimCamera."""
|
"""Test the trigger method of SimCamera."""
|
||||||
with mock.patch.object(camera, "h5_writer") as mock_h5_writer:
|
with mock.patch.object(camera, "h5_writer") as mock_h5_writer:
|
||||||
data = []
|
data = []
|
||||||
camera.trigger()
|
status = camera.trigger()
|
||||||
|
status_wait(status)
|
||||||
|
assert status.done is True
|
||||||
|
assert status.success is True
|
||||||
assert mock_h5_writer.receive_data.call_count == 0
|
assert mock_h5_writer.receive_data.call_count == 0
|
||||||
camera.write_to_disk.put(True)
|
camera.write_to_disk.put(True)
|
||||||
camera.trigger()
|
status = camera.trigger()
|
||||||
|
status_wait(status)
|
||||||
assert mock_h5_writer.receive_data.call_count == 1
|
assert mock_h5_writer.receive_data.call_count == 1
|
||||||
camera.trigger()
|
status = camera.trigger()
|
||||||
|
status_wait(status)
|
||||||
assert mock_h5_writer.receive_data.call_count == 2
|
assert mock_h5_writer.receive_data.call_count == 2
|
||||||
|
|
||||||
|
|
||||||
@ -516,10 +526,16 @@ def test_async_monitor_complete(async_monitor):
|
|||||||
mock.patch.object(async_monitor.custom_prepare, "_send_data_to_bec") as mock_send,
|
mock.patch.object(async_monitor.custom_prepare, "_send_data_to_bec") as mock_send,
|
||||||
mock.patch.object(async_monitor.custom_prepare, "prep_random_interval") as mock_prep,
|
mock.patch.object(async_monitor.custom_prepare, "prep_random_interval") as mock_prep,
|
||||||
):
|
):
|
||||||
async_monitor.complete()
|
status = async_monitor.complete()
|
||||||
|
status_wait(status)
|
||||||
|
assert status.done is True
|
||||||
|
assert status.success is True
|
||||||
assert mock_send.call_count == 0
|
assert mock_send.call_count == 0
|
||||||
async_monitor.data_buffer["value"].append(0)
|
async_monitor.data_buffer["value"].append(0)
|
||||||
async_monitor.complete()
|
status = async_monitor.complete()
|
||||||
|
status_wait(status)
|
||||||
|
assert status.done is True
|
||||||
|
assert status.success is True
|
||||||
assert mock_send.call_count == 1
|
assert mock_send.call_count == 1
|
||||||
|
|
||||||
|
|
||||||
@ -529,7 +545,8 @@ def test_async_mon_on_trigger(async_monitor):
|
|||||||
async_monitor.custom_prepare.on_stage()
|
async_monitor.custom_prepare.on_stage()
|
||||||
upper_limit = async_monitor.custom_prepare._random_send_interval
|
upper_limit = async_monitor.custom_prepare._random_send_interval
|
||||||
for ii in range(1, upper_limit + 1):
|
for ii in range(1, upper_limit + 1):
|
||||||
async_monitor.custom_prepare.on_trigger()
|
status = async_monitor.custom_prepare.on_trigger()
|
||||||
|
status_wait(status)
|
||||||
assert async_monitor.current_trigger.get() == ii
|
assert async_monitor.current_trigger.get() == ii
|
||||||
assert mock_send.call_count == 1
|
assert mock_send.call_count == 1
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user