This commit is contained in:
@@ -0,0 +1,182 @@
|
||||
import time
|
||||
import threading
|
||||
import pytest
|
||||
import epics
|
||||
from epics.pv import PV
|
||||
from slic.utils.hastyepics import *
|
||||
from morbidissimo import MorIOC
|
||||
|
||||
# IOC simulation
|
||||
@pytest.fixture(scope="module", autouse=True)
|
||||
def morioc_server():
|
||||
"""Démarre un IOC simulé pour tous les tests."""
|
||||
def run_ioc():
|
||||
with MorIOC("TEST:SIM:") as mor:
|
||||
current_val = 0.0
|
||||
mor.host(
|
||||
VAL=float,
|
||||
RBV=float,
|
||||
STATUS={"type": "enum", "enums": ["IDLE", "MOVING", "DONE"]},
|
||||
disabled=int, # pour tester suppression
|
||||
)
|
||||
while True:
|
||||
val_in = mor.get("VAL")
|
||||
if val_in is not None:
|
||||
current_val = val_in
|
||||
mor.serve(
|
||||
VAL=current_val,
|
||||
RBV=current_val,
|
||||
STATUS=0,
|
||||
disabled=0,
|
||||
RTYP="motor" # pour que epics.Motor original accepte
|
||||
)
|
||||
time.sleep(0.05)
|
||||
|
||||
t = threading.Thread(target=run_ioc, daemon=True)
|
||||
t.start()
|
||||
time.sleep(0.5)
|
||||
yield
|
||||
|
||||
|
||||
def test_get_pv_connect_false_and_true():
|
||||
pv = get_pv("TEST:SIM:VAL", connect=False)
|
||||
assert isinstance(pv, PV)
|
||||
assert not pv.connected
|
||||
|
||||
pv2 = get_pv("TEST:SIM:VAL", connect=True, timeout=2.0)
|
||||
assert pv2.connected
|
||||
|
||||
|
||||
def test_motor_init_strips_suffixes():
|
||||
m1 = Motor("TEST:SIM:M1.VAL")
|
||||
assert m1._prefix == "TEST:SIM:M1"
|
||||
|
||||
m2 = Motor("TEST:SIM:M2.")
|
||||
assert m2._prefix == "TEST:SIM:M2"
|
||||
|
||||
m3 = Motor("TEST:SIM:M3")
|
||||
assert m3._prefix == "TEST:SIM:M3"
|
||||
|
||||
|
||||
def test_motor_invalid_name_raises():
|
||||
with pytest.raises(MotorException):
|
||||
Motor(None)
|
||||
|
||||
|
||||
def test_motor_init_list_and_extras():
|
||||
m = Motor("TEST:SIM:M4")
|
||||
assert "disabled" not in m._init_list
|
||||
assert "disabled" not in m._extras
|
||||
assert len(m._init_list) > 0
|
||||
assert len(m._extras) > 0
|
||||
|
||||
for attr in m._init_list:
|
||||
pv = getattr(m, attr)
|
||||
assert isinstance(pv, PV)
|
||||
assert not pv.connected
|
||||
|
||||
for attr_name, suffix in m._extras.items():
|
||||
pv = getattr(m, attr_name)
|
||||
assert isinstance(pv, PV)
|
||||
assert pv.pvname == f"TEST:SIM:M4{suffix}"
|
||||
|
||||
|
||||
def test_motor_pv_method_connects_and_reads():
|
||||
m = Motor("TEST:SIM:M5")
|
||||
pv_val = m.PV("VAL", connect=True, timeout=1.0)
|
||||
assert pv_val.connected
|
||||
pv_val.put(123.0)
|
||||
time.sleep(0.1)
|
||||
assert pv_val.get() == 123.0
|
||||
|
||||
|
||||
def test_motor_reads_enum_status():
|
||||
m = Motor("TEST:SIM:M6")
|
||||
pv_status = m.PV("STATUS", connect=True)
|
||||
allowed = pv_status.enum_strs
|
||||
assert allowed == ["IDLE", "MOVING", "DONE"]
|
||||
|
||||
|
||||
def test_speedup_get_pv():
|
||||
"""Test que get_pv optimisé est plus rapide que epics.get_pv."""
|
||||
N = 10
|
||||
t0 = time.perf_counter()
|
||||
for _ in range(N):
|
||||
get_pv("TEST:SIM:M1.VAL", connect=False)
|
||||
t1 = time.perf_counter()
|
||||
|
||||
t2 = time.perf_counter()
|
||||
for _ in range(N):
|
||||
epics.get_pv("TEST:SIM:M1.VAL", connect=False)
|
||||
t3 = time.perf_counter()
|
||||
|
||||
fast_time = t1 - t0
|
||||
slow_time = t3 - t2
|
||||
assert fast_time < slow_time, (
|
||||
f"get_pv optimisé ({fast_time:.6f}s) plus lent que EPICS ({slow_time:.6f}s)"
|
||||
)
|
||||
|
||||
|
||||
def test_speedup_motor_instantiation():
|
||||
"""Test que Motor optimisé est plus rapide à instancier que epics.Motor."""
|
||||
N = 10
|
||||
t0 = time.perf_counter()
|
||||
for _ in range(N):
|
||||
Motor("TEST:SIM:M2")
|
||||
t1 = time.perf_counter()
|
||||
|
||||
t2 = time.perf_counter()
|
||||
for _ in range(N):
|
||||
epics.Motor("TEST:SIM:M3")
|
||||
t3 = time.perf_counter()
|
||||
|
||||
fast_time = t1 - t0
|
||||
slow_time = t3 - t2
|
||||
assert fast_time < slow_time, (
|
||||
f"Motor optimisé ({fast_time:.6f}s) plus lent que EPICS.Motor ({slow_time:.6f}s)"
|
||||
)
|
||||
|
||||
|
||||
def test_speedup_motor_PV():
|
||||
"""Test que Motor.PV optimisé est plus rapide avec connect=False."""
|
||||
m_epics = epics.Motor("TEST:SIM:M4")
|
||||
t0 = time.perf_counter()
|
||||
pv_epics = m_epics.PV("VAL", connect=True, timeout=1.0)
|
||||
t1 = time.perf_counter()
|
||||
|
||||
m_fast = Motor("TEST:SIM:M5")
|
||||
t2 = time.perf_counter()
|
||||
pv_fast = m_fast.PV("VAL", connect=False, timeout=1.0)
|
||||
t3 = time.perf_counter()
|
||||
|
||||
assert pv_epics.connected
|
||||
assert pv_fast.connected
|
||||
|
||||
slow_time = t1 - t0
|
||||
fast_time = t3 - t2
|
||||
assert fast_time < slow_time, (
|
||||
f"Motor.PV optimisé ({fast_time:.6f}s) plus lent que EPICS.Motor.PV ({slow_time:.6f}s)"
|
||||
)
|
||||
|
||||
|
||||
def test_motor_init_list_attrs_created():
|
||||
m = Motor("TEST:SIM:M10")
|
||||
for attr in m._init_list:
|
||||
pv = getattr(m, attr)
|
||||
assert isinstance(pv, PV), f"{attr} n'est pas un PV"
|
||||
assert not pv.connected, f"{attr} devrait être non connecté au départ"
|
||||
|
||||
|
||||
def test_motor_extras_attrs_correct():
|
||||
m = Motor("TEST:SIM:M11")
|
||||
for key, suffix in m._extras.items():
|
||||
assert hasattr(m, key), f"{key} manquant dans Motor"
|
||||
pv = getattr(m, key)
|
||||
assert isinstance(pv, PV)
|
||||
assert pv.pvname == f"TEST:SIM:M11{suffix}"
|
||||
|
||||
|
||||
def test_motor_callbacks_empty():
|
||||
m = Motor("TEST:SIM:M12")
|
||||
assert isinstance(m._callbacks, dict)
|
||||
assert m._callbacks == {}
|
||||
Reference in New Issue
Block a user