Files
slic/tests/test_utils_pvpreload.py
T
tligui_y c100b138f5
Run CI Tests / test (push) Successful in 1m7s
Update tests/test_utils_pvpreload.py
2025-08-07 22:18:53 +02:00

199 lines
5.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import pytest
import time
import threading
import pickle as std_pickle
from pathlib import Path
import logging
from logzero import logger, setup_default_logger
from slic.utils.picklio import unpickle
from freezegun import freeze_time
from datetime import datetime, timedelta
from slic.utils.hastyepics import get_pv
from epics.pv import _PVcache_
import sys
import os
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from slic.utils.pvpreload import *
from morbidissimo import MorIOC
# IOC simulation
@pytest.fixture(scope="module")
def epics_ioc():
    """Start a background thread that keeps writing a counter to ``TEST:PV1``.

    The fixture is module-scoped and returns nothing. After launching the
    thread it sleeps for one second before handing control to the tests.

    NOTE(review): pyepics documents the low-level ``ca.put`` as taking a
    channel ID rather than a PV name -- confirm that passing the name string
    below behaves as intended.
    """
    from epics import ca

    target_pv = "TEST:PV1"
    publish_period = 0.1  # seconds between two consecutive writes

    def ioc_thread():
        # Endless loop: write an ever-increasing float to the PV.
        counter = 0.0
        while True:
            ca.put(target_pv, counter)
            counter += 1.0
            time.sleep(publish_period)

    # Daemon thread, because the loop above never returns on its own.
    threading.Thread(target=ioc_thread, daemon=True).start()

    # Fixed one-second pause before the fixture returns.
    time.sleep(1)
def configure_logzero_for_pytest(caplog):
    """Make the logzero ``logger`` visible to pytest's ``caplog`` fixture.

    Args:
        caplog: the pytest log-capture fixture of the calling test.
    """
    # Capture everything from DEBUG upwards.
    caplog.set_level(logging.DEBUG)

    # Remove the handlers attached to the logger and let its records
    # propagate to the root logger, which is what caplog captures.
    del logger.handlers[:]
    logger.propagate = True
def is_pv_in_cache(name):
    """Return True if the PV cache holds an entry whose key starts with *name*.

    The cache keys are indexed here as sequences whose first element is
    compared with *name*.

    Uses the ``_PVcache_`` object imported at the top of this file (the same
    one ``get_pv_from_cache`` reads) instead of going through the ``epics``
    package name, which this module never imports explicitly.

    Args:
        name: PV name to look for.

    Returns:
        bool: True when at least one cache key has *name* as first element.
    """
    return any(key[0] == name for key in _PVcache_)
def get_pv_from_cache(pvname):
    """Return the first cached object whose ``pvname`` equals *pvname*.

    Args:
        pvname: PV name to search for among the values of ``_PVcache_``.

    Returns:
        The first matching cached object, or None when nothing matches.
    """
    candidates = (
        cached
        for cached in _PVcache_.values()
        # Entries without a ``pvname`` attribute are skipped.
        if hasattr(cached, "pvname") and cached.pvname == pvname
    )
    return next(candidates, None)
# Tests
# file_age()
@pytest.mark.parametrize(
    ("age_seconds", "expected"),
    [
        (30, timedelta(seconds=30)),        # 0:00:30
        (300, timedelta(minutes=5)),        # 0:05:00
        (3600, timedelta(hours=1)),         # 1:00:00
        (86400, timedelta(days=1)),         # 1 day
        (1209600, timedelta(days=14)),      # 2 weeks
    ],
)
@freeze_time("2025-08-07 12:00:00")
def test_file_age(tmp_path, age_seconds, expected):
    """file_age() returns the expected timedelta for a back-dated file.

    The test runs under ``freeze_time`` -- presumably so that the exact
    equality below is not disturbed by time passing between the two calls.
    """
    target = tmp_path / "testfile"
    target.touch()

    # Move both access and modification time ``age_seconds`` into the past.
    backdated = time.time() - age_seconds
    os.utime(target, times=(backdated, backdated))

    assert file_age(target) == expected
# preload()
def test_preload_fichier_valide(tmp_path, caplog):
    """preload() on a valid pickle file puts the listed PVs into the cache."""
    configure_logzero_for_pytest(caplog)

    pvname_1, pvname_2 = "TEST:PV1", "TEST:PV2"
    pickle_path = tmp_path / "valide.pkl"

    # Create a pickle file holding the PV names.
    with open(pickle_path, "wb") as handle:
        std_pickle.dump([pvname_1, pvname_2], handle)

    # Neither PV may be in the EPICS cache yet.
    for name in (pvname_1, pvname_2):
        assert not is_pv_in_cache(name)

    # Patch the path used by preload().
    with pytest.MonkeyPatch().context() as mp:
        mp.setattr("slic.utils.pvpreload.fn", pickle_path)
        with caplog.at_level("DEBUG"):
            preload()

    # Both PVs must now have been created and be present in the cache.
    for name in (pvname_1, pvname_2):
        assert is_pv_in_cache(name)

    # The cached PVs are expected to be unconnected.
    for name in (pvname_1, pvname_2):
        assert not get_pv_from_cache(name).connected

    # Check the final log message.
    captured = "\n".join(caplog.messages)
    assert "PV preload done" in captured
def test_preload_old_file(tmp_path, caplog):
    """preload() reports a file back-dated by twice ``lifetime`` as too old.

    ``lifetime`` is not defined in this file; presumably it is provided by
    the star import from ``slic.utils.pvpreload``.
    """
    configure_logzero_for_pytest(caplog)

    pvname = "TEST:PV_X"
    stale_path = tmp_path / "too_old.pkl"

    # Create a valid pickle file holding a single PV name.
    with open(stale_path, "wb") as handle:
        std_pickle.dump([pvname], handle)

    # Artificially age the file: set its times to now minus 2 * lifetime.
    stale_moment = datetime.now() - 2 * lifetime
    stale_stamp = stale_moment.timestamp()
    os.utime(stale_path, (stale_stamp, stale_stamp))

    # Patch the file path used by preload().
    with pytest.MonkeyPatch().context() as mp:
        mp.setattr("slic.utils.pvpreload.fn", stale_path)
        with caplog.at_level("INFO"):
            preload()

    # The file must have been detected as too old.
    captured = "\n".join(caplog.messages)
    assert "PV preload file too old" in captured

    # The PV must not have been created (not present in the cache).
    assert not is_pv_in_cache(pvname)
def test_preload_fichier_corrompu(tmp_path, caplog):
    """preload() logs an UnpicklingError message for a non-pickle file."""
    configure_logzero_for_pytest(caplog)

    # Write an invalid file (plain text, not pickle data).
    broken_path = tmp_path / "corrupt.pkl"
    broken_path.write_text("not a pickle at all")

    # Patch the file path used by preload().
    with pytest.MonkeyPatch().context() as mp:
        mp.setattr("slic.utils.pvpreload.fn", broken_path)
        with caplog.at_level("WARNING"):
            preload()

    captured = "\n".join(caplog.messages)
    assert "PV preload file not loaded due to: UnpicklingError:" in captured
# offload()
def test_offload(tmp_path, caplog):
    """offload() writes the names of the created PVs to the pickle file.

    Two PVs are created through ``get_pv``; the module-level file path and
    delay of ``slic.utils.pvpreload`` are patched, ``offload()`` is called,
    and both the written file and the log output are checked.
    """
    configure_logzero_for_pytest(caplog)
    fake_file = tmp_path / "offload_test.pkl"
    pvname_3 = "TEST:PV3"
    pvname_4 = "TEST:PV4"

    # Create 2 PVs and give each up to two seconds to connect.
    pv_3 = get_pv(pvname_3, connect=True)
    pv_4 = get_pv(pvname_4, connect=True)
    pv_3.wait_for_connection(timeout=2)
    pv_4.wait_for_connection(timeout=2)

    # Patch the target file and the delay.
    with pytest.MonkeyPatch().context() as mp:
        mp.setattr("slic.utils.pvpreload.fn", fake_file)
        mp.setattr("slic.utils.pvpreload.delay", 0.01)
        with caplog.at_level("DEBUG"):
            offload()

    # The file must contain the expected names. Read it inside a context
    # manager so the file handle is closed deterministically.
    with open(fake_file, "rb") as pkl:
        names = std_pickle.load(pkl)
    assert pvname_3 in names
    assert pvname_4 in names

    # Check the logs.
    logs = "\n".join(caplog.messages)
    assert "PV offload start" in logs
    assert "PV offload done" in logs