mirror of
https://github.com/ivan-usov-org/bec.git
synced 2025-04-21 18:20:01 +02:00
118 lines
5.0 KiB
Python
118 lines
5.0 KiB
Python
from unittest import mock
|
|
|
|
from bec_lib.core import BECMessage, MessageEndpoints
|
|
|
|
from data_processing.worker_manager import DAPWorkerManager
|
|
|
|
|
|
def test_worker_manager_retrieves_config_on_startup():
|
|
connector = mock.MagicMock()
|
|
with mock.patch.object(DAPWorkerManager, "update_config") as mock_update_config:
|
|
config = {
|
|
"stream": "scan_segment",
|
|
"output": "gaussian_fit_worker_3",
|
|
"input_xy": ["samx.samx.value", "gauss_bpm.gauss_bpm.value"],
|
|
"model": "GaussianModel",
|
|
}
|
|
worker_config = {"id": "gaussian_fit_worker_3", "config": config}
|
|
connector.producer().get.return_value = BECMessage.DAPConfigMessage(
|
|
config={"workers": [worker_config]}
|
|
).dumps()
|
|
worker_manager = DAPWorkerManager(connector)
|
|
mock_update_config.assert_called_once()
|
|
|
|
|
|
def test_worker_manager_retrieves_config_on_startup_empty():
|
|
connector = mock.MagicMock()
|
|
with mock.patch.object(DAPWorkerManager, "update_config") as mock_update_config:
|
|
connector.producer().get.return_value = None
|
|
worker_manager = DAPWorkerManager(connector)
|
|
mock_update_config.assert_not_called()
|
|
|
|
|
|
def test_worker_manager_update_config():
|
|
connector = mock.MagicMock()
|
|
with mock.patch.object(DAPWorkerManager, "_start_worker") as mock_start_worker:
|
|
connector.producer().get.return_value = None
|
|
worker_manager = DAPWorkerManager(connector)
|
|
config = {
|
|
"stream": "scan_segment",
|
|
"output": "gaussian_fit_worker_3",
|
|
"input_xy": ["samx.samx.value", "gauss_bpm.gauss_bpm.value"],
|
|
"model": "GaussianModel",
|
|
}
|
|
worker_config = {"id": "gaussian_fit_worker_3", "config": config}
|
|
worker_manager.update_config(
|
|
BECMessage.DAPConfigMessage(config={"workers": [worker_config]})
|
|
)
|
|
mock_start_worker.assert_called_once()
|
|
|
|
|
|
def test_worker_manager_update_config_no_workers():
|
|
connector = mock.MagicMock()
|
|
with mock.patch.object(DAPWorkerManager, "_start_worker") as mock_start_worker:
|
|
connector.producer().get.return_value = None
|
|
worker_manager = DAPWorkerManager(connector)
|
|
worker_manager.update_config(BECMessage.DAPConfigMessage(config={"workers": []}))
|
|
mock_start_worker.assert_not_called()
|
|
|
|
|
|
def test_worker_manager_update_config_worker_already_running():
|
|
connector = mock.MagicMock()
|
|
with mock.patch.object(DAPWorkerManager, "_start_worker") as mock_start_worker:
|
|
connector.producer().get.return_value = None
|
|
worker_manager = DAPWorkerManager(connector)
|
|
config = {
|
|
"stream": "scan_segment",
|
|
"output": "gaussian_fit_worker_3",
|
|
"input_xy": ["samx.samx.value", "gauss_bpm.gauss_bpm.value"],
|
|
"model": "GaussianModel",
|
|
}
|
|
worker_config = {"id": "gaussian_fit_worker_3", "config": config}
|
|
worker_manager._workers = {"gaussian_fit_worker_3": {"config": config, "worker": None}}
|
|
worker_manager.update_config(
|
|
BECMessage.DAPConfigMessage(config={"workers": [worker_config]})
|
|
)
|
|
mock_start_worker.assert_not_called()
|
|
|
|
|
|
def test_worker_manager_update_config_worker_already_running_different_config():
|
|
connector = mock.MagicMock()
|
|
with mock.patch.object(DAPWorkerManager, "_start_worker") as mock_start_worker:
|
|
connector.producer().get.return_value = None
|
|
worker_manager = DAPWorkerManager(connector)
|
|
config = {
|
|
"stream": "scan_segment",
|
|
"output": "gaussian_fit_worker_3",
|
|
"input_xy": ["samx.samx.value", "gauss_bpm.gauss_bpm.value"],
|
|
"model": "GaussianModel",
|
|
}
|
|
w3_mock = mock.MagicMock()
|
|
worker_config = {"id": "gaussian_fit_worker_3", "config": config}
|
|
worker_manager._workers = {"gaussian_fit_worker_3": {"config": {}, "worker": w3_mock}}
|
|
worker_manager.update_config(
|
|
BECMessage.DAPConfigMessage(config={"workers": [worker_config]})
|
|
)
|
|
mock_start_worker.assert_called_once()
|
|
w3_mock.terminate.assert_called_once()
|
|
|
|
|
|
def test_worker_manager_update_config_remove_outdated_workers():
|
|
connector = mock.MagicMock()
|
|
with mock.patch.object(DAPWorkerManager, "_start_worker") as mock_start_worker:
|
|
connector.producer().get.return_value = None
|
|
worker_manager = DAPWorkerManager(connector)
|
|
config = {
|
|
"stream": "scan_segment",
|
|
"output": "gaussian_fit_worker_3",
|
|
"input_xy": ["samx.samx.value", "gauss_bpm.gauss_bpm.value"],
|
|
"model": "GaussianModel",
|
|
}
|
|
w3_mock = mock.MagicMock()
|
|
worker_config = {"id": "gaussian_fit_worker_3", "config": config}
|
|
worker_manager._workers = {"gaussian_fit_worker_3": {"config": {}, "worker": w3_mock}}
|
|
worker_manager.update_config(BECMessage.DAPConfigMessage(config={"workers": []}))
|
|
mock_start_worker.assert_not_called()
|
|
w3_mock.terminate.assert_called_once()
|
|
assert worker_manager._workers == {}
|