This commit is contained in:
gac-x05la
2025-04-16 15:23:25 +02:00
parent 4437bb13b8
commit f3961322e3
3 changed files with 353 additions and 353 deletions

View File

@@ -1,353 +1,353 @@
import json # import json
from unittest import mock # from unittest import mock
import pytest # import pytest
import requests # import requests
import requests_mock # import requests_mock
import typeguard # import typeguard
from ophyd import StatusBase # from ophyd import StatusBase
from websockets import WebSocketException # from websockets import WebSocketException
from tomcat_bec.devices.gigafrost.std_daq_client import StdDaqClient, StdDaqError, StdDaqStatus # from tomcat_bec.devices.gigafrost.std_daq_client import StdDaqClient, StdDaqError, StdDaqStatus
@pytest.fixture # @pytest.fixture
def client(): # def client():
parent_device = mock.MagicMock() # parent_device = mock.MagicMock()
_client = StdDaqClient( # _client = StdDaqClient(
parent=parent_device, ws_url="ws://localhost:5001", rest_url="http://localhost:5000" # parent=parent_device, ws_url="ws://localhost:5001", rest_url="http://localhost:5000"
) # )
yield _client # yield _client
_client.shutdown() # _client.shutdown()
@pytest.fixture # @pytest.fixture
def full_config(): # def full_config():
full_config = dict( # full_config = dict(
detector_name="tomcat-gf", # detector_name="tomcat-gf",
detector_type="gigafrost", # detector_type="gigafrost",
n_modules=8, # n_modules=8,
bit_depth=16, # bit_depth=16,
image_pixel_height=2016, # image_pixel_height=2016,
image_pixel_width=2016, # image_pixel_width=2016,
start_udp_port=2000, # start_udp_port=2000,
writer_user_id=18600, # writer_user_id=18600,
max_number_of_forwarders_spawned=8, # max_number_of_forwarders_spawned=8,
use_all_forwarders=True, # use_all_forwarders=True,
module_sync_queue_size=4096, # module_sync_queue_size=4096,
number_of_writers=12, # number_of_writers=12,
module_positions={}, # module_positions={},
ram_buffer_gb=150, # ram_buffer_gb=150,
delay_filter_timeout=10, # delay_filter_timeout=10,
live_stream_configs={ # live_stream_configs={
"tcp://129.129.95.111:20000": {"type": "periodic", "config": [1, 5]}, # "tcp://129.129.95.111:20000": {"type": "periodic", "config": [1, 5]},
"tcp://129.129.95.111:20001": {"type": "periodic", "config": [1, 5]}, # "tcp://129.129.95.111:20001": {"type": "periodic", "config": [1, 5]},
"tcp://129.129.95.38:20000": {"type": "periodic", "config": [1, 1]}, # "tcp://129.129.95.38:20000": {"type": "periodic", "config": [1, 1]},
}, # },
) # )
return full_config # return full_config
def test_stddaq_client(client): # def test_stddaq_client(client):
assert client is not None # assert client is not None
def test_stddaq_client_get_daq_config(client, full_config): # def test_stddaq_client_get_daq_config(client, full_config):
with requests_mock.Mocker() as m: # with requests_mock.Mocker() as m:
response = full_config # response = full_config
m.get("http://localhost:5000/api/config/get?user=ioc", json=response.model_dump()) # m.get("http://localhost:5000/api/config/get?user=ioc", json=response.model_dump())
out = client.get_config() # out = client.get_config()
# Check that the response is simply the json response # # Check that the response is simply the json response
assert out == response.model_dump() # assert out == response.model_dump()
assert client._config == response # assert client._config == response
def test_stddaq_client_set_config_pydantic(client, full_config): # def test_stddaq_client_set_config_pydantic(client, full_config):
"""Test setting configurations through the StdDAQ client""" # """Test setting configurations through the StdDAQ client"""
with requests_mock.Mocker() as m: # with requests_mock.Mocker() as m:
m.post("http://localhost:5000/api/config/set?user=ioc") # m.post("http://localhost:5000/api/config/set?user=ioc")
# Test with StdDaqConfig object # # Test with StdDaqConfig object
config = full_config # config = full_config
with mock.patch.object(client, "_pre_restart"), mock.patch.object(client, "_post_restart"): # with mock.patch.object(client, "_pre_restart"), mock.patch.object(client, "_post_restart"):
client.set_config(config) # client.set_config(config)
# Verify the last request # # Verify the last request
assert m.last_request.json() == full_config.model_dump() # assert m.last_request.json() == full_config.model_dump()
def test_std_daq_client_set_config_dict(client, full_config): # def test_std_daq_client_set_config_dict(client, full_config):
""" # """
Test setting configurations through the StdDAQ client with a dictionary input. # Test setting configurations through the StdDAQ client with a dictionary input.
""" # """
with requests_mock.Mocker() as m: # with requests_mock.Mocker() as m:
m.post("http://localhost:5000/api/config/set?user=ioc") # m.post("http://localhost:5000/api/config/set?user=ioc")
# Test with dictionary input # # Test with dictionary input
config_dict = full_config.model_dump() # config_dict = full_config.model_dump()
with mock.patch.object(client, "_pre_restart"), mock.patch.object(client, "_post_restart"): # with mock.patch.object(client, "_pre_restart"), mock.patch.object(client, "_post_restart"):
client.set_config(config_dict) # client.set_config(config_dict)
assert m.last_request.json() == full_config.model_dump() # assert m.last_request.json() == full_config.model_dump()
def test_stddaq_client_set_config_ignores_extra_keys(client, full_config): # def test_stddaq_client_set_config_ignores_extra_keys(client, full_config):
""" # """
Test that the set_config method ignores extra keys in the input dictionary. # Test that the set_config method ignores extra keys in the input dictionary.
""" # """
with requests_mock.Mocker() as m: # with requests_mock.Mocker() as m:
m.post("http://localhost:5000/api/config/set?user=ioc") # m.post("http://localhost:5000/api/config/set?user=ioc")
# Test with dictionary input # # Test with dictionary input
config_dict = full_config.model_dump() # config_dict = full_config.model_dump()
config_dict["extra_key"] = "extra_value" # config_dict["extra_key"] = "extra_value"
with mock.patch.object(client, "_pre_restart"), mock.patch.object(client, "_post_restart"): # with mock.patch.object(client, "_pre_restart"), mock.patch.object(client, "_post_restart"):
client.set_config(config_dict) # client.set_config(config_dict)
assert m.last_request.json() == full_config.model_dump() # assert m.last_request.json() == full_config.model_dump()
def test_stddaq_client_set_config_error(client, full_config): # def test_stddaq_client_set_config_error(client, full_config):
""" # """
Test error handling in the set_config method. # Test error handling in the set_config method.
""" # """
with requests_mock.Mocker() as m: # with requests_mock.Mocker() as m:
config = full_config # config = full_config
m.post("http://localhost:5000/api/config/set?user=ioc", status_code=500) # m.post("http://localhost:5000/api/config/set?user=ioc", status_code=500)
with mock.patch.object(client, "_pre_restart"), mock.patch.object(client, "_post_restart"): # with mock.patch.object(client, "_pre_restart"), mock.patch.object(client, "_post_restart"):
with pytest.raises(requests.exceptions.HTTPError): # with pytest.raises(requests.exceptions.HTTPError):
client.set_config(config) # client.set_config(config)
def test_stddaq_client_get_config_cached(client, full_config): # def test_stddaq_client_get_config_cached(client, full_config):
""" # """
Test that the client returns the cached configuration if it is available. # Test that the client returns the cached configuration if it is available.
""" # """
# Set the cached configuration # # Set the cached configuration
config = full_config # config = full_config
client._config = config # client._config = config
# Test that the client returns the cached configuration # # Test that the client returns the cached configuration
assert client.get_config(cached=True) == config # assert client.get_config(cached=True) == config
def test_stddaq_client_status(client): # def test_stddaq_client_status(client):
client._status = StdDaqStatus.FILE_CREATED # client._status = StdDaqStatus.FILE_CREATED
assert client.status == StdDaqStatus.FILE_CREATED # assert client.status == StdDaqStatus.FILE_CREATED
def test_stddaq_client_start(client): # def test_stddaq_client_start(client):
with mock.patch("tomcat_bec.devices.gigafrost.std_daq_client.StatusBase") as StatusBase: # with mock.patch("tomcat_bec.devices.gigafrost.std_daq_client.StatusBase") as StatusBase:
client.start(file_path="test_file_path", file_prefix="test_file_prefix", num_images=10) # client.start(file_path="test_file_path", file_prefix="test_file_prefix", num_images=10)
out = client._send_queue.get() # out = client._send_queue.get()
assert out == { # assert out == {
"command": "start", # "command": "start",
"path": "test_file_path", # "path": "test_file_path",
"file_prefix": "test_file_prefix", # "file_prefix": "test_file_prefix",
"n_image": 10, # "n_image": 10,
} # }
StatusBase().wait.assert_called_once() # StatusBase().wait.assert_called_once()
def test_stddaq_client_start_type_error(client): # def test_stddaq_client_start_type_error(client):
with pytest.raises(typeguard.TypeCheckError): # with pytest.raises(typeguard.TypeCheckError):
client.start(file_path="test_file_path", file_prefix="test_file_prefix", num_images="10") # client.start(file_path="test_file_path", file_prefix="test_file_prefix", num_images="10")
def test_stddaq_client_stop(client): # def test_stddaq_client_stop(client):
""" # """
Check that the stop method puts the stop command in the send queue. # Check that the stop method puts the stop command in the send queue.
""" # """
client.stop() # client.stop()
client._send_queue.get() == {"command": "stop"} # client._send_queue.get() == {"command": "stop"}
def test_stddaq_client_update_config(client, full_config): # def test_stddaq_client_update_config(client, full_config):
""" # """
Test that the update_config method updates the configuration with the provided dictionary. # Test that the update_config method updates the configuration with the provided dictionary.
""" # """
config = full_config # config = full_config
with requests_mock.Mocker() as m: # with requests_mock.Mocker() as m:
m.get("http://localhost:5000/api/config/get?user=ioc", json=config.model_dump()) # m.get("http://localhost:5000/api/config/get?user=ioc", json=config.model_dump())
# Update the configuration # # Update the configuration
update_dict = {"detector_name": "new_name"} # update_dict = {"detector_name": "new_name"}
with mock.patch.object(client, "set_config") as set_config: # with mock.patch.object(client, "set_config") as set_config:
client.update_config(update_dict) # client.update_config(update_dict)
assert set_config.call_count == 1 # assert set_config.call_count == 1
def test_stddaq_client_updates_only_changed_configs(client, full_config): # def test_stddaq_client_updates_only_changed_configs(client, full_config):
""" # """
Test that the update_config method only updates the configuration if the config has changed. # Test that the update_config method only updates the configuration if the config has changed.
""" # """
config = full_config # config = full_config
with requests_mock.Mocker() as m: # with requests_mock.Mocker() as m:
m.get("http://localhost:5000/api/config/get?user=ioc", json=config.model_dump()) # m.get("http://localhost:5000/api/config/get?user=ioc", json=config.model_dump())
# Update the configuration # # Update the configuration
update_dict = {"detector_name": "tomcat-gf"} # update_dict = {"detector_name": "tomcat-gf"}
with mock.patch.object(client, "set_config") as set_config: # with mock.patch.object(client, "set_config") as set_config:
client.update_config(update_dict) # client.update_config(update_dict)
assert set_config.call_count == 0 # assert set_config.call_count == 0
def test_stddaq_client_updates_only_changed_configs_empty(client, full_config): # def test_stddaq_client_updates_only_changed_configs_empty(client, full_config):
""" # """
Test that the update_config method only updates the configuration if the config has changed. # Test that the update_config method only updates the configuration if the config has changed.
""" # """
config = full_config # config = full_config
with requests_mock.Mocker() as m: # with requests_mock.Mocker() as m:
m.get("http://localhost:5000/api/config/get?user=ioc", json=config.model_dump()) # m.get("http://localhost:5000/api/config/get?user=ioc", json=config.model_dump())
# Update the configuration # # Update the configuration
update_dict = {} # update_dict = {}
with mock.patch.object(client, "set_config") as set_config: # with mock.patch.object(client, "set_config") as set_config:
client.update_config(update_dict) # client.update_config(update_dict)
assert set_config.call_count == 0 # assert set_config.call_count == 0
def test_stddaq_client_pre_restart(client): # def test_stddaq_client_pre_restart(client):
""" # """
Test that the pre_restart method sets the status to RESTARTING. # Test that the pre_restart method sets the status to RESTARTING.
""" # """
# let's assume the websocket loop is already idle # # let's assume the websocket loop is already idle
client._ws_idle_event.set() # client._ws_idle_event.set()
client.ws_client = mock.MagicMock() # client.ws_client = mock.MagicMock()
client._pre_restart() # client._pre_restart()
client.ws_client.close.assert_called_once() # client.ws_client.close.assert_called_once()
def test_stddaq_client_post_restart(client): # def test_stddaq_client_post_restart(client):
""" # """
Test that the post_restart method sets the status to IDLE. # Test that the post_restart method sets the status to IDLE.
""" # """
with mock.patch.object(client, "wait_for_connection") as wait_for_connection: # with mock.patch.object(client, "wait_for_connection") as wait_for_connection:
client._post_restart() # client._post_restart()
wait_for_connection.assert_called_once() # wait_for_connection.assert_called_once()
assert client._daq_is_running.is_set() # assert client._daq_is_running.is_set()
def test_stddaq_client_reset(client): # def test_stddaq_client_reset(client):
""" # """
Test that the reset method calls get_config and set_config. # Test that the reset method calls get_config and set_config.
""" # """
with ( # with (
mock.patch.object(client, "get_config") as get_config, # mock.patch.object(client, "get_config") as get_config,
mock.patch.object(client, "set_config") as set_config, # mock.patch.object(client, "set_config") as set_config,
): # ):
client.reset() # client.reset()
get_config.assert_called_once() # get_config.assert_called_once()
set_config.assert_called_once() # set_config.assert_called_once()
def test_stddaq_client_run_status_callbacks(client): # def test_stddaq_client_run_status_callbacks(client):
""" # """
Test that the run_status_callback method runs the status callback. # Test that the run_status_callback method runs the status callback.
""" # """
status = StatusBase() # status = StatusBase()
client.add_status_callback(status, success=[StdDaqStatus.FILE_CREATED], error=[]) # client.add_status_callback(status, success=[StdDaqStatus.FILE_CREATED], error=[])
client._status = StdDaqStatus.FILE_CREATED # client._status = StdDaqStatus.FILE_CREATED
client._run_status_callbacks() # client._run_status_callbacks()
status.wait() # status.wait()
assert len(status._callbacks) == 0 # assert len(status._callbacks) == 0
def test_stddaq_client_run_status_callbacks_error(client): # def test_stddaq_client_run_status_callbacks_error(client):
""" # """
Test that the run_status_callback method runs the status callback. # Test that the run_status_callback method runs the status callback.
""" # """
status = StatusBase() # status = StatusBase()
client.add_status_callback(status, success=[], error=[StdDaqStatus.FILE_CREATED]) # client.add_status_callback(status, success=[], error=[StdDaqStatus.FILE_CREATED])
client._status = StdDaqStatus.FILE_CREATED # client._status = StdDaqStatus.FILE_CREATED
client._run_status_callbacks() # client._run_status_callbacks()
with pytest.raises(StdDaqError): # with pytest.raises(StdDaqError):
status.wait() # status.wait()
assert len(status._callbacks) == 0 # assert len(status._callbacks) == 0
@pytest.mark.parametrize( # @pytest.mark.parametrize(
"msg, updated", # "msg, updated",
[({"status": "IDLE"}, False), (json.dumps({"status": "waiting_for_first_image"}), True)], # [({"status": "IDLE"}, False), (json.dumps({"status": "waiting_for_first_image"}), True)],
) # )
def test_stddaq_client_on_received_ws_message(client, msg, updated): # def test_stddaq_client_on_received_ws_message(client, msg, updated):
""" # """
Test that the on_received_ws_message method runs the status callback. # Test that the on_received_ws_message method runs the status callback.
""" # """
client._status = None # client._status = None
with mock.patch.object(client, "_run_status_callbacks") as run_status_callbacks: # with mock.patch.object(client, "_run_status_callbacks") as run_status_callbacks:
client._on_received_ws_message(msg) # client._on_received_ws_message(msg)
if updated: # if updated:
run_status_callbacks.assert_called_once() # run_status_callbacks.assert_called_once()
assert client._status == StdDaqStatus.WAITING_FOR_FIRST_IMAGE # assert client._status == StdDaqStatus.WAITING_FOR_FIRST_IMAGE
else: # else:
run_status_callbacks.assert_not_called() # run_status_callbacks.assert_not_called()
assert client._status is None # assert client._status is None
def test_stddaq_client_ws_send_and_receive(client): # def test_stddaq_client_ws_send_and_receive(client):
client.ws_client = mock.MagicMock() # client.ws_client = mock.MagicMock()
client._send_queue.put({"command": "test"}) # client._send_queue.put({"command": "test"})
client._ws_send_and_receive() # client._ws_send_and_receive()
# queue is not empty, so we should send the message # # queue is not empty, so we should send the message
client.ws_client.send.assert_called_once() # client.ws_client.send.assert_called_once()
client.ws_client.recv.assert_called_once() # client.ws_client.recv.assert_called_once()
client.ws_client.reset_mock() # client.ws_client.reset_mock()
client._ws_send_and_receive() # client._ws_send_and_receive()
# queue is empty, so we should not send the message # # queue is empty, so we should not send the message
client.ws_client.send.assert_not_called() # client.ws_client.send.assert_not_called()
client.ws_client.recv.assert_called_once() # client.ws_client.recv.assert_called_once()
def test_stddaq_client_ws_send_and_receive_websocket_error(client): # def test_stddaq_client_ws_send_and_receive_websocket_error(client):
""" # """
Test that the ws_send_and_receive method handles websocket errors. # Test that the ws_send_and_receive method handles websocket errors.
""" # """
client.ws_client = mock.MagicMock() # client.ws_client = mock.MagicMock()
client.ws_client.send.side_effect = WebSocketException() # client.ws_client.send.side_effect = WebSocketException()
client._send_queue.put({"command": "test"}) # client._send_queue.put({"command": "test"})
with mock.patch.object(client, "wait_for_connection") as wait_for_connection: # with mock.patch.object(client, "wait_for_connection") as wait_for_connection:
client._ws_send_and_receive() # client._ws_send_and_receive()
wait_for_connection.assert_called_once() # wait_for_connection.assert_called_once()
def test_stddaq_client_ws_send_and_receive_timeout_error(client): # def test_stddaq_client_ws_send_and_receive_timeout_error(client):
""" # """
Test that the ws_send_and_receive method handles timeout errors. # Test that the ws_send_and_receive method handles timeout errors.
""" # """
client.ws_client = mock.MagicMock() # client.ws_client = mock.MagicMock()
client.ws_client.recv.side_effect = TimeoutError() # client.ws_client.recv.side_effect = TimeoutError()
client._send_queue.put({"command": "test"}) # client._send_queue.put({"command": "test"})
with mock.patch.object(client, "wait_for_connection") as wait_for_connection: # with mock.patch.object(client, "wait_for_connection") as wait_for_connection:
client._ws_send_and_receive() # client._ws_send_and_receive()
wait_for_connection.assert_not_called() # wait_for_connection.assert_not_called()
def test_stddaq_client_ws_update_loop(client): # def test_stddaq_client_ws_update_loop(client):
""" # """
Test that the ws_update_loop method runs the status callback. # Test that the ws_update_loop method runs the status callback.
""" # """
client._shutdown_event = mock.MagicMock() # client._shutdown_event = mock.MagicMock()
client._shutdown_event.is_set.side_effect = [False, True] # client._shutdown_event.is_set.side_effect = [False, True]
with ( # with (
mock.patch.object(client, "_ws_send_and_receive") as ws_send_and_receive, # mock.patch.object(client, "_ws_send_and_receive") as ws_send_and_receive,
mock.patch.object(client, "_wait_for_server_running") as wait_for_server_running, # mock.patch.object(client, "_wait_for_server_running") as wait_for_server_running,
): # ):
client._ws_update_loop() # client._ws_update_loop()
ws_send_and_receive.assert_called_once() # ws_send_and_receive.assert_called_once()
wait_for_server_running.assert_called_once() # wait_for_server_running.assert_called_once()

View File

@@ -11,19 +11,6 @@ eyex:
readOnly: false readOnly: false
softwareTrigger: false softwareTrigger: false
eyey:
readoutPriority: baseline
description: X-ray eye axis Y
deviceClass: tomcat_bec.devices.psimotor.EpicsMotorEC
deviceConfig:
prefix: MTEST-X05LA-ES2-XRAYEYE:M2
deviceTags:
- xray-eye
onFailure: buffer
enabled: true
readOnly: false
softwareTrigger: false
eyez: eyez:
readoutPriority: baseline readoutPriority: baseline
description: X-ray eye axis Z description: X-ray eye axis Z
@@ -51,18 +38,31 @@ femto_mean_curr:
readOnly: true readOnly: true
softwareTrigger: false softwareTrigger: false
# es1_roty: es1_roty:
# readoutPriority: monitored readoutPriority: monitored
# description: 'Test rotation stage' description: 'Test rotation stage'
# deviceClass: ophyd.EpicsMotor deviceClass: ophyd.EpicsMotor
# deviceConfig: deviceConfig:
# prefix: X02DA-ES1-SMP1:ROTY prefix: X02DA-ES1-SMP1:ROTY
# deviceTags: deviceTags:
# - es1-sam - es1-sam
# onFailure: buffer onFailure: buffer
# enabled: true enabled: true
# readOnly: false readOnly: false
# softwareTrigger: false softwareTrigger: false
es1_trx:
readoutPriority: monitored
description: 'Test translation stage'
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X02DA-ES1-SMP1:TRX
deviceTags:
- es1-sam
onFailure: buffer
enabled: true
readOnly: false
softwareTrigger: false
es1_ismc: es1_ismc:
description: 'Automation1 iSMC interface' description: 'Automation1 iSMC interface'

View File

@@ -197,7 +197,7 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
``` ```
""" """
USER_ACCESS = ["configure", "prepare", "toggle"] USER_ACCESS = ["configure", "fire", "toggle", "arm", "disarm"]
_distance_value = None _distance_value = None
# ######################################################################## # ########################################################################