This commit is contained in:
+43
-37
@@ -1,68 +1,74 @@
|
||||
import os
|
||||
import time
|
||||
import threading
|
||||
import pytest
|
||||
# ioc_plugin.py
|
||||
import os, time, threading, pytest
|
||||
from morbidissimo import MorIOC
|
||||
import epics # to read Motor field lists
|
||||
|
||||
PREFIX_ENV = "TEST_PV_PREFIX"
|
||||
DEFAULT_PREFIX = "TEST:"
|
||||
|
||||
# Motors you reference in tests: M1..M13
|
||||
MOTOR_IDS = [f"M{i}" for i in range(1, 14)]
|
||||
|
||||
def _motor_suffixes():
|
||||
"""Collect the field suffixes epics.Motor expects; fallback if missing."""
|
||||
init_list = tuple(getattr(epics.Motor, "_init_list", ()))
|
||||
extras = dict(getattr(epics.Motor, "_extras", {}))
|
||||
suffixes = set(init_list) | {s.lstrip(".") for s in extras.values()}
|
||||
if not suffixes:
|
||||
suffixes = {"VAL","RBV","DMOV","DESC","EGU","DIR","HLM","LLM","VELO","STOP"}
|
||||
return sorted(suffixes)
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def ioc_session():
|
||||
"""
|
||||
Start exactly ONE IOC for the whole test session.
|
||||
Hosts both legacy PVs (TEST:VAL, TEST:PV1..PV6) and SIM PVs
|
||||
(TEST:SIM:VAL, TEST:SIM:M*.VAL) used by the motor tests.
|
||||
ONE IOC for the whole test session:
|
||||
- TEST:VAL and TEST:PV1..PV6 (constants)
|
||||
- TEST:SIM:VAL
|
||||
- TEST:SIM:M*.<all motor fields> for M1..M13
|
||||
"""
|
||||
prefix = os.environ.get(PREFIX_ENV, DEFAULT_PREFIX)
|
||||
os.environ.setdefault(PREFIX_ENV, prefix)
|
||||
|
||||
motor_suffixes = _motor_suffixes()
|
||||
stop = threading.Event()
|
||||
|
||||
def run():
|
||||
with MorIOC(prefix) as mor:
|
||||
# ---- Host PVs once
|
||||
# Legacy/top-level PVs used by other tests
|
||||
# Legacy PVs + SIM:VAL
|
||||
mor.host(
|
||||
VAL=float, # TEST:VAL
|
||||
PV1=float, PV2=float, PV3=float,
|
||||
PV4=float, PV5=float, PV6=float,
|
||||
)
|
||||
# SIM namespace PVs
|
||||
mor.host(
|
||||
**{"SIM:VAL": float}, # TEST:SIM:VAL
|
||||
**{f"SIM:{mid}.VAL": float for mid in MOTOR_IDS}, # TEST:SIM:M*.VAL
|
||||
VAL=float,
|
||||
PV1=float, PV2=float, PV3=float, PV4=float, PV5=float, PV6=float,
|
||||
**{"SIM:VAL": float},
|
||||
)
|
||||
|
||||
# ---- Backing state (writable PVs keep last written value)
|
||||
state = {
|
||||
"VAL": 0.0,
|
||||
"SIM:VAL": 0.0,
|
||||
**{f"SIM:{mid}.VAL": 0.0 for mid in MOTOR_IDS},
|
||||
}
|
||||
# All motor fields for each motor
|
||||
motor_fields = {f"SIM:{mid}.{suf}": float
|
||||
for mid in MOTOR_IDS for suf in motor_suffixes}
|
||||
mor.host(**motor_fields)
|
||||
|
||||
# Backing state (writable values we keep/serve)
|
||||
state = {"VAL": 0.0, "SIM:VAL": 0.0}
|
||||
for k in motor_fields:
|
||||
state[k] = 0.0
|
||||
for mid in MOTOR_IDS:
|
||||
state[f"SIM:{mid}.DMOV"] = 1.0 # motors 'done' by default
|
||||
|
||||
# ---- Serve loop
|
||||
while not stop.is_set():
|
||||
# Accept writes
|
||||
for name in state.keys():
|
||||
new_val = mor.get(name)
|
||||
if new_val is not None:
|
||||
state[name] = new_val
|
||||
v = mor.get(name)
|
||||
if v is not None:
|
||||
state[name] = v
|
||||
|
||||
# Publish current values; PV1..PV6 are static constants
|
||||
mor.serve(
|
||||
**state,
|
||||
PV1=1.0, PV2=2.0, PV3=3.0,
|
||||
PV4=4.0, PV5=5.0, PV6=6.0,
|
||||
)
|
||||
time.sleep(0.05) # avoid busy loop
|
||||
# Make RBV follow VAL for each motor
|
||||
for mid in MOTOR_IDS:
|
||||
state[f"SIM:{mid}.RBV"] = state[f"SIM:{mid}.VAL"]
|
||||
|
||||
# Serve current values (+ static PV1..PV6)
|
||||
mor.serve(**state, PV1=1.0, PV2=2.0, PV3=3.0, PV4=4.0, PV5=5.0, PV6=6.0)
|
||||
time.sleep(0.05)
|
||||
|
||||
t = threading.Thread(target=run, daemon=True)
|
||||
t.start()
|
||||
time.sleep(0.8) # let CA announce PVs
|
||||
time.sleep(0.8) # let CA announce
|
||||
|
||||
yield
|
||||
|
||||
|
||||
Reference in New Issue
Block a user