mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2025-05-31 00:50:40 +02:00
test: add tests for close and stop filewriter
This commit is contained in:
parent
13d26c6537
commit
d3e8ece029
@ -7,6 +7,7 @@ from ophyd import Staged
|
|||||||
from bec_lib.core import BECMessage, MessageEndpoints
|
from bec_lib.core import BECMessage, MessageEndpoints
|
||||||
from bec_lib.core.devicemanager import DeviceContainer
|
from bec_lib.core.devicemanager import DeviceContainer
|
||||||
from bec_lib.core.tests.utils import ProducerMock
|
from bec_lib.core.tests.utils import ProducerMock
|
||||||
|
import requests
|
||||||
|
|
||||||
|
|
||||||
class MockSignal(Signal):
|
class MockSignal(Signal):
|
||||||
@ -299,6 +300,64 @@ def test_publish_file_location(mock_det, scaninfo):
|
|||||||
assert mock_det._producer.set_and_publish.call_args_list == expected_calls
|
assert mock_det._producer.set_and_publish.call_args_list == expected_calls
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"requests_state, expected_exception, url",
|
||||||
|
[
|
||||||
|
(
|
||||||
|
True,
|
||||||
|
False,
|
||||||
|
"http://x12sa-pd-2:8080/stream/pilatus_2",
|
||||||
|
),
|
||||||
|
(
|
||||||
|
False,
|
||||||
|
False,
|
||||||
|
"http://x12sa-pd-2:8080/stream/pilatus_2",
|
||||||
|
),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_close_file_writer(mock_det, requests_state, expected_exception, url):
|
||||||
|
with mock.patch.object(mock_det, "_send_requests_delete") as mock_send_requests_delete:
|
||||||
|
instance = mock_send_requests_delete.return_value
|
||||||
|
instance.ok = requests_state
|
||||||
|
if expected_exception:
|
||||||
|
mock_det._close_file_writer()
|
||||||
|
mock_send_requests_delete.assert_called_once_with(url=url)
|
||||||
|
instance.raise_for_status.called_once()
|
||||||
|
else:
|
||||||
|
mock_det._close_file_writer()
|
||||||
|
mock_send_requests_delete.assert_called_once_with(url=url)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"requests_state, expected_exception, url",
|
||||||
|
[
|
||||||
|
(
|
||||||
|
True,
|
||||||
|
False,
|
||||||
|
"http://xbl-daq-34:8091/pilatus_2/stop",
|
||||||
|
),
|
||||||
|
(
|
||||||
|
False,
|
||||||
|
True,
|
||||||
|
"http://xbl-daq-34:8091/pilatus_2/stop",
|
||||||
|
),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_stop_file_writer(mock_det, requests_state, expected_exception, url):
|
||||||
|
with mock.patch.object(mock_det, "_send_requests_put") as mock_send_requests_put:
|
||||||
|
instance = mock_send_requests_put.return_value
|
||||||
|
instance.ok = requests_state
|
||||||
|
instance.raise_for_status.side_effect = Exception
|
||||||
|
if expected_exception:
|
||||||
|
with pytest.raises(Exception):
|
||||||
|
mock_det._stop_file_writer()
|
||||||
|
mock_send_requests_put.assert_called_once_with(url=url)
|
||||||
|
instance.raise_for_status.called_once()
|
||||||
|
else:
|
||||||
|
mock_det._stop_file_writer()
|
||||||
|
mock_send_requests_put.assert_called_once_with(url=url)
|
||||||
|
|
||||||
|
|
||||||
# @pytest.mark.parametrize(
|
# @pytest.mark.parametrize(
|
||||||
# "scaninfo, daq_status, expected_exception",
|
# "scaninfo, daq_status, expected_exception",
|
||||||
# [
|
# [
|
||||||
|
Loading…
x
Reference in New Issue
Block a user