@@ -0,0 +1,60 @@
|
||||
import os
|
||||
import time
|
||||
import threading
|
||||
import pytest
|
||||
from morbidissimo import MorIOC
|
||||
|
||||
# Environment variable to share the PV prefix with tests
|
||||
PREFIX_ENV = "TEST_PV_PREFIX"
|
||||
DEFAULT_PREFIX = "TEST:"
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def ioc_session():
|
||||
"""
|
||||
Start exactly ONE IOC for the entire pytest session.
|
||||
All tests can reuse it to avoid port/prefix conflicts.
|
||||
Add all PVs needed by your test suite here.
|
||||
"""
|
||||
prefix = os.environ.get(PREFIX_ENV, DEFAULT_PREFIX)
|
||||
# Ensure tests can read the chosen prefix
|
||||
os.environ.setdefault(PREFIX_ENV, prefix)
|
||||
|
||||
stop = threading.Event()
|
||||
|
||||
def run():
|
||||
with MorIOC(prefix) as mor:
|
||||
# Host PVs once for the whole session
|
||||
mor.host(
|
||||
VAL=float, # writable PV used in progress-bar tests
|
||||
PV1=float, PV2=float, PV3=float,
|
||||
PV4=float, PV5=float, PV6=float, # static PVs
|
||||
)
|
||||
|
||||
current_val = 0.0 # backing state for VAL
|
||||
|
||||
# Serve loop; keep running until the session ends
|
||||
while not stop.is_set():
|
||||
# Make VAL writable: read incoming value and persist it
|
||||
new_val = mor.get("VAL")
|
||||
if new_val is not None:
|
||||
current_val = new_val
|
||||
|
||||
# Serve current values (static PVs as constants)
|
||||
mor.serve(
|
||||
VAL=current_val,
|
||||
PV1=1.0, PV2=2.0, PV3=3.0,
|
||||
PV4=4.0, PV5=5.0, PV6=6.0,
|
||||
)
|
||||
time.sleep(0.05) # small delay to avoid a busy loop
|
||||
|
||||
# Start IOC in a background thread
|
||||
t = threading.Thread(target=run, daemon=True)
|
||||
t.start()
|
||||
time.sleep(0.8) # give Channel Access time to announce PVs
|
||||
|
||||
yield # <<< tests execute during this period >>>
|
||||
|
||||
# Graceful teardown
|
||||
stop.set()
|
||||
t.join(timeout=2.0)
|
||||
time.sleep(0.2) # allow OS to release sockets
|
||||
Reference in New Issue
Block a user