Update tests/test_utils_hastyepics.py
Run CI Tests / test (push) Successful in 3m14s

This commit is contained in:
2025-08-08 15:10:48 +02:00
parent 4abf793815
commit 05957cc6d3
+149 -91
View File
@@ -1,115 +1,173 @@
import time
import epics
import threading
import pytest
from epics.device import Device
from epics.motor import MotorException
import sys
import os
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import epics
from slic.utils.hastyepics import *
from unittest.mock import patch, PropertyMock
from morbidissimo import MorIOC
def test_motor_instantiation_fast_vs_epics():
fast_times = []
slow_times = []
# IOC simualtion
@pytest.fixture(scope="module", autouse=True)
def morioc_server():
"""Démarre un IOC simulé pour tous les tests."""
def run_ioc():
with MorIOC("TEST:SIM:") as mor:
current_val = 0.0
mor.host(
VAL=float,
RBV=float,
STATUS={"type": "enum", "enums": ["IDLE", "MOVING", "DONE"]},
)
while True:
val_in = mor.get("VAL")
if val_in is not None:
current_val = val_in
mor.serve(VAL=current_val, RBV=current_val)
time.sleep(0.05)
# Mock to simulate a real EPICS moteur
with patch('epics.Motor.get') as mock_get:
mock_get.return_value = 'motor'
# Mean of 10 runs
for _ in range(10):
t0 = time.perf_counter()
m_fast = Motor("TEST:MOTOR_t1")
t1 = time.perf_counter()
t2 = time.perf_counter()
m_slow = epics.Motor("TEST:MOTOR_t2")
t3 = time.perf_counter()
fast_times.append(t1 - t0)
slow_times.append(t3 - t2)
avg_fast = sum(fast_times) / len(fast_times)
avg_slow = sum(slow_times) / len(slow_times)
assert avg_fast < avg_slow, f"Fast version ({avg_fast:.6f}s) should be faster than EPICS ({avg_slow:.6f}s)"
t = threading.Thread(target=run_ioc, daemon=True)
t.start()
time.sleep(0.5)
yield
def test_motor_without_suffix_and_dot():
# Suffix .VAL ... dot should be stripped from name
m1 = Motor("TEST:MOTOR_1.VAL")
assert m1._prefix == "TEST:MOTOR_1"
m2 = Motor("TEST:MOTOR_2.")
assert m2._prefix == "TEST:MOTOR_2"
def test_get_pv_connect_false_and_true():
pv = get_pv("TEST:SIM:VAL", connect=False)
assert isinstance(pv, PV)
assert not pv.connected
m3 = Motor("TEST:MOTOR")
assert m3._prefix == "TEST:MOTOR"
pv2 = get_pv("TEST:SIM:VAL", connect=True, timeout=2.0)
assert pv2.connected
def test_motor_init_strips_suffixes():
m1 = Motor("TEST:SIM:M1.VAL")
assert m1._prefix == "TEST:SIM:M1"
m2 = Motor("TEST:SIM:M2.")
assert m2._prefix == "TEST:SIM:M2"
m3 = Motor("TEST:SIM:M3")
assert m3._prefix == "TEST:SIM:M3"
def test_motor_invalid_name_raises():
# Should raise MotorException if name is none
with pytest.raises(MotorException):
Motor(None)
def test_motor_init_list_has_no_disabled():
assert "disabled" not in Motor._init_list
assert "disabled" not in Motor._extras
def test_motor_initializes_device_attrs():
# Check that Motor correctly initializes all Device PVs from _init_list
m = Motor("TEST:MOTOR")
print("_init_list contents:", m._init_list)
print("All attributes on motor:", dir(m))
def test_motor_init_list_and_extras():
m = Motor("TEST:SIM:M4")
# disabled doit avoir été retiré
assert "disabled" not in m._init_list
assert "disabled" not in m._extras
# Vérifier qu'ils ne sont pas vides
assert len(m._init_list) > 0
assert len(m._extras) > 0
# Tous les PV de _init_list existent et sont des PV
for attr in m._init_list:
print(f"\nChecking attribute: {attr}")
assert hasattr(m, attr), f"Motor is missing attribute: {attr}"
pv = getattr(m, attr)
print(f"PV object: {pv}")
print("Type:", type(pv))
if pv is not None:
print(f"PV name: {pv.pvname}")
print(f"PV connected: {pv.connected}")
assert isinstance(pv, epics.pv.PV), f"{attr} is not a valid PV object"
assert not pv.connected, f"{attr} should not be connected yet"
assert isinstance(pv, PV)
assert not pv.connected
def test_motor_pv_connection_skipped_by_default():
# PVs should not be connected right after Motor creation
m = Motor("TEST:MOTOR")
for key in m._extras:
pv = m.PV(key)
# Explicitly check that no connection was forced during init
assert not pv.connected, f"PV {key} was connected unexpectedly"
def test_motor_adds_all_extras_pvs():
# Given a test motor name
m = Motor("TEST:MOTOR")
# For each key in _extras, the motor should have an attribute with that name
# Tous les PV de _extras existent et sont corrects
for attr_name, suffix in m._extras.items():
# The attribute must exist
assert hasattr(m, attr_name), f"Missing attribute: {attr_name}"
# The attribute must be a PV
pv = getattr(m, attr_name)
assert isinstance(pv, PV), f"{attr_name} is not a PV"
assert isinstance(pv, PV)
assert pv.pvname == f"TEST:SIM:M4{suffix}"
# The PV's name should match the expected pattern
expected_name = f"TEST:MOTOR{suffix}"
assert pv.pvname == expected_name, f"PV name mismatch for {attr_name}"
assert not pv.connected, f"{attr_name} PV should not be connected immediately"
def test_motor_pv_method_connects_and_reads():
m = Motor("TEST:SIM:M5")
pv_val = m.PV("VAL", connect=True, timeout=1.0)
assert pv_val.connected
pv_val.put(123.0)
time.sleep(0.1)
assert pv_val.get() == 123.0
def test_motor_PV_method_exists_and_works():
# Motor.PV(): should call super().PV and return a PV object
m = Motor("TEST:MOTOR")
pv = m.PV("RBV", connect=False)
assert pv is not None
assert isinstance(pv, epics.pv.PV)
def test_motor_reads_enum_status():
m = Motor("TEST:SIM:M6")
pv_status = m.PV("STATUS", connect=True)
allowed = pv_status.enum_strs
assert allowed == ["IDLE", "MOVING", "DONE"]
def test_speedup_get_pv():
"""Test que get_pv optimisé est plus rapide que epics.get_pv."""
t0 = time.perf_counter()
pv_fast = get_pv("TEST:SIM:M1.VAL", connect=False)
t1 = time.perf_counter()
t2 = time.perf_counter()
pv_slow = epics.get_pv("TEST:SIM:M1.VAL", connect=False)
t3 = time.perf_counter()
fast_time = t1 - t0
slow_time = t3 - t2
assert fast_time < slow_time, (
f"get_pv optimisé ({fast_time:.6f}s) plus lent que EPICS ({slow_time:.6f}s)"
)
def test_speedup_motor_instantiation():
"""Test que Motor optimisé est plus rapide à instancier que epics.Motor."""
t0 = time.perf_counter()
m_fast = Motor("TEST:SIM:M2")
t1 = time.perf_counter()
t2 = time.perf_counter()
m_slow = epics.Motor("TEST:SIM:M3")
t3 = time.perf_counter()
fast_time = t1 - t0
slow_time = t3 - t2
assert fast_time < slow_time, (
f"Motor optimisé ({fast_time:.6f}s) plus lent que EPICS.Motor ({slow_time:.6f}s)"
)
def test_speedup_motor_PV():
"""Test que PV optimisé est plus rapide que PV avec connect=True."""
m_slow = Motor("TEST:SIM:M4")
t0 = time.perf_counter()
pv_slow = m_slow.PV("VAL", connect=True, timeout=1.0)
t1 = time.perf_counter()
m_fast = Motor("TEST:SIM:M5")
t2 = time.perf_counter()
pv_fast = m_fast.PV("VAL", connect=False, timeout=1.0)
t3 = time.perf_counter()
slow_time = t1 - t0
fast_time = t3 - t2
assert pv_epics.connected
assert pv_fast.connected
assert fast_time < slow_time, (
f"Motor.PV optimisé ({fast_time:.6f}s) plus lent que EPICS.Motor.PV ({slow_time:.6f}s)"
)
def test_motor_init_list_attrs_created():
m = Motor("TEST:SIM:M10")
for attr in m._init_list:
pv = getattr(m, attr)
assert isinstance(pv, epics.pv.PV), f"{attr} n'est pas un PV"
assert not pv.connected, f"{attr} devrait être non connecté au départ"
def test_motor_extras_attrs_correct():
m = Motor("TEST:SIM:M11")
for key, suffix in m._extras.items():
assert hasattr(m, key), f"{key} manquant dans Motor"
pv = getattr(m, key)
assert isinstance(pv, epics.pv.PV)
assert pv.pvname == f"TEST:SIM:M11{suffix}"
def test_motor_callbacks_empty():
m = Motor("TEST:SIM:M12")
assert isinstance(m._callbacks, dict)
assert m._callbacks == {}