Update tests/ioc_plugin.py
Run CI Tests / test (push) Successful in 1m42s

This commit is contained in:
2025-08-11 01:59:36 +02:00
parent acde95b395
commit c32f2bf152
+102 -57
View File
@@ -1,77 +1,122 @@
# ioc_plugin.py
import os, time, threading, pytest
from morbidissimo import MorIOC
import epics # to read Motor field lists
import os, sys, time, threading, pytest, importlib, pkgutil
import epics
PREFIX_ENV = "TEST_PV_PREFIX"
DEFAULT_PREFIX = "TEST:"
MOTOR_IDS = [f"M{i}" for i in range(1, 14)]
# ---- Robust MorIOC loader (handles namespace packages)
def _load_morioc():
try:
from morbidissimo import MorIOC # fast path
return MorIOC
except Exception:
import morbidissimo
for _, modname, _ in pkgutil.walk_packages(
morbidissimo.__path__, morbidissimo.__name__ + "."
):
try:
mod = importlib.import_module(modname)
if hasattr(mod, "MorIOC"):
return getattr(mod, "MorIOC")
except Exception:
continue
raise ImportError("Could not find MorIOC inside morbidissimo.*")
RealMorIOC = _load_morioc()
def _motor_suffixes():
"""Collect the field suffixes epics.Motor expects; fallback if missing."""
init_list = tuple(getattr(epics.Motor, "_init_list", ()))
extras = dict(getattr(epics.Motor, "_extras", {}))
suffixes = set(init_list) | {s.lstrip(".") for s in extras.values()}
if not suffixes:
suffixes = {"VAL","RBV","DMOV","DESC","EGU","DIR","HLM","LLM","VELO","STOP"}
return sorted(suffixes)
suf = set(init_list) | {s.lstrip(".") for s in extras.values()}
if not suf:
suf = {"VAL","RBV","DMOV","DESC","EGU","DIR","HLM","LLM","VELO","STOP"}
return sorted(suf)
@pytest.fixture(scope="session", autouse=True)
def ioc_session():
"""
ONE IOC for the whole test session:
- TEST:VAL and TEST:PV1..PV6 (constants)
- TEST:SIM:VAL
- TEST:SIM:M*.<all motor fields> for M1..M13
"""
prefix = os.environ.get(PREFIX_ENV, DEFAULT_PREFIX)
os.environ.setdefault(PREFIX_ENV, prefix)
_shared = {"mor": None, "stop": threading.Event(), "thread": None, "hosted": set()}
motor_suffixes = _motor_suffixes()
stop = threading.Event()
def _start_shared_ioc(prefix: str):
mor = RealMorIOC(prefix)
mor.__enter__() # start server
def run():
with MorIOC(prefix) as mor:
# Legacy PVs + SIM:VAL
mor.host(
VAL=float,
PV1=float, PV2=float, PV3=float, PV4=float, PV5=float, PV6=float,
**{"SIM:VAL": float},
)
# Host legacy + SIM basic + all motor fields
to_host = {
"VAL": float,
"PV1": float, "PV2": float, "PV3": float,
"PV4": float, "PV5": float, "PV6": float,
"SIM:VAL": float,
}
for mid in MOTOR_IDS:
for suf in _motor_suffixes():
to_host[f"SIM:{mid}.{suf}"] = float
mor.host(**to_host)
_shared["hosted"].update(to_host.keys())
# All motor fields for each motor
motor_fields = {f"SIM:{mid}.{suf}": float
for mid in MOTOR_IDS for suf in motor_suffixes}
mor.host(**motor_fields)
# Backing state
state = {k: 0.0 for k in to_host.keys()}
for mid in MOTOR_IDS:
state[f"SIM:{mid}.DMOV"] = 1.0 # “done”
# Backing state (writable values we keep/serve)
state = {"VAL": 0.0, "SIM:VAL": 0.0}
for k in motor_fields:
state[k] = 0.0
def loop():
while not _shared["stop"].is_set():
# accept writes
for name in state.keys():
v = mor.get(name)
if v is not None:
state[name] = v
# RBV follows VAL
for mid in MOTOR_IDS:
state[f"SIM:{mid}.DMOV"] = 1.0 # motors 'done' by default
state[f"SIM:{mid}.RBV"] = state[f"SIM:{mid}.VAL"]
# serve (+ static PV1..PV6)
mor.serve(
**state,
PV1=1.0, PV2=2.0, PV3=3.0,
PV4=4.0, PV5=5.0, PV6=6.0,
)
time.sleep(0.05)
while not stop.is_set():
# Accept writes
for name in state.keys():
v = mor.get(name)
if v is not None:
state[name] = v
# Make RBV follow VAL for each motor
for mid in MOTOR_IDS:
state[f"SIM:{mid}.RBV"] = state[f"SIM:{mid}.VAL"]
# Serve current values (+ static PV1..PV6)
mor.serve(**state, PV1=1.0, PV2=2.0, PV3=3.0, PV4=4.0, PV5=5.0, PV6=6.0)
time.sleep(0.05)
t = threading.Thread(target=run, daemon=True)
t = threading.Thread(target=loop, daemon=True)
t.start()
time.sleep(0.8) # let CA announce
_shared.update(mor=mor, thread=t)
yield
class _MorIOCProxy:
"""Any MorIOC(...) in tests reuses the shared IOC and avoids re-hosting."""
def __init__(self, *_, **__): pass
def __enter__(self): return self
def __exit__(self, *args): return False
def __getattr__(self, name): return getattr(_shared["mor"], name)
def host(self, **pvs):
new = {k: v for k, v in pvs.items() if k not in _shared["hosted"]}
if new:
_shared["mor"].host(**new)
_shared["hosted"].update(new.keys())
stop.set()
t.join(timeout=2.0)
def pytest_configure(config):
prefix = os.environ.get(PREFIX_ENV, DEFAULT_PREFIX)
if not prefix.endswith(":"):
prefix += ":"
os.environ[PREFIX_ENV] = prefix
# Start shared IOC BEFORE tests and patch MorIOC globally
_start_shared_ioc(prefix)
import morbidissimo
morbidissimo.MorIOC = _MorIOCProxy
sys.modules["morbidissimo"].MorIOC = _MorIOCProxy
# small CA settle time
print(f"[ioc_plugin] Shared IOC up with prefix '{prefix}'")
time.sleep(0.5)
def pytest_sessionfinish(session, exitstatus):
_shared["stop"].set()
t = _shared.get("thread")
if t:
t.join(timeout=2.0)
mor = _shared.get("mor")
if mor:
try:
mor.__exit__(None, None, None)
except Exception:
pass
time.sleep(0.2)