diff --git a/tests/ioc_plugin.py b/tests/ioc_plugin.py index 181f54276..26b89fa85 100644 --- a/tests/ioc_plugin.py +++ b/tests/ioc_plugin.py @@ -1,77 +1,122 @@ # ioc_plugin.py -import os, time, threading, pytest -from morbidissimo import MorIOC -import epics # to read Motor field lists +import os, sys, time, threading, pytest, importlib, pkgutil +import epics PREFIX_ENV = "TEST_PV_PREFIX" DEFAULT_PREFIX = "TEST:" MOTOR_IDS = [f"M{i}" for i in range(1, 14)] +# ---- Robust MorIOC loader (handles namespace packages) +def _load_morioc(): + try: + from morbidissimo import MorIOC # fast path + return MorIOC + except Exception: + import morbidissimo + for _, modname, _ in pkgutil.walk_packages( + morbidissimo.__path__, morbidissimo.__name__ + "." + ): + try: + mod = importlib.import_module(modname) + if hasattr(mod, "MorIOC"): + return getattr(mod, "MorIOC") + except Exception: + continue + raise ImportError("Could not find MorIOC inside morbidissimo.*") + +RealMorIOC = _load_morioc() + def _motor_suffixes(): - """Collect the field suffixes epics.Motor expects; fallback if missing.""" init_list = tuple(getattr(epics.Motor, "_init_list", ())) extras = dict(getattr(epics.Motor, "_extras", {})) - suffixes = set(init_list) | {s.lstrip(".") for s in extras.values()} - if not suffixes: - suffixes = {"VAL","RBV","DMOV","DESC","EGU","DIR","HLM","LLM","VELO","STOP"} - return sorted(suffixes) + suf = set(init_list) | {s.lstrip(".") for s in extras.values()} + if not suf: + suf = {"VAL","RBV","DMOV","DESC","EGU","DIR","HLM","LLM","VELO","STOP"} + return sorted(suf) -@pytest.fixture(scope="session", autouse=True) -def ioc_session(): - """ - ONE IOC for the whole test session: - - TEST:VAL and TEST:PV1..PV6 (constants) - - TEST:SIM:VAL - - TEST:SIM:M*. for M1..M13 - """ - prefix = os.environ.get(PREFIX_ENV, DEFAULT_PREFIX) - os.environ.setdefault(PREFIX_ENV, prefix) +_shared = {"mor": None, "stop": threading.Event(), "thread": None, "hosted": set()} - motor_suffixes = _motor_suffixes() - stop = threading.Event() +def _start_shared_ioc(prefix: str): + mor = RealMorIOC(prefix) + mor.__enter__() # start server - def run(): - with MorIOC(prefix) as mor: - # Legacy PVs + SIM:VAL - mor.host( - VAL=float, - PV1=float, PV2=float, PV3=float, PV4=float, PV5=float, PV6=float, - **{"SIM:VAL": float}, - ) + # Host legacy + SIM basic + all motor fields + to_host = { + "VAL": float, + "PV1": float, "PV2": float, "PV3": float, + "PV4": float, "PV5": float, "PV6": float, + "SIM:VAL": float, + } + for mid in MOTOR_IDS: + for suf in _motor_suffixes(): + to_host[f"SIM:{mid}.{suf}"] = float + mor.host(**to_host) + _shared["hosted"].update(to_host.keys()) - # All motor fields for each motor - motor_fields = {f"SIM:{mid}.{suf}": float - for mid in MOTOR_IDS for suf in motor_suffixes} - mor.host(**motor_fields) + # Backing state + state = {k: 0.0 for k in to_host.keys()} + for mid in MOTOR_IDS: + state[f"SIM:{mid}.DMOV"] = 1.0 # “done” - # Backing state (writable values we keep/serve) - state = {"VAL": 0.0, "SIM:VAL": 0.0} - for k in motor_fields: - state[k] = 0.0 + def loop(): + while not _shared["stop"].is_set(): + # accept writes + for name in state.keys(): + v = mor.get(name) + if v is not None: + state[name] = v + # RBV follows VAL for mid in MOTOR_IDS: - state[f"SIM:{mid}.DMOV"] = 1.0 # motors 'done' by default + state[f"SIM:{mid}.RBV"] = state[f"SIM:{mid}.VAL"] + # serve (+ static PV1..PV6) + mor.serve( + **state, + PV1=1.0, PV2=2.0, PV3=3.0, + PV4=4.0, PV5=5.0, PV6=6.0, + ) + time.sleep(0.05) - while not stop.is_set(): - # Accept writes - for name in state.keys(): - v = mor.get(name) - if v is not None: - state[name] = v - - # Make RBV follow VAL for each motor - for mid in MOTOR_IDS: - state[f"SIM:{mid}.RBV"] = state[f"SIM:{mid}.VAL"] - - # Serve current values (+ static PV1..PV6) - mor.serve(**state, PV1=1.0, PV2=2.0, PV3=3.0, PV4=4.0, PV5=5.0, PV6=6.0) - time.sleep(0.05) - - t = threading.Thread(target=run, daemon=True) + t = threading.Thread(target=loop, daemon=True) t.start() - time.sleep(0.8) # let CA announce + _shared.update(mor=mor, thread=t) - yield +class _MorIOCProxy: + """Any MorIOC(...) in tests reuses the shared IOC and avoids re-hosting.""" + def __init__(self, *_, **__): pass + def __enter__(self): return self + def __exit__(self, *args): return False + def __getattr__(self, name): return getattr(_shared["mor"], name) + def host(self, **pvs): + new = {k: v for k, v in pvs.items() if k not in _shared["hosted"]} + if new: + _shared["mor"].host(**new) + _shared["hosted"].update(new.keys()) - stop.set() - t.join(timeout=2.0) +def pytest_configure(config): + prefix = os.environ.get(PREFIX_ENV, DEFAULT_PREFIX) + if not prefix.endswith(":"): + prefix += ":" + os.environ[PREFIX_ENV] = prefix + + # Start shared IOC BEFORE tests and patch MorIOC globally + _start_shared_ioc(prefix) + import morbidissimo + morbidissimo.MorIOC = _MorIOCProxy + sys.modules["morbidissimo"].MorIOC = _MorIOCProxy + + # small CA settle time + print(f"[ioc_plugin] Shared IOC up with prefix '{prefix}'") + time.sleep(0.5) + +def pytest_sessionfinish(session, exitstatus): + _shared["stop"].set() + t = _shared.get("thread") + if t: + t.join(timeout=2.0) + mor = _shared.get("mor") + if mor: + try: + mor.__exit__(None, None, None) + except Exception: + pass time.sleep(0.2)