This commit is contained in:
+149
-91
@@ -1,115 +1,173 @@
|
||||
import time
|
||||
import epics
|
||||
import threading
|
||||
import pytest
|
||||
from epics.device import Device
|
||||
from epics.motor import MotorException
|
||||
import sys
|
||||
import os
|
||||
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
||||
import epics
|
||||
from slic.utils.hastyepics import *
|
||||
from unittest.mock import patch, PropertyMock
|
||||
from morbidissimo import MorIOC
|
||||
|
||||
def test_motor_instantiation_fast_vs_epics():
|
||||
|
||||
fast_times = []
|
||||
slow_times = []
|
||||
# IOC simualtion
|
||||
@pytest.fixture(scope="module", autouse=True)
|
||||
def morioc_server():
|
||||
"""Démarre un IOC simulé pour tous les tests."""
|
||||
def run_ioc():
|
||||
with MorIOC("TEST:SIM:") as mor:
|
||||
current_val = 0.0
|
||||
mor.host(
|
||||
VAL=float,
|
||||
RBV=float,
|
||||
STATUS={"type": "enum", "enums": ["IDLE", "MOVING", "DONE"]},
|
||||
)
|
||||
while True:
|
||||
val_in = mor.get("VAL")
|
||||
if val_in is not None:
|
||||
current_val = val_in
|
||||
mor.serve(VAL=current_val, RBV=current_val)
|
||||
time.sleep(0.05)
|
||||
|
||||
# Mock to simulate a real EPICS moteur
|
||||
with patch('epics.Motor.get') as mock_get:
|
||||
mock_get.return_value = 'motor'
|
||||
|
||||
# Mean of 10 runs
|
||||
for _ in range(10):
|
||||
t0 = time.perf_counter()
|
||||
m_fast = Motor("TEST:MOTOR_t1")
|
||||
t1 = time.perf_counter()
|
||||
|
||||
t2 = time.perf_counter()
|
||||
m_slow = epics.Motor("TEST:MOTOR_t2")
|
||||
t3 = time.perf_counter()
|
||||
|
||||
fast_times.append(t1 - t0)
|
||||
slow_times.append(t3 - t2)
|
||||
|
||||
avg_fast = sum(fast_times) / len(fast_times)
|
||||
avg_slow = sum(slow_times) / len(slow_times)
|
||||
|
||||
assert avg_fast < avg_slow, f"Fast version ({avg_fast:.6f}s) should be faster than EPICS ({avg_slow:.6f}s)"
|
||||
t = threading.Thread(target=run_ioc, daemon=True)
|
||||
t.start()
|
||||
time.sleep(0.5)
|
||||
yield
|
||||
|
||||
def test_motor_without_suffix_and_dot():
|
||||
# Suffix .VAL ... dot should be stripped from name
|
||||
m1 = Motor("TEST:MOTOR_1.VAL")
|
||||
assert m1._prefix == "TEST:MOTOR_1"
|
||||
|
||||
m2 = Motor("TEST:MOTOR_2.")
|
||||
assert m2._prefix == "TEST:MOTOR_2"
|
||||
def test_get_pv_connect_false_and_true():
|
||||
pv = get_pv("TEST:SIM:VAL", connect=False)
|
||||
assert isinstance(pv, PV)
|
||||
assert not pv.connected
|
||||
|
||||
m3 = Motor("TEST:MOTOR")
|
||||
assert m3._prefix == "TEST:MOTOR"
|
||||
pv2 = get_pv("TEST:SIM:VAL", connect=True, timeout=2.0)
|
||||
assert pv2.connected
|
||||
|
||||
|
||||
def test_motor_init_strips_suffixes():
|
||||
m1 = Motor("TEST:SIM:M1.VAL")
|
||||
assert m1._prefix == "TEST:SIM:M1"
|
||||
|
||||
m2 = Motor("TEST:SIM:M2.")
|
||||
assert m2._prefix == "TEST:SIM:M2"
|
||||
|
||||
m3 = Motor("TEST:SIM:M3")
|
||||
assert m3._prefix == "TEST:SIM:M3"
|
||||
|
||||
|
||||
def test_motor_invalid_name_raises():
|
||||
# Should raise MotorException if name is none
|
||||
with pytest.raises(MotorException):
|
||||
Motor(None)
|
||||
|
||||
def test_motor_init_list_has_no_disabled():
|
||||
assert "disabled" not in Motor._init_list
|
||||
assert "disabled" not in Motor._extras
|
||||
|
||||
def test_motor_initializes_device_attrs():
|
||||
# Check that Motor correctly initializes all Device PVs from _init_list
|
||||
m = Motor("TEST:MOTOR")
|
||||
|
||||
|
||||
print("_init_list contents:", m._init_list)
|
||||
print("All attributes on motor:", dir(m))
|
||||
|
||||
def test_motor_init_list_and_extras():
|
||||
m = Motor("TEST:SIM:M4")
|
||||
# disabled doit avoir été retiré
|
||||
assert "disabled" not in m._init_list
|
||||
assert "disabled" not in m._extras
|
||||
|
||||
# Vérifier qu'ils ne sont pas vides
|
||||
assert len(m._init_list) > 0
|
||||
assert len(m._extras) > 0
|
||||
|
||||
# Tous les PV de _init_list existent et sont des PV
|
||||
for attr in m._init_list:
|
||||
print(f"\nChecking attribute: {attr}")
|
||||
assert hasattr(m, attr), f"Motor is missing attribute: {attr}"
|
||||
pv = getattr(m, attr)
|
||||
|
||||
print(f"PV object: {pv}")
|
||||
print("Type:", type(pv))
|
||||
if pv is not None:
|
||||
print(f"PV name: {pv.pvname}")
|
||||
print(f"PV connected: {pv.connected}")
|
||||
|
||||
assert isinstance(pv, epics.pv.PV), f"{attr} is not a valid PV object"
|
||||
assert not pv.connected, f"{attr} should not be connected yet"
|
||||
|
||||
assert isinstance(pv, PV)
|
||||
assert not pv.connected
|
||||
|
||||
def test_motor_pv_connection_skipped_by_default():
|
||||
# PVs should not be connected right after Motor creation
|
||||
m = Motor("TEST:MOTOR")
|
||||
for key in m._extras:
|
||||
pv = m.PV(key)
|
||||
# Explicitly check that no connection was forced during init
|
||||
assert not pv.connected, f"PV {key} was connected unexpectedly"
|
||||
|
||||
def test_motor_adds_all_extras_pvs():
|
||||
# Given a test motor name
|
||||
m = Motor("TEST:MOTOR")
|
||||
|
||||
# For each key in _extras, the motor should have an attribute with that name
|
||||
# Tous les PV de _extras existent et sont corrects
|
||||
for attr_name, suffix in m._extras.items():
|
||||
# The attribute must exist
|
||||
assert hasattr(m, attr_name), f"Missing attribute: {attr_name}"
|
||||
|
||||
# The attribute must be a PV
|
||||
pv = getattr(m, attr_name)
|
||||
assert isinstance(pv, PV), f"{attr_name} is not a PV"
|
||||
assert isinstance(pv, PV)
|
||||
assert pv.pvname == f"TEST:SIM:M4{suffix}"
|
||||
|
||||
# The PV's name should match the expected pattern
|
||||
expected_name = f"TEST:MOTOR{suffix}"
|
||||
assert pv.pvname == expected_name, f"PV name mismatch for {attr_name}"
|
||||
|
||||
assert not pv.connected, f"{attr_name} PV should not be connected immediately"
|
||||
def test_motor_pv_method_connects_and_reads():
|
||||
m = Motor("TEST:SIM:M5")
|
||||
pv_val = m.PV("VAL", connect=True, timeout=1.0)
|
||||
assert pv_val.connected
|
||||
pv_val.put(123.0)
|
||||
time.sleep(0.1)
|
||||
assert pv_val.get() == 123.0
|
||||
|
||||
def test_motor_PV_method_exists_and_works():
|
||||
# Motor.PV(): should call super().PV and return a PV object
|
||||
m = Motor("TEST:MOTOR")
|
||||
pv = m.PV("RBV", connect=False)
|
||||
assert pv is not None
|
||||
assert isinstance(pv, epics.pv.PV)
|
||||
|
||||
def test_motor_reads_enum_status():
|
||||
m = Motor("TEST:SIM:M6")
|
||||
pv_status = m.PV("STATUS", connect=True)
|
||||
allowed = pv_status.enum_strs
|
||||
assert allowed == ["IDLE", "MOVING", "DONE"]
|
||||
|
||||
|
||||
def test_speedup_get_pv():
|
||||
"""Test que get_pv optimisé est plus rapide que epics.get_pv."""
|
||||
t0 = time.perf_counter()
|
||||
pv_fast = get_pv("TEST:SIM:M1.VAL", connect=False)
|
||||
t1 = time.perf_counter()
|
||||
|
||||
t2 = time.perf_counter()
|
||||
pv_slow = epics.get_pv("TEST:SIM:M1.VAL", connect=False)
|
||||
t3 = time.perf_counter()
|
||||
|
||||
fast_time = t1 - t0
|
||||
slow_time = t3 - t2
|
||||
|
||||
assert fast_time < slow_time, (
|
||||
f"get_pv optimisé ({fast_time:.6f}s) plus lent que EPICS ({slow_time:.6f}s)"
|
||||
)
|
||||
|
||||
|
||||
def test_speedup_motor_instantiation():
|
||||
"""Test que Motor optimisé est plus rapide à instancier que epics.Motor."""
|
||||
t0 = time.perf_counter()
|
||||
m_fast = Motor("TEST:SIM:M2")
|
||||
t1 = time.perf_counter()
|
||||
|
||||
t2 = time.perf_counter()
|
||||
m_slow = epics.Motor("TEST:SIM:M3")
|
||||
t3 = time.perf_counter()
|
||||
|
||||
fast_time = t1 - t0
|
||||
slow_time = t3 - t2
|
||||
|
||||
assert fast_time < slow_time, (
|
||||
f"Motor optimisé ({fast_time:.6f}s) plus lent que EPICS.Motor ({slow_time:.6f}s)"
|
||||
)
|
||||
|
||||
|
||||
def test_speedup_motor_PV():
|
||||
"""Test que PV optimisé est plus rapide que PV avec connect=True."""
|
||||
m_slow = Motor("TEST:SIM:M4")
|
||||
t0 = time.perf_counter()
|
||||
pv_slow = m_slow.PV("VAL", connect=True, timeout=1.0)
|
||||
t1 = time.perf_counter()
|
||||
|
||||
m_fast = Motor("TEST:SIM:M5")
|
||||
t2 = time.perf_counter()
|
||||
pv_fast = m_fast.PV("VAL", connect=False, timeout=1.0)
|
||||
t3 = time.perf_counter()
|
||||
|
||||
slow_time = t1 - t0
|
||||
fast_time = t3 - t2
|
||||
|
||||
assert pv_epics.connected
|
||||
assert pv_fast.connected
|
||||
assert fast_time < slow_time, (
|
||||
f"Motor.PV optimisé ({fast_time:.6f}s) plus lent que EPICS.Motor.PV ({slow_time:.6f}s)"
|
||||
)
|
||||
|
||||
def test_motor_init_list_attrs_created():
|
||||
m = Motor("TEST:SIM:M10")
|
||||
for attr in m._init_list:
|
||||
pv = getattr(m, attr)
|
||||
assert isinstance(pv, epics.pv.PV), f"{attr} n'est pas un PV"
|
||||
assert not pv.connected, f"{attr} devrait être non connecté au départ"
|
||||
|
||||
def test_motor_extras_attrs_correct():
|
||||
m = Motor("TEST:SIM:M11")
|
||||
for key, suffix in m._extras.items():
|
||||
assert hasattr(m, key), f"{key} manquant dans Motor"
|
||||
pv = getattr(m, key)
|
||||
assert isinstance(pv, epics.pv.PV)
|
||||
assert pv.pvname == f"TEST:SIM:M11{suffix}"
|
||||
|
||||
def test_motor_callbacks_empty():
|
||||
m = Motor("TEST:SIM:M12")
|
||||
assert isinstance(m._callbacks, dict)
|
||||
assert m._callbacks == {}
|
||||
Reference in New Issue
Block a user