212 lines
6.4 KiB
Python
212 lines
6.4 KiB
Python
import os
|
|
import sys
|
|
import time
|
|
import threading
|
|
import importlib
|
|
import pkgutil
|
|
import pytest
|
|
import epics
|
|
from epics.pv import _PVcache_
|
|
|
|
PREFIX_ENV = "TEST_PV_PREFIX"
|
|
DEFAULT_PREFIX = "TEST:"
|
|
MOTOR_IDS = [f"M{i}" for i in range(1, 16)] # M1..M15
|
|
STR_SUFFIXES = {"RTYP", "DESC", "EGU", "DIR"} # serve strings for these fields
|
|
|
|
# -------- Robust MorIOC loader (handles namespace packages) ------------------
|
|
def _load_morioc():
|
|
try:
|
|
from morbidissimo import MorIOC # fast path
|
|
return MorIOC
|
|
except Exception:
|
|
import morbidissimo
|
|
for _, modname, _ in pkgutil.walk_packages(
|
|
morbidissimo.__path__, morbidissimo.__name__ + "."
|
|
):
|
|
try:
|
|
mod = importlib.import_module(modname)
|
|
if hasattr(mod, "MorIOC"):
|
|
return getattr(mod, "MorIOC")
|
|
except Exception:
|
|
continue
|
|
raise ImportError("Could not find MorIOC inside morbidissimo.*")
|
|
|
|
|
|
RealMorIOC = _load_morioc()
|
|
|
|
|
|
def _motor_suffixes():
|
|
"""Collect expected motor field suffixes from epics.Motor (init_list + extras)."""
|
|
init_list = tuple(getattr(epics.Motor, "_init_list", ()))
|
|
extras = dict(getattr(epics.Motor, "_extras", {}))
|
|
suf = set(init_list) | {s.lstrip(".") for s in extras.values()}
|
|
if not suf:
|
|
# Minimal fallback if upstream metadata is absent
|
|
suf = {"VAL", "RBV", "DMOV", "DESC", "EGU", "DIR", "HLM", "LLM", "VELO", "STOP"}
|
|
# Ensure these critical fields are present
|
|
suf.update({"VAL", "RBV", "DMOV"})
|
|
return sorted(suf)
|
|
|
|
|
|
_shared = {"mor": None, "stop": threading.Event(), "thread": None, "hosted": set()}
|
|
|
|
|
|
def _start_shared_ioc(prefix: str):
|
|
"""Start one IOC, host all PVs once, seed initial values, and run a small serve loop."""
|
|
mor = RealMorIOC(prefix)
|
|
mor.__enter__() # start server
|
|
|
|
# Host legacy + SIM basics
|
|
to_host = {
|
|
"VAL": float,
|
|
"PV1": float, "PV2": float, "PV3": float,
|
|
"PV4": float, "PV5": float, "PV6": float,
|
|
"SIM:VAL": float,
|
|
}
|
|
|
|
to_host.update({
|
|
"SIN-TIMAST-TMA:Bunch-1-Exp-Freq-RB": float, # PV for aramis
|
|
"SIN-TIMAST-TMA:Bunch-2-Exp-Freq-RB": float, # PV for athos
|
|
})
|
|
|
|
# Host all motor fields with appropriate types
|
|
suffixes = _motor_suffixes()
|
|
for mid in MOTOR_IDS:
|
|
for suf in suffixes:
|
|
typ = str if suf in STR_SUFFIXES else float
|
|
to_host[f"SIM:{mid}.{suf}"] = typ
|
|
# Ensure RTYP exists even if not present in suffixes
|
|
to_host[f"SIM:{mid}.RTYP"] = str
|
|
|
|
mor.host(**to_host)
|
|
_shared["hosted"].update(to_host.keys())
|
|
|
|
# Backing state with correct initial values
|
|
state = {}
|
|
|
|
state.update({
|
|
"SIN-TIMAST-TMA:Bunch-1-Exp-Freq-RB": 20.0,
|
|
"SIN-TIMAST-TMA:Bunch-2-Exp-Freq-RB": 20.0,
|
|
})
|
|
|
|
units = {
|
|
"SIN-TIMAST-TMA:Bunch-1-Exp-Freq-RB": "Hz",
|
|
"SIN-TIMAST-TMA:Bunch-2-Exp-Freq-RB": "Hz",
|
|
}
|
|
|
|
for pv, unit in units.items():
|
|
state[f"{pv}_units"] = unit
|
|
|
|
for name, typ in to_host.items():
|
|
if name.endswith(".RTYP"):
|
|
state[name] = "motor"
|
|
elif name.endswith(".DESC"):
|
|
state[name] = ""
|
|
elif name.endswith(".EGU"):
|
|
state[name] = "mm"
|
|
elif name.endswith(".DIR"):
|
|
state[name] = "Pos" # string is fine for DIR here
|
|
elif name in {"PV1", "PV2", "PV3", "PV4", "PV5", "PV6"}:
|
|
base = {"PV1": 1.0, "PV2": 2.0, "PV3": 3.0, "PV4": 4.0, "PV5": 5.0, "PV6": 6.0}
|
|
state[name] = base[name]
|
|
else:
|
|
state[name] = 0.0
|
|
|
|
# DMOV=1 and RBV initial == VAL for all motors
|
|
for mid in MOTOR_IDS:
|
|
state[f"SIM:{mid}.DMOV"] = 1.0
|
|
state[f"SIM:{mid}.RBV"] = state.get(f"SIM:{mid}.VAL", 0.0)
|
|
|
|
# Seed once so early epics.Motor get('RTYP') reads "motor" immediately
|
|
mor.serve(**state)
|
|
|
|
def loop():
|
|
while not _shared["stop"].is_set():
|
|
# Accept client writes (make writable PVs actually stick)
|
|
for name in list(state.keys()):
|
|
v = mor.get(name)
|
|
if v is not None:
|
|
state[name] = v
|
|
|
|
# RBV follows VAL
|
|
for mid in MOTOR_IDS:
|
|
state[f"SIM:{mid}.RBV"] = state[f"SIM:{mid}.VAL"]
|
|
|
|
# Reassert constants
|
|
state["PV1"] = 1.0; state["PV2"] = 2.0; state["PV3"] = 3.0
|
|
state["PV4"] = 4.0; state["PV5"] = 5.0; state["PV6"] = 6.0
|
|
|
|
mor.serve(**state)
|
|
time.sleep(0.05)
|
|
|
|
t = threading.Thread(target=loop, daemon=True)
|
|
t.start()
|
|
_shared.update(mor=mor, thread=t)
|
|
|
|
|
|
class _MorIOCProxy:
|
|
"""
|
|
Context-manager drop-in that always reuses the shared IOC,
|
|
and guards .host() to avoid duplicate host calls if tests try to re-host.
|
|
"""
|
|
def __init__(self, *_, **__): pass
|
|
def __enter__(self): return self
|
|
def __exit__(self, *args): return False
|
|
def __getattr__(self, name): return getattr(_shared["mor"], name)
|
|
def host(self, **pvs):
|
|
new = {k: v for k, v in pvs.items() if k not in _shared["hosted"]}
|
|
if new:
|
|
_shared["mor"].host(**new)
|
|
_shared["hosted"].update(new.keys())
|
|
|
|
|
|
@pytest.fixture(scope="session")
|
|
def start_ioc():
|
|
"""Start the IOC before each test, and clean up after."""
|
|
prefix = os.environ.get(PREFIX_ENV, DEFAULT_PREFIX)
|
|
if not prefix.endswith(":"):
|
|
prefix += ":"
|
|
os.environ[PREFIX_ENV] = prefix
|
|
|
|
_start_shared_ioc(prefix)
|
|
time.sleep(0.5) # small CA settle time
|
|
|
|
yield # This is where the actual test runs
|
|
|
|
# Cleanup after the test
|
|
_shared["stop"].set()
|
|
t = _shared.get("thread")
|
|
if t:
|
|
t.join(timeout=2.0)
|
|
mor = _shared.get("mor")
|
|
if mor:
|
|
try:
|
|
mor.__exit__(None, None, None)
|
|
except Exception:
|
|
pass
|
|
time.sleep(0.2)
|
|
|
|
def pytest_configure(config):
|
|
"""Configure pytest to start the IOC and make sure the MorIOC is patched."""
|
|
prefix = os.environ.get(PREFIX_ENV, DEFAULT_PREFIX)
|
|
if not prefix.endswith(":"):
|
|
prefix += ":"
|
|
os.environ[PREFIX_ENV] = prefix
|
|
|
|
_start_shared_ioc(prefix)
|
|
time.sleep(0.5) # small CA settle time
|
|
|
|
|
|
def pytest_sessionfinish(session, exitstatus):
|
|
"""Stop the IOC and cleanup after all tests."""
|
|
_shared["stop"].set()
|
|
t = _shared.get("thread")
|
|
if t:
|
|
t.join(timeout=2.0)
|
|
mor = _shared.get("mor")
|
|
if mor:
|
|
try:
|
|
mor.__exit__(None, None, None)
|
|
except Exception:
|
|
pass
|
|
time.sleep(0.2) |