293 lines
8.5 KiB
Python
293 lines
8.5 KiB
Python
import patch_put
|
|
import pytest
|
|
import time
|
|
import threading
|
|
import pickle as std_pickle
|
|
from pathlib import Path
|
|
import logging
|
|
import os
|
|
import sys
|
|
from datetime import datetime, timedelta
|
|
|
|
from logzero import logger
|
|
from freezegun import freeze_time
|
|
from epics.pv import _PVcache_
|
|
import epics
|
|
from slic.utils.pv import PV
|
|
from slic.utils.hastyepics import get_pv
|
|
from morbidissimo import MorIOC
|
|
from slic.utils.pvpreload import *
|
|
'''
|
|
# IOC simulation
|
|
@pytest.fixture(scope="module", autouse=True)
|
|
def run_test_ioc():
|
|
stop = threading.Event()
|
|
|
|
def ioc():
|
|
with MorIOC("TEST") as mor:
|
|
# Declare PVs
|
|
mor.host(PV1=float, PV2=float, PV3=float, PV4=float, PV5=float, PV6=float)
|
|
|
|
# Serve loop until 'stop' is set
|
|
while not stop.is_set():
|
|
mor.serve(
|
|
PV1=1.0,
|
|
PV2=2.0,
|
|
PV3=3.0,
|
|
PV4=4.0,
|
|
PV5=5.0,
|
|
PV6=6.0
|
|
)
|
|
time.sleep(0.1)
|
|
|
|
# Start IOC thread (not daemon so we can join it later)
|
|
t = threading.Thread(target=ioc)
|
|
t.start()
|
|
time.sleep(1) # Give CA time to announce PVs
|
|
|
|
yield # <<< tests in this module run here >>>
|
|
|
|
# --- Teardown: stop IOC and free resources ---
|
|
stop.set()
|
|
t.join(timeout=2.0)
|
|
time.sleep(0.2)
|
|
'''
|
|
def configure_logzero_for_pytest(caplog):
|
|
logger.handlers.clear()
|
|
logger.propagate = True # Propagate to root logger captured by caplog
|
|
caplog.set_level(level=logging.DEBUG)
|
|
|
|
def is_pv_in_cache(name):
|
|
return any(k[0] == name for k in epics.pv._PVcache_)
|
|
|
|
def get_pv_from_cache(pvname):
|
|
for k, pv in _PVcache_.items():
|
|
if hasattr(pv, "pvname") and pv.pvname == pvname:
|
|
return pv
|
|
return None
|
|
|
|
|
|
# Tests for file_age()
|
|
@pytest.mark.parametrize("age_seconds, expected", [
|
|
(30, timedelta(seconds=30)), # 0:00:30
|
|
(300, timedelta(minutes=5)), # 0:05:00
|
|
(3600, timedelta(hours=1)), # 1:00:00
|
|
(86400, timedelta(days=1)), # 1 day
|
|
(1209600, timedelta(days=14)), # 2 weeks
|
|
])
|
|
@freeze_time("2025-08-07 12:00:00")
|
|
def test_file_age(tmp_path, age_seconds, expected):
|
|
test_file = tmp_path / "testfile"
|
|
test_file.touch()
|
|
|
|
past_timestamp = time.time() - age_seconds
|
|
os.utime(test_file, (past_timestamp, past_timestamp))
|
|
|
|
result = file_age(test_file)
|
|
assert result == expected
|
|
|
|
|
|
# Tests for preload()
|
|
def test_preload_valid_file(tmp_path, caplog):
|
|
configure_logzero_for_pytest(caplog)
|
|
|
|
f = tmp_path / "valid.pkl"
|
|
pvname_1 = "TEST:PV1"
|
|
pvname_2 = "TEST:PV2"
|
|
|
|
# Create pickle file with PV names
|
|
with open(f, "wb") as pkl:
|
|
std_pickle.dump([pvname_1, pvname_2], pkl)
|
|
|
|
# Verify PVs are not yet in EPICS cache
|
|
assert not is_pv_in_cache(pvname_1)
|
|
assert not is_pv_in_cache(pvname_2)
|
|
|
|
# Patch the path used in preload()
|
|
with pytest.MonkeyPatch().context() as mp:
|
|
mp.setattr("slic.utils.pvpreload.fn", f)
|
|
|
|
with caplog.at_level("DEBUG"):
|
|
preload()
|
|
|
|
# Verify PVs were created and are in cache
|
|
assert is_pv_in_cache(pvname_1)
|
|
assert is_pv_in_cache(pvname_2)
|
|
|
|
pv_1 = get_pv_from_cache(pvname_1)
|
|
pv_2 = get_pv_from_cache(pvname_2)
|
|
|
|
assert not pv_1.connected
|
|
assert not pv_2.connected
|
|
|
|
# Verify final log
|
|
logs = "\n".join(caplog.messages)
|
|
assert "PV preload done" in logs
|
|
|
|
def test_preload_old_file(tmp_path, caplog):
|
|
configure_logzero_for_pytest(caplog)
|
|
|
|
f = tmp_path / "too_old.pkl"
|
|
pvname = "TEST:PV_X"
|
|
|
|
# Create valid pickle file with PV name
|
|
with open(f, "wb") as pkl:
|
|
std_pickle.dump([pvname], pkl)
|
|
|
|
# Artificially age the file (e.g., 2 hours old)
|
|
old_time = datetime.timestamp(datetime.now() - 2 * lifetime)
|
|
os.utime(f, (old_time, old_time))
|
|
|
|
# Patch the file path in preload
|
|
with pytest.MonkeyPatch().context() as mp:
|
|
mp.setattr("slic.utils.pvpreload.fn", f)
|
|
|
|
with caplog.at_level("INFO"):
|
|
preload()
|
|
|
|
# Verify file was detected as too old
|
|
logs = "\n".join(caplog.messages)
|
|
assert "PV preload file too old" in logs
|
|
|
|
# Verify PV was not created (not in cache)
|
|
assert not is_pv_in_cache(pvname)
|
|
|
|
def test_preload_corrupt_file(tmp_path, caplog):
|
|
configure_logzero_for_pytest(caplog)
|
|
|
|
f = tmp_path / "corrupt.pkl"
|
|
|
|
# Write invalid file (not pickle)
|
|
with open(f, "w") as fcorrupt:
|
|
fcorrupt.write("not a pickle at all")
|
|
|
|
# Patch the file path in preload
|
|
with pytest.MonkeyPatch().context() as mp:
|
|
mp.setattr("slic.utils.pvpreload.fn", f)
|
|
|
|
with caplog.at_level("WARNING"):
|
|
preload()
|
|
|
|
logs = "\n".join(caplog.messages)
|
|
assert "PV preload file not loaded due to: UnpicklingError:" in logs
|
|
|
|
|
|
# Tests for offload()
|
|
def test_offload(tmp_path, caplog):
|
|
configure_logzero_for_pytest(caplog)
|
|
|
|
fake_file = tmp_path / "offload_test.pkl"
|
|
pvname_3 = "TEST:PV3"
|
|
pvname_4 = "TEST:PV4"
|
|
|
|
# Create 2 PVs
|
|
pv_3 = get_pv(pvname_3, connect=True)
|
|
pv_4 = get_pv(pvname_4, connect=True)
|
|
|
|
# Wait for connection
|
|
assert pv_3.wait_for_connection(timeout=2), "PV3 not connected"
|
|
assert pv_4.wait_for_connection(timeout=2), "PV4 not connected"
|
|
|
|
assert pv_3.connected, "PV3 marked as not connected"
|
|
assert pv_4.connected, "PV4 marked as not connected"
|
|
|
|
assert is_pv_in_cache(pvname_3)
|
|
assert is_pv_in_cache(pvname_4)
|
|
|
|
# Patch file and delay
|
|
with pytest.MonkeyPatch().context() as mp:
|
|
mp.setattr("slic.utils.pvpreload.fn", fake_file)
|
|
mp.setattr("slic.utils.pvpreload.delay", 0.01)
|
|
|
|
with caplog.at_level("DEBUG"):
|
|
offload()
|
|
|
|
assert fake_file.exists(), "Offload file not created"
|
|
file_size = fake_file.stat().st_size
|
|
assert file_size > 0, f"File is empty (size: {file_size} bytes)"
|
|
|
|
# Read file content
|
|
with open(fake_file, "rb") as f:
|
|
try:
|
|
names = std_pickle.load(f)
|
|
except Exception as e:
|
|
pytest.fail(f"Failed to unpickle: {e}")
|
|
|
|
# Final checks
|
|
assert isinstance(names, set), "Saved data is not a set"
|
|
assert pvname_3 in names, f"PV3 missing in {names}"
|
|
assert pvname_4 in names, f"PV4 missing in {names}"
|
|
|
|
# Verify logs
|
|
logs = "\n".join(caplog.messages)
|
|
assert "PV offload start" in logs
|
|
assert "PV offload done" in logs
|
|
|
|
def test_offload_exception(tmp_path, caplog):
|
|
configure_logzero_for_pytest(caplog)
|
|
|
|
# Invalid file: directory instead of file
|
|
fake_file = tmp_path / "invalid_dir"
|
|
fake_file.mkdir()
|
|
|
|
pvname = "TEST:PV1"
|
|
|
|
# Create and connect PV to be in cache
|
|
pv = get_pv(pvname, connect=True)
|
|
assert pv.wait_for_connection(timeout=2), "PV not connected"
|
|
assert is_pv_in_cache(pvname)
|
|
|
|
# Patch fn variable to point to wrong "file"
|
|
with pytest.MonkeyPatch().context() as mp:
|
|
mp.setattr("slic.utils.pvpreload.fn", fake_file)
|
|
mp.setattr("slic.utils.pvpreload.delay", 0.01)
|
|
|
|
with caplog.at_level("WARNING"):
|
|
offload()
|
|
|
|
logs = "\n".join(caplog.messages)
|
|
assert "PV preload file not saved due to: IsADirectoryError:" in logs
|
|
|
|
def test_pvpreload_end_to_end(tmp_path, caplog):
|
|
configure_logzero_for_pytest(caplog)
|
|
|
|
preload_file = tmp_path / "end2end.pkl"
|
|
|
|
# Patch preload/offload to use correct file and small delay
|
|
with pytest.MonkeyPatch().context() as mp:
|
|
mp.setattr("slic.utils.pvpreload.fn", preload_file)
|
|
mp.setattr("slic.utils.pvpreload.delay", 0.05)
|
|
|
|
# Start thread once and for all
|
|
with caplog.at_level("DEBUG"):
|
|
pvpreload()
|
|
|
|
# Step 1: Create PV5 and wait for offload
|
|
pvname_5 = "TEST:PV5"
|
|
pv_5 = get_pv(pvname_5, connect=True)
|
|
assert pv_5.wait_for_connection(timeout=2)
|
|
time.sleep(0.2) # Let offload capture it
|
|
|
|
assert preload_file.exists(), "File not created after offload"
|
|
names = std_pickle.load(open(preload_file, "rb"))
|
|
assert pvname_5 in names, f"{pvname_5} missing from file"
|
|
|
|
# Step 2: Inject PV6 into file for next preload
|
|
pvname_6 = "TEST:PV6"
|
|
with open(preload_file, "wb") as f:
|
|
std_pickle.dump([pvname_6], f)
|
|
|
|
assert not is_pv_in_cache(pvname_6)
|
|
|
|
# Wait for next preload to reload PV6
|
|
with caplog.at_level("DEBUG"):
|
|
pvpreload()
|
|
time.sleep(0.2)
|
|
|
|
assert is_pv_in_cache(pvname_6), "PV6 not reloaded by preload"
|
|
|
|
# Verify logs
|
|
logs = "\n".join(caplog.messages)
|
|
assert "PV preload done" in logs
|
|
assert "PV offload done" in logs
|