This commit is contained in:
@@ -9,6 +9,18 @@ from slic.utils.hastyepics import *
|
||||
def _in_pv_cache(pvname: str) -> bool:
|
||||
return any(k[0] == pvname for k in _PVcache_.keys())
|
||||
|
||||
def _evict_prefix(prefix: str):
|
||||
for k in list(_PVcache_.keys()):
|
||||
if k[0].startswith(prefix):
|
||||
_PVcache_.pop(k, None)
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def clear_pv_cache_at_session_start():
|
||||
"""Clear PyEPICS PV cache for our prefix once at session start."""
|
||||
prefix = os.environ.get("TEST_PV_PREFIX", "TEST:")
|
||||
_evict_prefix(prefix)
|
||||
yield
|
||||
|
||||
# ============================
|
||||
# Tests
|
||||
# ============================
|
||||
@@ -49,53 +61,70 @@ def test_disabled_removed_relative_to_upstream():
|
||||
base_init = tuple(getattr(epics.Motor, "_init_list", ()))
|
||||
base_extras = dict(getattr(epics.Motor, "_extras", {}))
|
||||
|
||||
had_in_init = "disabled" in base_init
|
||||
had_in_extras = "disabled" in base_extras
|
||||
assert "disabled" in base_init
|
||||
assert "disabled" in base_extras
|
||||
|
||||
m = Motor("TEST:SIM:M7")
|
||||
|
||||
if had_in_init:
|
||||
assert "disabled" not in getattr(m, "_init_list", ()), \
|
||||
assert "disabled" not in getattr(m, "_init_list", ()), \
|
||||
"disabled devait être retiré de _init_list par rapport à epics.Motor"
|
||||
|
||||
if had_in_extras:
|
||||
assert "disabled" not in getattr(m, "_extras", {}), \
|
||||
assert "disabled" not in getattr(m, "_extras", {}), \
|
||||
"disabled devait être retiré de _extras par rapport à epics.Motor"
|
||||
|
||||
def _get_cached(name: str):
|
||||
for k, pv in _PVcache_.items():
|
||||
if k[0] == name:
|
||||
return pv
|
||||
return None
|
||||
|
||||
def test_motor_init_list_and_extras():
|
||||
"""
|
||||
Vérifie la cohérence de _init_list/_extras vs l'upstream :
|
||||
- tout ce que l'upstream expose (hors 'disabled') peut être résolu via PV()/attribut.
|
||||
Sans forcer que _extras soit non vide si l'upstream ne l'est pas.
|
||||
"""
|
||||
def test_motor_init_list_and_extras_build_handles_eagerly():
|
||||
base_init = tuple(getattr(epics.Motor, "_init_list", ()))
|
||||
base_extras = dict(getattr(epics.Motor, "_extras", {}))
|
||||
base_init_wo_disabled = tuple(x for x in base_init if x != "disabled")
|
||||
base_extras_wo_disabled = {k: v for k, v in base_extras.items() if k != "disabled"}
|
||||
|
||||
m = Motor("TEST:SIM:M4")
|
||||
motor_prefix = "TEST:SIM:M4"
|
||||
|
||||
# Pour tous les attributs "init_list" upstream (sans 'disabled'), on doit avoir un PV accessible
|
||||
for attr in base_init_wo_disabled:
|
||||
pv = m.PV(attr, connect=False)
|
||||
assert isinstance(pv, PV), f"{attr} n'est pas un PV"
|
||||
_evict_prefix(motor_prefix)
|
||||
init_pvnames = [f"{motor_prefix}.{attr}" for attr in base_init_wo_disabled]
|
||||
extra_pvnames = [f"{motor_prefix}{suffix}" for suffix in base_extras_wo_disabled.values()]
|
||||
for name in init_pvnames + extra_pvnames:
|
||||
assert not _in_pv_cache(name), f"{name} should not be in cache before Motor()"
|
||||
|
||||
# Pour tous les extras upstream (sans 'disabled'), si l'attribut existe sur m, le pvname doit matcher
|
||||
m = Motor(motor_prefix)
|
||||
|
||||
# init_list: handles exist in cache and are disconnected
|
||||
for name in init_pvnames:
|
||||
assert _in_pv_cache(name), f"{name} not inserted into _PVcache_ by __init__"
|
||||
cached = _get_cached_pv(name)
|
||||
assert cached is not None and not cached.connected
|
||||
|
||||
# handles exist in cache
|
||||
for attr_name, suffix in base_extras_wo_disabled.items():
|
||||
if hasattr(m, attr_name):
|
||||
pv = getattr(m, attr_name)
|
||||
assert isinstance(pv, PV), f"{attr_name} n'est pas un PV"
|
||||
assert pv.pvname == f"TEST:SIM:M4{suffix}"
|
||||
name = f"{motor_prefix}{suffix}"
|
||||
assert _in_pv_cache(name), f"{name} not inserted into _PVcache_ by extras loop"
|
||||
cached = _get_cached_pv(name)
|
||||
assert cached is not None and not cached.connected
|
||||
|
||||
|
||||
def test_motor_pv_method_connects_and_reads():
|
||||
m = Motor("TEST:SIM:M5")
|
||||
pv_val = m.PV("VAL", connect=True, timeout=3.0)
|
||||
assert pv_val.connected
|
||||
pv_val.put(123.0)
|
||||
time.sleep(0.1)
|
||||
assert pv_val.get() == 123.0
|
||||
def test_motor_val_connect_via_cache():
|
||||
motor_prefix = "TEST:SIM:M5"
|
||||
pvname = f"{motor_prefix}.VAL"
|
||||
|
||||
assert not _in_pv_cache(pvname)
|
||||
|
||||
m = Motor(motor_prefix)
|
||||
|
||||
assert _in_pv_cache(pvname), f"{pvname} should be cached after Motor()"
|
||||
|
||||
pv = _get_cached_pv(pvname)
|
||||
assert pv is not None
|
||||
assert not pv.connected
|
||||
|
||||
assert pv.connect(timeout=2.0), "VAL did not connect"
|
||||
pv.put(123.0, wait=True)
|
||||
assert pv.get(timeout=1.0) == 123.0
|
||||
|
||||
|
||||
def test_speedup_get_pv():
|
||||
@@ -140,15 +169,14 @@ def test_speedup_motor_instantiation():
|
||||
|
||||
def test_speedup_motor_PV():
|
||||
# baseline EPICS
|
||||
m_epics = epics.Motor("TEST:SIM:M4")
|
||||
m_epics = epics.Motor("TEST:SIM:M14")
|
||||
t0 = time.perf_counter()
|
||||
pv_epics = m_epics.PV("VAL", connect=True, timeout=2.0)
|
||||
t1 = time.perf_counter()
|
||||
|
||||
# version "fast" (ton Motor)
|
||||
m_fast = Motor("TEST:SIM:M5")
|
||||
m_fast = Motor("TEST:SIM:M15")
|
||||
t2 = time.perf_counter()
|
||||
pv_fast = m_fast.PV("VAL", connect=False) # volontairement non connecté
|
||||
pv_fast = m_fast.PV("VAL", connect=False)
|
||||
t3 = time.perf_counter()
|
||||
|
||||
assert pv_epics.connected
|
||||
|
||||
Reference in New Issue
Block a user