WIP
This commit is contained in:
@@ -1,353 +1,353 @@
|
||||
import json
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
import requests_mock
|
||||
import typeguard
|
||||
from ophyd import StatusBase
|
||||
from websockets import WebSocketException
|
||||
|
||||
from tomcat_bec.devices.gigafrost.std_daq_client import StdDaqClient, StdDaqError, StdDaqStatus
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client():
|
||||
parent_device = mock.MagicMock()
|
||||
_client = StdDaqClient(
|
||||
parent=parent_device, ws_url="ws://localhost:5001", rest_url="http://localhost:5000"
|
||||
)
|
||||
yield _client
|
||||
_client.shutdown()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def full_config():
|
||||
full_config = dict(
|
||||
detector_name="tomcat-gf",
|
||||
detector_type="gigafrost",
|
||||
n_modules=8,
|
||||
bit_depth=16,
|
||||
image_pixel_height=2016,
|
||||
image_pixel_width=2016,
|
||||
start_udp_port=2000,
|
||||
writer_user_id=18600,
|
||||
max_number_of_forwarders_spawned=8,
|
||||
use_all_forwarders=True,
|
||||
module_sync_queue_size=4096,
|
||||
number_of_writers=12,
|
||||
module_positions={},
|
||||
ram_buffer_gb=150,
|
||||
delay_filter_timeout=10,
|
||||
live_stream_configs={
|
||||
"tcp://129.129.95.111:20000": {"type": "periodic", "config": [1, 5]},
|
||||
"tcp://129.129.95.111:20001": {"type": "periodic", "config": [1, 5]},
|
||||
"tcp://129.129.95.38:20000": {"type": "periodic", "config": [1, 1]},
|
||||
},
|
||||
)
|
||||
return full_config
|
||||
|
||||
|
||||
def test_stddaq_client(client):
|
||||
assert client is not None
|
||||
|
||||
|
||||
def test_stddaq_client_get_daq_config(client, full_config):
|
||||
with requests_mock.Mocker() as m:
|
||||
response = full_config
|
||||
m.get("http://localhost:5000/api/config/get?user=ioc", json=response.model_dump())
|
||||
out = client.get_config()
|
||||
|
||||
# Check that the response is simply the json response
|
||||
assert out == response.model_dump()
|
||||
|
||||
assert client._config == response
|
||||
|
||||
|
||||
def test_stddaq_client_set_config_pydantic(client, full_config):
|
||||
"""Test setting configurations through the StdDAQ client"""
|
||||
with requests_mock.Mocker() as m:
|
||||
m.post("http://localhost:5000/api/config/set?user=ioc")
|
||||
|
||||
# Test with StdDaqConfig object
|
||||
config = full_config
|
||||
with mock.patch.object(client, "_pre_restart"), mock.patch.object(client, "_post_restart"):
|
||||
client.set_config(config)
|
||||
|
||||
# Verify the last request
|
||||
assert m.last_request.json() == full_config.model_dump()
|
||||
|
||||
|
||||
def test_std_daq_client_set_config_dict(client, full_config):
|
||||
"""
|
||||
Test setting configurations through the StdDAQ client with a dictionary input.
|
||||
"""
|
||||
# import json
|
||||
# from unittest import mock
|
||||
|
||||
# import pytest
|
||||
# import requests
|
||||
# import requests_mock
|
||||
# import typeguard
|
||||
# from ophyd import StatusBase
|
||||
# from websockets import WebSocketException
|
||||
|
||||
# from tomcat_bec.devices.gigafrost.std_daq_client import StdDaqClient, StdDaqError, StdDaqStatus
|
||||
|
||||
|
||||
# @pytest.fixture
|
||||
# def client():
|
||||
# parent_device = mock.MagicMock()
|
||||
# _client = StdDaqClient(
|
||||
# parent=parent_device, ws_url="ws://localhost:5001", rest_url="http://localhost:5000"
|
||||
# )
|
||||
# yield _client
|
||||
# _client.shutdown()
|
||||
|
||||
|
||||
# @pytest.fixture
|
||||
# def full_config():
|
||||
# full_config = dict(
|
||||
# detector_name="tomcat-gf",
|
||||
# detector_type="gigafrost",
|
||||
# n_modules=8,
|
||||
# bit_depth=16,
|
||||
# image_pixel_height=2016,
|
||||
# image_pixel_width=2016,
|
||||
# start_udp_port=2000,
|
||||
# writer_user_id=18600,
|
||||
# max_number_of_forwarders_spawned=8,
|
||||
# use_all_forwarders=True,
|
||||
# module_sync_queue_size=4096,
|
||||
# number_of_writers=12,
|
||||
# module_positions={},
|
||||
# ram_buffer_gb=150,
|
||||
# delay_filter_timeout=10,
|
||||
# live_stream_configs={
|
||||
# "tcp://129.129.95.111:20000": {"type": "periodic", "config": [1, 5]},
|
||||
# "tcp://129.129.95.111:20001": {"type": "periodic", "config": [1, 5]},
|
||||
# "tcp://129.129.95.38:20000": {"type": "periodic", "config": [1, 1]},
|
||||
# },
|
||||
# )
|
||||
# return full_config
|
||||
|
||||
|
||||
# def test_stddaq_client(client):
|
||||
# assert client is not None
|
||||
|
||||
|
||||
# def test_stddaq_client_get_daq_config(client, full_config):
|
||||
# with requests_mock.Mocker() as m:
|
||||
# response = full_config
|
||||
# m.get("http://localhost:5000/api/config/get?user=ioc", json=response.model_dump())
|
||||
# out = client.get_config()
|
||||
|
||||
# # Check that the response is simply the json response
|
||||
# assert out == response.model_dump()
|
||||
|
||||
# assert client._config == response
|
||||
|
||||
|
||||
# def test_stddaq_client_set_config_pydantic(client, full_config):
|
||||
# """Test setting configurations through the StdDAQ client"""
|
||||
# with requests_mock.Mocker() as m:
|
||||
# m.post("http://localhost:5000/api/config/set?user=ioc")
|
||||
|
||||
# # Test with StdDaqConfig object
|
||||
# config = full_config
|
||||
# with mock.patch.object(client, "_pre_restart"), mock.patch.object(client, "_post_restart"):
|
||||
# client.set_config(config)
|
||||
|
||||
# # Verify the last request
|
||||
# assert m.last_request.json() == full_config.model_dump()
|
||||
|
||||
|
||||
# def test_std_daq_client_set_config_dict(client, full_config):
|
||||
# """
|
||||
# Test setting configurations through the StdDAQ client with a dictionary input.
|
||||
# """
|
||||
|
||||
with requests_mock.Mocker() as m:
|
||||
m.post("http://localhost:5000/api/config/set?user=ioc")
|
||||
|
||||
# Test with dictionary input
|
||||
config_dict = full_config.model_dump()
|
||||
with mock.patch.object(client, "_pre_restart"), mock.patch.object(client, "_post_restart"):
|
||||
client.set_config(config_dict)
|
||||
assert m.last_request.json() == full_config.model_dump()
|
||||
# with requests_mock.Mocker() as m:
|
||||
# m.post("http://localhost:5000/api/config/set?user=ioc")
|
||||
|
||||
# # Test with dictionary input
|
||||
# config_dict = full_config.model_dump()
|
||||
# with mock.patch.object(client, "_pre_restart"), mock.patch.object(client, "_post_restart"):
|
||||
# client.set_config(config_dict)
|
||||
# assert m.last_request.json() == full_config.model_dump()
|
||||
|
||||
|
||||
def test_stddaq_client_set_config_ignores_extra_keys(client, full_config):
|
||||
"""
|
||||
Test that the set_config method ignores extra keys in the input dictionary.
|
||||
"""
|
||||
|
||||
with requests_mock.Mocker() as m:
|
||||
m.post("http://localhost:5000/api/config/set?user=ioc")
|
||||
# def test_stddaq_client_set_config_ignores_extra_keys(client, full_config):
|
||||
# """
|
||||
# Test that the set_config method ignores extra keys in the input dictionary.
|
||||
# """
|
||||
|
||||
# with requests_mock.Mocker() as m:
|
||||
# m.post("http://localhost:5000/api/config/set?user=ioc")
|
||||
|
||||
# Test with dictionary input
|
||||
config_dict = full_config.model_dump()
|
||||
config_dict["extra_key"] = "extra_value"
|
||||
with mock.patch.object(client, "_pre_restart"), mock.patch.object(client, "_post_restart"):
|
||||
client.set_config(config_dict)
|
||||
assert m.last_request.json() == full_config.model_dump()
|
||||
# # Test with dictionary input
|
||||
# config_dict = full_config.model_dump()
|
||||
# config_dict["extra_key"] = "extra_value"
|
||||
# with mock.patch.object(client, "_pre_restart"), mock.patch.object(client, "_post_restart"):
|
||||
# client.set_config(config_dict)
|
||||
# assert m.last_request.json() == full_config.model_dump()
|
||||
|
||||
|
||||
def test_stddaq_client_set_config_error(client, full_config):
|
||||
"""
|
||||
Test error handling in the set_config method.
|
||||
"""
|
||||
with requests_mock.Mocker() as m:
|
||||
config = full_config
|
||||
m.post("http://localhost:5000/api/config/set?user=ioc", status_code=500)
|
||||
with mock.patch.object(client, "_pre_restart"), mock.patch.object(client, "_post_restart"):
|
||||
with pytest.raises(requests.exceptions.HTTPError):
|
||||
client.set_config(config)
|
||||
# def test_stddaq_client_set_config_error(client, full_config):
|
||||
# """
|
||||
# Test error handling in the set_config method.
|
||||
# """
|
||||
# with requests_mock.Mocker() as m:
|
||||
# config = full_config
|
||||
# m.post("http://localhost:5000/api/config/set?user=ioc", status_code=500)
|
||||
# with mock.patch.object(client, "_pre_restart"), mock.patch.object(client, "_post_restart"):
|
||||
# with pytest.raises(requests.exceptions.HTTPError):
|
||||
# client.set_config(config)
|
||||
|
||||
|
||||
def test_stddaq_client_get_config_cached(client, full_config):
|
||||
"""
|
||||
Test that the client returns the cached configuration if it is available.
|
||||
"""
|
||||
# def test_stddaq_client_get_config_cached(client, full_config):
|
||||
# """
|
||||
# Test that the client returns the cached configuration if it is available.
|
||||
# """
|
||||
|
||||
# Set the cached configuration
|
||||
config = full_config
|
||||
client._config = config
|
||||
# # Set the cached configuration
|
||||
# config = full_config
|
||||
# client._config = config
|
||||
|
||||
# Test that the client returns the cached configuration
|
||||
assert client.get_config(cached=True) == config
|
||||
# # Test that the client returns the cached configuration
|
||||
# assert client.get_config(cached=True) == config
|
||||
|
||||
|
||||
def test_stddaq_client_status(client):
|
||||
client._status = StdDaqStatus.FILE_CREATED
|
||||
assert client.status == StdDaqStatus.FILE_CREATED
|
||||
# def test_stddaq_client_status(client):
|
||||
# client._status = StdDaqStatus.FILE_CREATED
|
||||
# assert client.status == StdDaqStatus.FILE_CREATED
|
||||
|
||||
|
||||
def test_stddaq_client_start(client):
|
||||
# def test_stddaq_client_start(client):
|
||||
|
||||
with mock.patch("tomcat_bec.devices.gigafrost.std_daq_client.StatusBase") as StatusBase:
|
||||
client.start(file_path="test_file_path", file_prefix="test_file_prefix", num_images=10)
|
||||
out = client._send_queue.get()
|
||||
assert out == {
|
||||
"command": "start",
|
||||
"path": "test_file_path",
|
||||
"file_prefix": "test_file_prefix",
|
||||
"n_image": 10,
|
||||
}
|
||||
StatusBase().wait.assert_called_once()
|
||||
# with mock.patch("tomcat_bec.devices.gigafrost.std_daq_client.StatusBase") as StatusBase:
|
||||
# client.start(file_path="test_file_path", file_prefix="test_file_prefix", num_images=10)
|
||||
# out = client._send_queue.get()
|
||||
# assert out == {
|
||||
# "command": "start",
|
||||
# "path": "test_file_path",
|
||||
# "file_prefix": "test_file_prefix",
|
||||
# "n_image": 10,
|
||||
# }
|
||||
# StatusBase().wait.assert_called_once()
|
||||
|
||||
|
||||
def test_stddaq_client_start_type_error(client):
|
||||
with pytest.raises(typeguard.TypeCheckError):
|
||||
client.start(file_path="test_file_path", file_prefix="test_file_prefix", num_images="10")
|
||||
# def test_stddaq_client_start_type_error(client):
|
||||
# with pytest.raises(typeguard.TypeCheckError):
|
||||
# client.start(file_path="test_file_path", file_prefix="test_file_prefix", num_images="10")
|
||||
|
||||
|
||||
def test_stddaq_client_stop(client):
|
||||
"""
|
||||
Check that the stop method puts the stop command in the send queue.
|
||||
"""
|
||||
client.stop()
|
||||
client._send_queue.get() == {"command": "stop"}
|
||||
# def test_stddaq_client_stop(client):
|
||||
# """
|
||||
# Check that the stop method puts the stop command in the send queue.
|
||||
# """
|
||||
# client.stop()
|
||||
# client._send_queue.get() == {"command": "stop"}
|
||||
|
||||
|
||||
def test_stddaq_client_update_config(client, full_config):
|
||||
"""
|
||||
Test that the update_config method updates the configuration with the provided dictionary.
|
||||
"""
|
||||
# def test_stddaq_client_update_config(client, full_config):
|
||||
# """
|
||||
# Test that the update_config method updates the configuration with the provided dictionary.
|
||||
# """
|
||||
|
||||
config = full_config
|
||||
with requests_mock.Mocker() as m:
|
||||
m.get("http://localhost:5000/api/config/get?user=ioc", json=config.model_dump())
|
||||
# config = full_config
|
||||
# with requests_mock.Mocker() as m:
|
||||
# m.get("http://localhost:5000/api/config/get?user=ioc", json=config.model_dump())
|
||||
|
||||
# Update the configuration
|
||||
update_dict = {"detector_name": "new_name"}
|
||||
with mock.patch.object(client, "set_config") as set_config:
|
||||
client.update_config(update_dict)
|
||||
# # Update the configuration
|
||||
# update_dict = {"detector_name": "new_name"}
|
||||
# with mock.patch.object(client, "set_config") as set_config:
|
||||
# client.update_config(update_dict)
|
||||
|
||||
assert set_config.call_count == 1
|
||||
# assert set_config.call_count == 1
|
||||
|
||||
|
||||
def test_stddaq_client_updates_only_changed_configs(client, full_config):
|
||||
"""
|
||||
Test that the update_config method only updates the configuration if the config has changed.
|
||||
"""
|
||||
|
||||
config = full_config
|
||||
with requests_mock.Mocker() as m:
|
||||
m.get("http://localhost:5000/api/config/get?user=ioc", json=config.model_dump())
|
||||
|
||||
# Update the configuration
|
||||
update_dict = {"detector_name": "tomcat-gf"}
|
||||
with mock.patch.object(client, "set_config") as set_config:
|
||||
client.update_config(update_dict)
|
||||
|
||||
assert set_config.call_count == 0
|
||||
|
||||
|
||||
def test_stddaq_client_updates_only_changed_configs_empty(client, full_config):
|
||||
"""
|
||||
Test that the update_config method only updates the configuration if the config has changed.
|
||||
"""
|
||||
|
||||
config = full_config
|
||||
with requests_mock.Mocker() as m:
|
||||
m.get("http://localhost:5000/api/config/get?user=ioc", json=config.model_dump())
|
||||
|
||||
# Update the configuration
|
||||
update_dict = {}
|
||||
with mock.patch.object(client, "set_config") as set_config:
|
||||
client.update_config(update_dict)
|
||||
|
||||
assert set_config.call_count == 0
|
||||
|
||||
|
||||
def test_stddaq_client_pre_restart(client):
|
||||
"""
|
||||
Test that the pre_restart method sets the status to RESTARTING.
|
||||
"""
|
||||
# let's assume the websocket loop is already idle
|
||||
client._ws_idle_event.set()
|
||||
client.ws_client = mock.MagicMock()
|
||||
client._pre_restart()
|
||||
client.ws_client.close.assert_called_once()
|
||||
|
||||
|
||||
def test_stddaq_client_post_restart(client):
|
||||
"""
|
||||
Test that the post_restart method sets the status to IDLE.
|
||||
"""
|
||||
with mock.patch.object(client, "wait_for_connection") as wait_for_connection:
|
||||
client._post_restart()
|
||||
wait_for_connection.assert_called_once()
|
||||
assert client._daq_is_running.is_set()
|
||||
|
||||
|
||||
def test_stddaq_client_reset(client):
|
||||
"""
|
||||
Test that the reset method calls get_config and set_config.
|
||||
"""
|
||||
with (
|
||||
mock.patch.object(client, "get_config") as get_config,
|
||||
mock.patch.object(client, "set_config") as set_config,
|
||||
):
|
||||
client.reset()
|
||||
get_config.assert_called_once()
|
||||
set_config.assert_called_once()
|
||||
|
||||
|
||||
def test_stddaq_client_run_status_callbacks(client):
|
||||
"""
|
||||
Test that the run_status_callback method runs the status callback.
|
||||
"""
|
||||
status = StatusBase()
|
||||
client.add_status_callback(status, success=[StdDaqStatus.FILE_CREATED], error=[])
|
||||
client._status = StdDaqStatus.FILE_CREATED
|
||||
client._run_status_callbacks()
|
||||
status.wait()
|
||||
|
||||
assert len(status._callbacks) == 0
|
||||
|
||||
|
||||
def test_stddaq_client_run_status_callbacks_error(client):
|
||||
"""
|
||||
Test that the run_status_callback method runs the status callback.
|
||||
"""
|
||||
status = StatusBase()
|
||||
client.add_status_callback(status, success=[], error=[StdDaqStatus.FILE_CREATED])
|
||||
client._status = StdDaqStatus.FILE_CREATED
|
||||
client._run_status_callbacks()
|
||||
with pytest.raises(StdDaqError):
|
||||
status.wait()
|
||||
|
||||
assert len(status._callbacks) == 0
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"msg, updated",
|
||||
[({"status": "IDLE"}, False), (json.dumps({"status": "waiting_for_first_image"}), True)],
|
||||
)
|
||||
def test_stddaq_client_on_received_ws_message(client, msg, updated):
|
||||
"""
|
||||
Test that the on_received_ws_message method runs the status callback.
|
||||
"""
|
||||
client._status = None
|
||||
with mock.patch.object(client, "_run_status_callbacks") as run_status_callbacks:
|
||||
client._on_received_ws_message(msg)
|
||||
if updated:
|
||||
run_status_callbacks.assert_called_once()
|
||||
assert client._status == StdDaqStatus.WAITING_FOR_FIRST_IMAGE
|
||||
else:
|
||||
run_status_callbacks.assert_not_called()
|
||||
assert client._status is None
|
||||
|
||||
|
||||
def test_stddaq_client_ws_send_and_receive(client):
|
||||
|
||||
client.ws_client = mock.MagicMock()
|
||||
client._send_queue.put({"command": "test"})
|
||||
client._ws_send_and_receive()
|
||||
# queue is not empty, so we should send the message
|
||||
client.ws_client.send.assert_called_once()
|
||||
client.ws_client.recv.assert_called_once()
|
||||
|
||||
client.ws_client.reset_mock()
|
||||
client._ws_send_and_receive()
|
||||
# queue is empty, so we should not send the message
|
||||
client.ws_client.send.assert_not_called()
|
||||
client.ws_client.recv.assert_called_once()
|
||||
|
||||
|
||||
def test_stddaq_client_ws_send_and_receive_websocket_error(client):
|
||||
"""
|
||||
Test that the ws_send_and_receive method handles websocket errors.
|
||||
"""
|
||||
client.ws_client = mock.MagicMock()
|
||||
client.ws_client.send.side_effect = WebSocketException()
|
||||
client._send_queue.put({"command": "test"})
|
||||
with mock.patch.object(client, "wait_for_connection") as wait_for_connection:
|
||||
client._ws_send_and_receive()
|
||||
wait_for_connection.assert_called_once()
|
||||
|
||||
|
||||
def test_stddaq_client_ws_send_and_receive_timeout_error(client):
|
||||
"""
|
||||
Test that the ws_send_and_receive method handles timeout errors.
|
||||
"""
|
||||
client.ws_client = mock.MagicMock()
|
||||
client.ws_client.recv.side_effect = TimeoutError()
|
||||
client._send_queue.put({"command": "test"})
|
||||
with mock.patch.object(client, "wait_for_connection") as wait_for_connection:
|
||||
client._ws_send_and_receive()
|
||||
wait_for_connection.assert_not_called()
|
||||
|
||||
|
||||
def test_stddaq_client_ws_update_loop(client):
|
||||
"""
|
||||
Test that the ws_update_loop method runs the status callback.
|
||||
"""
|
||||
client._shutdown_event = mock.MagicMock()
|
||||
client._shutdown_event.is_set.side_effect = [False, True]
|
||||
with (
|
||||
mock.patch.object(client, "_ws_send_and_receive") as ws_send_and_receive,
|
||||
mock.patch.object(client, "_wait_for_server_running") as wait_for_server_running,
|
||||
):
|
||||
client._ws_update_loop()
|
||||
|
||||
ws_send_and_receive.assert_called_once()
|
||||
wait_for_server_running.assert_called_once()
|
||||
# def test_stddaq_client_updates_only_changed_configs(client, full_config):
|
||||
# """
|
||||
# Test that the update_config method only updates the configuration if the config has changed.
|
||||
# """
|
||||
|
||||
# config = full_config
|
||||
# with requests_mock.Mocker() as m:
|
||||
# m.get("http://localhost:5000/api/config/get?user=ioc", json=config.model_dump())
|
||||
|
||||
# # Update the configuration
|
||||
# update_dict = {"detector_name": "tomcat-gf"}
|
||||
# with mock.patch.object(client, "set_config") as set_config:
|
||||
# client.update_config(update_dict)
|
||||
|
||||
# assert set_config.call_count == 0
|
||||
|
||||
|
||||
# def test_stddaq_client_updates_only_changed_configs_empty(client, full_config):
|
||||
# """
|
||||
# Test that the update_config method only updates the configuration if the config has changed.
|
||||
# """
|
||||
|
||||
# config = full_config
|
||||
# with requests_mock.Mocker() as m:
|
||||
# m.get("http://localhost:5000/api/config/get?user=ioc", json=config.model_dump())
|
||||
|
||||
# # Update the configuration
|
||||
# update_dict = {}
|
||||
# with mock.patch.object(client, "set_config") as set_config:
|
||||
# client.update_config(update_dict)
|
||||
|
||||
# assert set_config.call_count == 0
|
||||
|
||||
|
||||
# def test_stddaq_client_pre_restart(client):
|
||||
# """
|
||||
# Test that the pre_restart method sets the status to RESTARTING.
|
||||
# """
|
||||
# # let's assume the websocket loop is already idle
|
||||
# client._ws_idle_event.set()
|
||||
# client.ws_client = mock.MagicMock()
|
||||
# client._pre_restart()
|
||||
# client.ws_client.close.assert_called_once()
|
||||
|
||||
|
||||
# def test_stddaq_client_post_restart(client):
|
||||
# """
|
||||
# Test that the post_restart method sets the status to IDLE.
|
||||
# """
|
||||
# with mock.patch.object(client, "wait_for_connection") as wait_for_connection:
|
||||
# client._post_restart()
|
||||
# wait_for_connection.assert_called_once()
|
||||
# assert client._daq_is_running.is_set()
|
||||
|
||||
|
||||
# def test_stddaq_client_reset(client):
|
||||
# """
|
||||
# Test that the reset method calls get_config and set_config.
|
||||
# """
|
||||
# with (
|
||||
# mock.patch.object(client, "get_config") as get_config,
|
||||
# mock.patch.object(client, "set_config") as set_config,
|
||||
# ):
|
||||
# client.reset()
|
||||
# get_config.assert_called_once()
|
||||
# set_config.assert_called_once()
|
||||
|
||||
|
||||
# def test_stddaq_client_run_status_callbacks(client):
|
||||
# """
|
||||
# Test that the run_status_callback method runs the status callback.
|
||||
# """
|
||||
# status = StatusBase()
|
||||
# client.add_status_callback(status, success=[StdDaqStatus.FILE_CREATED], error=[])
|
||||
# client._status = StdDaqStatus.FILE_CREATED
|
||||
# client._run_status_callbacks()
|
||||
# status.wait()
|
||||
|
||||
# assert len(status._callbacks) == 0
|
||||
|
||||
|
||||
# def test_stddaq_client_run_status_callbacks_error(client):
|
||||
# """
|
||||
# Test that the run_status_callback method runs the status callback.
|
||||
# """
|
||||
# status = StatusBase()
|
||||
# client.add_status_callback(status, success=[], error=[StdDaqStatus.FILE_CREATED])
|
||||
# client._status = StdDaqStatus.FILE_CREATED
|
||||
# client._run_status_callbacks()
|
||||
# with pytest.raises(StdDaqError):
|
||||
# status.wait()
|
||||
|
||||
# assert len(status._callbacks) == 0
|
||||
|
||||
|
||||
# @pytest.mark.parametrize(
|
||||
# "msg, updated",
|
||||
# [({"status": "IDLE"}, False), (json.dumps({"status": "waiting_for_first_image"}), True)],
|
||||
# )
|
||||
# def test_stddaq_client_on_received_ws_message(client, msg, updated):
|
||||
# """
|
||||
# Test that the on_received_ws_message method runs the status callback.
|
||||
# """
|
||||
# client._status = None
|
||||
# with mock.patch.object(client, "_run_status_callbacks") as run_status_callbacks:
|
||||
# client._on_received_ws_message(msg)
|
||||
# if updated:
|
||||
# run_status_callbacks.assert_called_once()
|
||||
# assert client._status == StdDaqStatus.WAITING_FOR_FIRST_IMAGE
|
||||
# else:
|
||||
# run_status_callbacks.assert_not_called()
|
||||
# assert client._status is None
|
||||
|
||||
|
||||
# def test_stddaq_client_ws_send_and_receive(client):
|
||||
|
||||
# client.ws_client = mock.MagicMock()
|
||||
# client._send_queue.put({"command": "test"})
|
||||
# client._ws_send_and_receive()
|
||||
# # queue is not empty, so we should send the message
|
||||
# client.ws_client.send.assert_called_once()
|
||||
# client.ws_client.recv.assert_called_once()
|
||||
|
||||
# client.ws_client.reset_mock()
|
||||
# client._ws_send_and_receive()
|
||||
# # queue is empty, so we should not send the message
|
||||
# client.ws_client.send.assert_not_called()
|
||||
# client.ws_client.recv.assert_called_once()
|
||||
|
||||
|
||||
# def test_stddaq_client_ws_send_and_receive_websocket_error(client):
|
||||
# """
|
||||
# Test that the ws_send_and_receive method handles websocket errors.
|
||||
# """
|
||||
# client.ws_client = mock.MagicMock()
|
||||
# client.ws_client.send.side_effect = WebSocketException()
|
||||
# client._send_queue.put({"command": "test"})
|
||||
# with mock.patch.object(client, "wait_for_connection") as wait_for_connection:
|
||||
# client._ws_send_and_receive()
|
||||
# wait_for_connection.assert_called_once()
|
||||
|
||||
|
||||
# def test_stddaq_client_ws_send_and_receive_timeout_error(client):
|
||||
# """
|
||||
# Test that the ws_send_and_receive method handles timeout errors.
|
||||
# """
|
||||
# client.ws_client = mock.MagicMock()
|
||||
# client.ws_client.recv.side_effect = TimeoutError()
|
||||
# client._send_queue.put({"command": "test"})
|
||||
# with mock.patch.object(client, "wait_for_connection") as wait_for_connection:
|
||||
# client._ws_send_and_receive()
|
||||
# wait_for_connection.assert_not_called()
|
||||
|
||||
|
||||
# def test_stddaq_client_ws_update_loop(client):
|
||||
# """
|
||||
# Test that the ws_update_loop method runs the status callback.
|
||||
# """
|
||||
# client._shutdown_event = mock.MagicMock()
|
||||
# client._shutdown_event.is_set.side_effect = [False, True]
|
||||
# with (
|
||||
# mock.patch.object(client, "_ws_send_and_receive") as ws_send_and_receive,
|
||||
# mock.patch.object(client, "_wait_for_server_running") as wait_for_server_running,
|
||||
# ):
|
||||
# client._ws_update_loop()
|
||||
|
||||
# ws_send_and_receive.assert_called_once()
|
||||
# wait_for_server_running.assert_called_once()
|
||||
|
||||
@@ -11,19 +11,6 @@ eyex:
|
||||
readOnly: false
|
||||
softwareTrigger: false
|
||||
|
||||
eyey:
|
||||
readoutPriority: baseline
|
||||
description: X-ray eye axis Y
|
||||
deviceClass: tomcat_bec.devices.psimotor.EpicsMotorEC
|
||||
deviceConfig:
|
||||
prefix: MTEST-X05LA-ES2-XRAYEYE:M2
|
||||
deviceTags:
|
||||
- xray-eye
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readOnly: false
|
||||
softwareTrigger: false
|
||||
|
||||
eyez:
|
||||
readoutPriority: baseline
|
||||
description: X-ray eye axis Z
|
||||
@@ -51,18 +38,31 @@ femto_mean_curr:
|
||||
readOnly: true
|
||||
softwareTrigger: false
|
||||
|
||||
# es1_roty:
|
||||
# readoutPriority: monitored
|
||||
# description: 'Test rotation stage'
|
||||
# deviceClass: ophyd.EpicsMotor
|
||||
# deviceConfig:
|
||||
# prefix: X02DA-ES1-SMP1:ROTY
|
||||
# deviceTags:
|
||||
# - es1-sam
|
||||
# onFailure: buffer
|
||||
# enabled: true
|
||||
# readOnly: false
|
||||
# softwareTrigger: false
|
||||
es1_roty:
|
||||
readoutPriority: monitored
|
||||
description: 'Test rotation stage'
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X02DA-ES1-SMP1:ROTY
|
||||
deviceTags:
|
||||
- es1-sam
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readOnly: false
|
||||
softwareTrigger: false
|
||||
|
||||
es1_trx:
|
||||
readoutPriority: monitored
|
||||
description: 'Test translation stage'
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X02DA-ES1-SMP1:TRX
|
||||
deviceTags:
|
||||
- es1-sam
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readOnly: false
|
||||
softwareTrigger: false
|
||||
|
||||
es1_ismc:
|
||||
description: 'Automation1 iSMC interface'
|
||||
|
||||
@@ -197,7 +197,7 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
|
||||
```
|
||||
"""
|
||||
|
||||
USER_ACCESS = ["configure", "prepare", "toggle"]
|
||||
USER_ACCESS = ["configure", "fire", "toggle", "arm", "disarm"]
|
||||
_distance_value = None
|
||||
|
||||
# ########################################################################
|
||||
|
||||
Reference in New Issue
Block a user