"""Pytest configuration: one shared simulated EPICS IOC for the whole session.

The IOC is started once (from ``pytest_configure`` or the ``start_ioc``
fixture, whichever runs first), hosts a fixed set of scalar and simulated
motor PVs, and is kept alive by a small background serve loop until the
session ends.
"""
import os
import sys
import time
import threading
import importlib
import pkgutil

import pytest
import epics
from epics.pv import _PVcache_

PREFIX_ENV = "TEST_PV_PREFIX"
DEFAULT_PREFIX = "TEST:"
MOTOR_IDS = [f"M{i}" for i in range(1, 16)]  # M1..M15
STR_SUFFIXES = {"RTYP", "DESC", "EGU", "DIR"}  # serve strings for these fields

# PVs whose value is re-asserted on every serve cycle (client writes do not stick).
_CONSTANT_PVS = {
    "PV1": 1.0, "PV2": 2.0, "PV3": 3.0,
    "PV4": 4.0, "PV5": 5.0, "PV6": 6.0,
}

# Expected bunch-frequency readbacks, seeded with 20.0 and a "Hz" units entry.
_BUNCH_FREQ_PVS = (
    "SIN-TIMAST-TMA:Bunch-1-Exp-Freq-RB",  # PV for aramis
    "SIN-TIMAST-TMA:Bunch-2-Exp-Freq-RB",  # PV for athos
)

# Initial values for string-typed motor fields, keyed by field suffix.
_STRING_FIELD_DEFAULTS = {
    "RTYP": "motor",
    "DESC": "",
    "EGU": "mm",
    "DIR": "Pos",  # string is fine for DIR here
}


# -------- Robust MorIOC loader (handles namespace packages) ------------------
def _load_morioc():
    """Return the ``MorIOC`` class from ``morbidissimo``.

    Tries the direct import first; if that fails, walks every submodule of
    ``morbidissimo`` and returns the first ``MorIOC`` attribute found.

    Raises:
        ImportError: if no submodule exposes ``MorIOC`` (or ``morbidissimo``
            itself cannot be imported).
    """
    try:
        from morbidissimo import MorIOC  # fast path
        return MorIOC
    except Exception:
        import morbidissimo

        for _, modname, _ in pkgutil.walk_packages(
            morbidissimo.__path__, morbidissimo.__name__ + "."
        ):
            try:
                mod = importlib.import_module(modname)
            except Exception:
                # Best-effort scan: a submodule that fails to import is skipped.
                continue
            if hasattr(mod, "MorIOC"):
                return getattr(mod, "MorIOC")
        raise ImportError("Could not find MorIOC inside morbidissimo.*")


RealMorIOC = _load_morioc()


def _motor_suffixes():
    """Collect expected motor field suffixes from epics.Motor (init_list + extras).

    Returns:
        A sorted list of field suffixes. ``VAL``, ``RBV`` and ``DMOV`` are
        always included; a minimal built-in set is used when ``epics.Motor``
        exposes neither ``_init_list`` nor ``_extras``.
    """
    init_list = tuple(getattr(epics.Motor, "_init_list", ()))
    extras = dict(getattr(epics.Motor, "_extras", {}))
    suf = set(init_list) | {s.lstrip(".") for s in extras.values()}
    if not suf:
        # Minimal fallback if upstream metadata is absent
        suf = {"VAL", "RBV", "DMOV", "DESC", "EGU", "DIR", "HLM", "LLM", "VELO", "STOP"}
    # Ensure these critical fields are present
    suf.update({"VAL", "RBV", "DMOV"})
    return sorted(suf)


_shared = {"mor": None, "stop": threading.Event(), "thread": None, "hosted": set()}


def _build_host_map():
    """Return the ``{pv_name: python_type}`` mapping of every PV to host."""
    # Legacy + SIM basics
    to_host = {"VAL": float}
    to_host.update(dict.fromkeys(_CONSTANT_PVS, float))
    to_host["SIM:VAL"] = float
    to_host.update(dict.fromkeys(_BUNCH_FREQ_PVS, float))

    # All motor fields with appropriate types
    suffixes = _motor_suffixes()
    for mid in MOTOR_IDS:
        for suf in suffixes:
            to_host[f"SIM:{mid}.{suf}"] = str if suf in STR_SUFFIXES else float
        # Ensure RTYP exists even if not present in suffixes
        to_host[f"SIM:{mid}.RTYP"] = str
    return to_host


def _build_initial_state(to_host):
    """Return the initial ``{name: value}`` backing state for *to_host*.

    Besides one entry per hosted PV, the state carries a ``<pv>_units`` entry
    ("Hz") for each bunch-frequency PV.
    """
    state = dict.fromkeys(_BUNCH_FREQ_PVS, 20.0)
    for pv in _BUNCH_FREQ_PVS:
        state[f"{pv}_units"] = "Hz"

    for name in to_host:
        _, dot, field = name.rpartition(".")
        if dot and field in _STRING_FIELD_DEFAULTS:
            state[name] = _STRING_FIELD_DEFAULTS[field]
        elif name in _CONSTANT_PVS:
            state[name] = _CONSTANT_PVS[name]
        else:
            # setdefault keeps values seeded above: a plain assignment here
            # would reset the bunch-frequency PVs from 20.0 to 0.0.
            state.setdefault(name, 0.0)

    # DMOV=1 and RBV initial == VAL for all motors
    for mid in MOTOR_IDS:
        state[f"SIM:{mid}.DMOV"] = 1.0
        state[f"SIM:{mid}.RBV"] = state.get(f"SIM:{mid}.VAL", 0.0)
    return state


def _serve_loop(mor, state, stop):
    """Re-serve *state* roughly every 50 ms until *stop* is set."""
    while not stop.is_set():
        # Accept client writes (make writable PVs actually stick); a None
        # result from mor.get leaves the cached value untouched.
        for name in list(state):
            current = mor.get(name)
            if current is not None:
                state[name] = current
        # RBV follows VAL
        for mid in MOTOR_IDS:
            state[f"SIM:{mid}.RBV"] = state[f"SIM:{mid}.VAL"]
        # Reassert constants
        state.update(_CONSTANT_PVS)
        mor.serve(**state)
        time.sleep(0.05)


def _start_shared_ioc(prefix: str):
    """Start one IOC, host all PVs once, seed initial values, and run a small serve loop.

    Idempotent: if a shared IOC is already running this returns immediately,
    so the pytest hook and the ``start_ioc`` fixture can both call it without
    creating a second server and leaking the first.

    Args:
        prefix: PV name prefix handed to ``MorIOC``.
    """
    if _shared["mor"] is not None:
        return

    to_host = _build_host_map()
    state = _build_initial_state(to_host)

    mor = RealMorIOC(prefix)
    mor.__enter__()  # start server
    try:
        mor.host(**to_host)
        # Seed once so early epics.Motor get('RTYP') reads "motor" immediately
        mor.serve(**state)
    except Exception:
        # Do not leave a started server behind when hosting/seeding fails.
        mor.__exit__(*sys.exc_info())
        raise
    _shared["hosted"].update(to_host)

    # Allow a restart after a previous stop.
    _shared["stop"].clear()
    worker = threading.Thread(
        target=_serve_loop, args=(mor, state, _shared["stop"]), daemon=True
    )
    worker.start()
    _shared.update(mor=mor, thread=worker)


def _stop_shared_ioc():
    """Stop the serve loop and shut the shared IOC down; safe to call repeatedly."""
    _shared["stop"].set()
    worker = _shared.get("thread")
    if worker is not None:
        worker.join(timeout=2.0)
    mor = _shared.get("mor")
    _shared.update(mor=None, thread=None)
    if mor is None:
        return
    _shared["hosted"].clear()
    try:
        mor.__exit__(None, None, None)
    except Exception:
        # Deliberate best-effort: shutdown errors must not fail the session.
        pass
    time.sleep(0.2)


def _ensure_ioc_running():
    """Normalise the prefix env var and start the shared IOC if it is not running."""
    prefix = os.environ.get(PREFIX_ENV, DEFAULT_PREFIX)
    if not prefix.endswith(":"):
        prefix += ":"
    os.environ[PREFIX_ENV] = prefix
    if _shared["mor"] is None:
        _start_shared_ioc(prefix)
        time.sleep(0.5)  # small CA settle time


class _MorIOCProxy:
    """
    Context-manager drop-in that always reuses the shared IOC, and guards
    .host() to avoid duplicate host calls if tests try to re-host.

    Constructor arguments are accepted and ignored; entering/exiting the
    context neither starts nor stops anything. Every other attribute is
    forwarded to the shared IOC instance.
    """

    def __init__(self, *_, **__):
        pass

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # Never suppress exceptions, never stop the shared IOC.
        return False

    def __getattr__(self, name):
        return getattr(_shared["mor"], name)

    def host(self, **pvs):
        """Host only those PVs that the shared IOC is not hosting yet."""
        new = {k: v for k, v in pvs.items() if k not in _shared["hosted"]}
        if new:
            _shared["mor"].host(**new)
            _shared["hosted"].update(new.keys())


@pytest.fixture(scope="session")
def start_ioc():
    """Ensure the shared IOC is running for the session, and stop it afterwards.

    The fixture is session-scoped: setup runs once before the first test that
    requests it, teardown once after the last test.
    """
    _ensure_ioc_running()
    yield  # tests run here
    _stop_shared_ioc()


def pytest_configure(config):
    """Start the shared IOC when pytest is configured.

    Note: no patching of ``MorIOC`` is performed here; ``_MorIOCProxy`` is
    only defined, not installed.
    """
    _ensure_ioc_running()


def pytest_sessionfinish(session, exitstatus):
    """Stop the IOC and cleanup after all tests."""
    _stop_shared_ioc()