# NOTE(review): import order is preserved from the original file; `patch_put`
# is presumably imported first for its side effects -- confirm before sorting.
import patch_put
import time
import threading
import pytest
import epics
import os
from epics.pv import PV
from epics.pv import _PVcache_
from slic.utils.hastyepics import *


# ============================
# Helpers
# ============================

def _in_pv_cache(pvname: str) -> bool:
    """Return True if any `_PVcache_` key has `pvname` as its first element."""
    return any(key[0] == pvname for key in _PVcache_)


def _get_cached_pv(name: str):
    """Return the first cached PV whose key starts with `name`, or None."""
    return next((pv for key, pv in _PVcache_.items() if key[0] == name), None)


def _evict_prefix(prefix: str) -> None:
    """Remove every `_PVcache_` entry whose PV name starts with `prefix`."""
    # Snapshot the keys first: entries are popped while walking the cache.
    for key in list(_PVcache_):
        if key[0].startswith(prefix):
            _PVcache_.pop(key, None)


def _upstream_motor_fields():
    """Return `(init_list, extras)` of `epics.Motor` without "disabled".

    `init_list` is a tuple built from `epics.Motor._init_list` and `extras`
    is a dict built from `epics.Motor._extras`; a missing attribute yields an
    empty tuple / dict respectively.
    """
    base_init = tuple(getattr(epics.Motor, "_init_list", ()))
    base_extras = dict(getattr(epics.Motor, "_extras", {}))
    init_wo_disabled = tuple(x for x in base_init if x != "disabled")
    extras_wo_disabled = {
        k: v for k, v in base_extras.items() if k != "disabled"
    }
    return init_wo_disabled, extras_wo_disabled


@pytest.fixture(scope="session", autouse=True)
def clear_pv_cache_at_session_start():
    """Clear PyEPICS PV cache for our prefix once at session start.

    The prefix comes from the TEST_PV_PREFIX environment variable and
    defaults to "TEST:".
    """
    prefix = os.environ.get("TEST_PV_PREFIX", "TEST:")
    _evict_prefix(prefix)
    yield


# ============================
# Tests
# ============================

def test_get_pv_connect_false_and_true():
    """get_pv(connect=False) caches an unconnected PV; connect=True connects."""
    name = "TEST:SIM:VAL"
    assert not _in_pv_cache(name)

    pv = get_pv(name, connect=False)
    assert isinstance(pv, PV)
    assert _in_pv_cache(name), "PV should be in _PVcache_ after get_pv(connect=False)"
    assert not pv.connected

    # Now actually connect
    pv2 = get_pv(name, connect=True, timeout=2.0)
    assert pv2.connected


def test_motor_init_strips_suffixes():
    """Motor() strips a trailing ".VAL" or "." from the given name."""
    m1 = Motor("TEST:SIM:M1.VAL")

    # Show the base attributes of epics.Motor
    print("Attributes of epics.Motor:")
    print(" _init_list:", getattr(epics.Motor, "_init_list", []))
    print(" _extras:", getattr(epics.Motor, "_extras", {}))

    # Show the same attributes of the custom Motor object
    print("\nAttributes of Motor (custom):")
    print(" _init_list:", getattr(m1, "_init_list", []))
    print(" _extras:", getattr(m1, "_extras", {}))

    # Show every attribute of epics.Motor (via dir())
    print("\nAll attributes of epics.Motor:")
    print(dir(epics.Motor))

    # Show every attribute of the custom Motor instance (via dir())
    print("\nAll attributes of Motor (custom):")
    print(dir(m1))

    assert m1._prefix == "TEST:SIM:M1"

    m2 = Motor("TEST:SIM:M2.")
    assert m2._prefix == "TEST:SIM:M2"

    m3 = Motor("TEST:SIM:M3")
    assert m3._prefix == "TEST:SIM:M3"


def test_motor_invalid_name_raises():
    """Motor(None) raises MotorException."""
    with pytest.raises(MotorException):
        Motor(None)


def test_disabled_removed_relative_to_upstream():
    """The custom Motor has non-empty field tables without "disabled"."""
    m = Motor("TEST:SIM:M7")
    init_list = tuple(getattr(m, "_init_list", ()))
    extras = dict(getattr(m, "_extras", {}))

    assert init_list, "_init_list is empty; expected at least one field"
    assert extras, "_extras is empty; expected at least one entry"

    assert "disabled" not in init_list, \
        "disabled devait être retiré de _init_list par rapport à epics.Motor"
    assert "disabled" not in extras, \
        "disabled devait être retiré de _extras par rapport à epics.Motor"


def test_motor_init_list_and_extras_build_handles_eagerly():
    """Motor() inserts unconnected PVs for all init/extras fields in the cache."""
    base_init_wo_disabled, base_extras_wo_disabled = _upstream_motor_fields()

    motor_prefix = "TEST:SIM:M4"
    _evict_prefix(motor_prefix)

    init_pvnames = [f"{motor_prefix}.{attr}" for attr in base_init_wo_disabled]
    extra_pvnames = [
        f"{motor_prefix}{suffix}" for suffix in base_extras_wo_disabled.values()
    ]

    for name in init_pvnames + extra_pvnames:
        assert not _in_pv_cache(name), f"{name} should not be in cache before Motor()"

    # The instance itself is not inspected; the test looks at the PV cache
    # after construction.
    m = Motor(motor_prefix)

    # init_list: handles exist in cache and are disconnected
    for name in init_pvnames:
        assert _in_pv_cache(name), f"{name} not inserted into _PVcache_ by __init__"
        cached = _get_cached_pv(name)
        assert cached is not None and not cached.connected

    # extras: handles exist in cache and are disconnected
    for name in extra_pvnames:
        assert _in_pv_cache(name), f"{name} not inserted into _PVcache_ by extras loop"
        cached = _get_cached_pv(name)
        assert cached is not None and not cached.connected


def test_motor_val_connect_via_cache():
    """The cached .VAL PV created by Motor() can connect, be written and read."""
    motor_prefix = "TEST:SIM:M5"
    pvname = f"{motor_prefix}.VAL"
    assert not _in_pv_cache(pvname)

    m = Motor(motor_prefix)
    assert _in_pv_cache(pvname), f"{pvname} should be cached after Motor()"

    pv = _get_cached_pv(pvname)
    assert pv is not None
    assert not pv.connected

    assert pv.wait_for_connection(timeout=2.0)
    pv.put(123.0, wait=True)
    assert pv.get(timeout=1.0) == 123.0


def test_speedup_get_pv():
    """get_pv(connect=False) must take no longer than epics.get_pv(connect=True).

    NOTE(review): wall-clock comparison of two single calls; may be sensitive
    to machine load.
    """
    t0 = time.perf_counter()
    get_pv("TEST:SIM:M13.VAL", connect=False)
    t1 = time.perf_counter()

    t2 = time.perf_counter()
    epics.get_pv("TEST:SIM:M13.VAL", connect=True)
    t3 = time.perf_counter()

    fast_time = t1 - t0
    slow_time = t3 - t2
    assert fast_time <= slow_time, (
        f"get_pv ({fast_time:.6f}s) plus lent que EPICS ({slow_time:.6f}s)"
    )


def test_speedup_motor_instantiation():
    """Custom Motor() must construct no slower than epics.Motor().

    NOTE(review): wall-clock comparison of two single calls; may be sensitive
    to machine load.
    """
    t0 = time.perf_counter()
    Motor("TEST:SIM:M6")
    t1 = time.perf_counter()

    t2 = time.perf_counter()
    epics.Motor("TEST:SIM:M8")
    t3 = time.perf_counter()

    fast_time = t1 - t0
    slow_time = t3 - t2
    assert fast_time <= slow_time, (
        f"Motor custom ({fast_time:.6f}s) plus lent que EPICS.Motor ({slow_time:.6f}s)"
    )


def test_speedup_motor_PV():
    """Motor.PV(connect=False) must take no longer than Motor.PV(connect=True)."""
    # Baseline: request the PV and connect it within the call.
    m_slow = Motor("TEST:SIM:M14")
    t0 = time.perf_counter()
    pv_slow = m_slow.PV("VAL", connect=True)
    t1 = time.perf_counter()

    # Candidate: request the PV without connecting.
    m_fast = Motor("TEST:SIM:M15")
    t2 = time.perf_counter()
    pv_fast = m_fast.PV("VAL", connect=False)
    t3 = time.perf_counter()

    assert pv_slow.connected
    assert not pv_fast.connected

    slow_time = t1 - t0
    fast_time = t3 - t2
    assert fast_time <= slow_time, (
        f"Motor.PV custom ({fast_time:.6f}s) beaucoup plus lent que ({slow_time:.6f}s)"
    )


def test_motor_init_list_attrs_created():
    """Every upstream init field (except "disabled") resolves to a PV."""
    base_init_wo_disabled, _ = _upstream_motor_fields()

    m = Motor("TEST:SIM:M10")
    for attr in base_init_wo_disabled:
        pv = m.PV(attr, connect=False)
        assert isinstance(pv, PV), f"{attr} n'est pas un PV"
        # No strong assumption about the connection state here.


def test_motor_extras_attrs_correct():
    """Extras exposed as attributes are PVs named `<prefix><suffix>`.

    NOTE(review): extras that are not attributes of the Motor are skipped, so
    the test passes vacuously if none of them is exposed.
    """
    _, base_extras_wo_disabled = _upstream_motor_fields()

    m = Motor("TEST:SIM:M11")
    for key, suffix in base_extras_wo_disabled.items():
        if hasattr(m, key):
            pv = getattr(m, key)
            assert isinstance(pv, PV)
            assert pv.pvname == f"TEST:SIM:M11{suffix}"


def test_motor_callbacks_empty():
    """A freshly built Motor has an empty `_callbacks` dict."""
    m = Motor("TEST:SIM:M12")
    assert isinstance(m._callbacks, dict)
    assert m._callbacks == {}