Update tests/ioc_plugin.py
Run CI Tests / test (push) Successful in 1m39s

This commit is contained in:
2025-08-11 11:44:44 +02:00
parent c3efdae867
commit 4c79d82b09
+24 -72
View File
@@ -1,20 +1,12 @@
# ioc_plugin.py
import os
import sys
import time
import threading
import importlib
import pkgutil
import pytest
import os, sys, time, threading, pytest, importlib, pkgutil
import epics
from epics.pv import _PVcache_
PREFIX_ENV = "TEST_PV_PREFIX"
DEFAULT_PREFIX = "TEST:"
MOTOR_IDS = [f"M{i}" for i in range(1, 16)] # M1..M15
STR_SUFFIXES = {"RTYP", "DESC", "EGU", "DIR"} # serve string types for these
MOTOR_IDS = [f"M{i}" for i in range(1, 16)]
# -------- Robust MorIOC loader (handles namespace packages) ------------------
# ---- Robust MorIOC loader (handles namespace packages)
def _load_morioc():
try:
from morbidissimo import MorIOC # fast path
@@ -34,101 +26,64 @@ def _load_morioc():
RealMorIOC = _load_morioc()
def _motor_suffixes():
"""Collect expected motor field suffixes from epics.Motor (init_list + extras)."""
init_list = tuple(getattr(epics.Motor, "_init_list", ()))
extras = dict(getattr(epics.Motor, "_extras", {}))
suf = set(init_list) | {s.lstrip(".") for s in extras.values()}
if not suf:
# minimalist fallback
suf = {"VAL", "RBV", "DMOV", "DESC", "EGU", "DIR", "HLM", "LLM", "VELO", "STOP"}
# ensure these critical fields exist
suf.update({"VAL", "RBV", "DMOV"})
suf = {"VAL","RBV","DMOV","DESC","EGU","DIR","HLM","LLM","VELO","STOP"}
return sorted(suf)
_shared = {"mor": None, "stop": threading.Event(), "thread": None, "hosted": set()}
def _start_shared_ioc(prefix: str):
"""Start one IOC, host all PVs once, and run a small serve loop."""
mor = RealMorIOC(prefix)
mor.__enter__() # start server
# Host legacy + SIM basics
# Host legacy + SIM basic + all motor fields
to_host = {
"VAL": float,
"PV1": float, "PV2": float, "PV3": float,
"PV4": float, "PV5": float, "PV6": float,
"SIM:VAL": float,
}
# Host all motor fields with appropriate types
suffixes = _motor_suffixes()
for mid in MOTOR_IDS:
for suf in suffixes:
typ = str if suf in STR_SUFFIXES else float
to_host[f"SIM:{mid}.{suf}"] = typ
# Ensure RTYP exists even if not seen above
for suf in _motor_suffixes():
to_host[f"SIM:{mid}.{suf}"] = float
to_host[f"SIM:{mid}.RTYP"] = str
mor.host(**to_host)
_shared["hosted"].update(to_host.keys())
# Backing state with correct initial values (seed before any client get)
state = {}
for name, typ in to_host.items():
if name.endswith(".RTYP"):
state[name] = "motor"
elif name.endswith(".DESC"):
state[name] = ""
elif name.endswith(".EGU"):
state[name] = "mm"
elif name.endswith(".DIR"):
state[name] = "Pos"
elif name in {"PV1", "PV2", "PV3", "PV4", "PV5", "PV6"}:
# will be overwritten each loop too
base = {"PV1": 1.0, "PV2": 2.0, "PV3": 3.0, "PV4": 4.0, "PV5": 5.0, "PV6": 6.0}
state[name] = base[name]
else:
state[name] = 0.0
# DMOV=1 and RBV initial == VAL
# Backing state
state = {k: 0.0 for k in to_host.keys()}
for mid in MOTOR_IDS:
state[f"SIM:{mid}.DMOV"] = 1.0
state[f"SIM:{mid}.RBV"] = state.get(f"SIM:{mid}.VAL", 0.0)
# Seed once so early caget/get('RTYP') see correct data
mor.serve(**state)
state[f"SIM:{mid}.DMOV"] = 1.0
state[f"SIM:{mid}.RTYP"] = "motor"
def loop():
while not _shared["stop"].is_set():
# accept client writes (make writable PVs actually stick)
for name in list(state.keys()):
# accept writes
for name in state.keys():
v = mor.get(name)
if v is not None:
state[name] = v
# RBV follows VAL
for mid in MOTOR_IDS:
state[f"SIM:{mid}.RBV"] = state[f"SIM:{mid}.VAL"]
# constants
state["PV1"] = 1.0; state["PV2"] = 2.0; state["PV3"] = 3.0
state["PV4"] = 4.0; state["PV5"] = 5.0; state["PV6"] = 6.0
mor.serve(**state)
# serve (+ static PV1..PV6)
mor.serve(
**state,
PV1=1.0, PV2=2.0, PV3=3.0,
PV4=4.0, PV5=5.0, PV6=6.0,
)
time.sleep(0.05)
t = threading.Thread(target=loop, daemon=True)
t.start()
_shared.update(mor=mor, thread=t)
class _MorIOCProxy:
"""
Context-manager drop-in that always reuses the shared IOC,
and guards .host() to avoid duplicate host calls.
"""
"""Any MorIOC(...) in tests reuses the shared IOC and avoids re-hosting."""
def __init__(self, *_, **__): pass
def __enter__(self): return self
def __exit__(self, *args): return False
@@ -139,24 +94,21 @@ class _MorIOCProxy:
_shared["mor"].host(**new)
_shared["hosted"].update(new.keys())
def pytest_configure(config):
# Ensure prefix format and start shared IOC *before* test collection
prefix = os.environ.get(PREFIX_ENV, DEFAULT_PREFIX)
if not prefix.endswith(":"):
prefix += ":"
os.environ[PREFIX_ENV] = prefix
# Start shared IOC BEFORE tests and patch MorIOC globally
_start_shared_ioc(prefix)
# Monkey-patch global MorIOC so any test creating an IOC reuses the shared one
import morbidissimo
morbidissimo.MorIOC = _MorIOCProxy
sys.modules["morbidissimo"].MorIOC = _MorIOCProxy
# small CA settle time
print(f"[ioc_plugin] Shared IOC up with prefix '{prefix}'")
time.sleep(0.5) # small CA settle time
time.sleep(0.5)
def pytest_sessionfinish(session, exitstatus):
_shared["stop"].set()
@@ -169,4 +121,4 @@ def pytest_sessionfinish(session, exitstatus):
mor.__exit__(None, None, None)
except Exception:
pass
time.sleep(0.2)
time.sleep(0.2)