test: added more file_manager tests

This commit is contained in:
wakonig_k 2023-09-02 18:49:54 +02:00
parent 5eda477723
commit 153c38aea6

View File

@ -2,6 +2,7 @@ import os
from unittest import mock
import bec_lib.core
import numpy as np
import pytest
import yaml
from bec_lib.core import BECMessage, DeviceManagerBase, MessageEndpoints, ServiceConfig
@ -140,3 +141,48 @@ def test_scan_storage_ready_to_write():
storage.scan_finished = True
storage.append(1, {"data": "data"})
assert storage.ready_to_write() is True
def test_update_file_references():
file_manager = load_FileWriter()
with mock.patch.object(file_manager, "producer") as mock_producer:
file_manager.update_file_references("scanID")
mock_producer.keys.assert_not_called()
def test_update_file_references_gets_keys():
file_manager = load_FileWriter()
file_manager.scan_storage["scanID"] = ScanStorage(10, "scanID")
with mock.patch.object(file_manager, "producer") as mock_producer:
file_manager.update_file_references("scanID")
mock_producer.keys.assert_called_once_with(MessageEndpoints.public_file("scanID", "*"))
def test_update_async_data():
file_manager = load_FileWriter()
file_manager.scan_storage["scanID"] = ScanStorage(10, "scanID")
with mock.patch.object(file_manager, "producer") as mock_producer:
with mock.patch.object(file_manager, "_process_async_data") as mock_process:
key = f"{MessageEndpoints.device_async_readback('scanID', 'dev1')}:stream"
mock_producer.keys.return_value = [
key.encode(),
]
data = [
(b"0-0", b'{"data": "data"}'),
]
mock_producer.xrange.return_value = data
file_manager.update_async_data("scanID")
mock_producer.xrange.assert_called_once_with(key, min="-", max="+")
mock_process.assert_called_once_with(data, "scanID", "dev1")
def test_process_async_data_single_entry():
file_manager = load_FileWriter()
file_manager.scan_storage["scanID"] = ScanStorage(10, "scanID")
data = [
(b"0-0", {b"data": BECMessage.DeviceMessage(signals={"data": np.zeros((10, 10))}).dumps()}),
]
file_manager._process_async_data(data, "scanID", "dev1")
assert np.isclose(
file_manager.scan_storage["scanID"].async_data["dev1"]["data"], np.zeros((10, 10))
).all()