From 05957cc6d3d82ee58cb9601930fe11a20048fdb8 Mon Sep 17 00:00:00 2001 From: tligui_y Date: Fri, 8 Aug 2025 15:10:48 +0200 Subject: [PATCH] Update tests/test_utils_hastyepics.py --- tests/test_utils_hastyepics.py | 240 ++++++++++++++++++++------------- 1 file changed, 149 insertions(+), 91 deletions(-) diff --git a/tests/test_utils_hastyepics.py b/tests/test_utils_hastyepics.py index 00317def3..7d8a1e146 100644 --- a/tests/test_utils_hastyepics.py +++ b/tests/test_utils_hastyepics.py @@ -1,115 +1,173 @@ import time -import epics +import threading import pytest -from epics.device import Device -from epics.motor import MotorException -import sys -import os -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) +import epics from slic.utils.hastyepics import * -from unittest.mock import patch, PropertyMock +from morbidissimo import MorIOC -def test_motor_instantiation_fast_vs_epics(): - fast_times = [] - slow_times = [] +# IOC simualtion +@pytest.fixture(scope="module", autouse=True) +def morioc_server(): + """Démarre un IOC simulé pour tous les tests.""" + def run_ioc(): + with MorIOC("TEST:SIM:") as mor: + current_val = 0.0 + mor.host( + VAL=float, + RBV=float, + STATUS={"type": "enum", "enums": ["IDLE", "MOVING", "DONE"]}, + ) + while True: + val_in = mor.get("VAL") + if val_in is not None: + current_val = val_in + mor.serve(VAL=current_val, RBV=current_val) + time.sleep(0.05) - # Mock to simulate a real EPICS moteur - with patch('epics.Motor.get') as mock_get: - mock_get.return_value = 'motor' - - # Mean of 10 runs - for _ in range(10): - t0 = time.perf_counter() - m_fast = Motor("TEST:MOTOR_t1") - t1 = time.perf_counter() - - t2 = time.perf_counter() - m_slow = epics.Motor("TEST:MOTOR_t2") - t3 = time.perf_counter() - - fast_times.append(t1 - t0) - slow_times.append(t3 - t2) - - avg_fast = sum(fast_times) / len(fast_times) - avg_slow = sum(slow_times) / len(slow_times) - - assert avg_fast < avg_slow, f"Fast version ({avg_fast:.6f}s) should be faster than EPICS ({avg_slow:.6f}s)" + t = threading.Thread(target=run_ioc, daemon=True) + t.start() + time.sleep(0.5) + yield -def test_motor_without_suffix_and_dot(): - # Suffix .VAL ... dot should be stripped from name - m1 = Motor("TEST:MOTOR_1.VAL") - assert m1._prefix == "TEST:MOTOR_1" - m2 = Motor("TEST:MOTOR_2.") - assert m2._prefix == "TEST:MOTOR_2" +def test_get_pv_connect_false_and_true(): + pv = get_pv("TEST:SIM:VAL", connect=False) + assert isinstance(pv, PV) + assert not pv.connected - m3 = Motor("TEST:MOTOR") - assert m3._prefix == "TEST:MOTOR" + pv2 = get_pv("TEST:SIM:VAL", connect=True, timeout=2.0) + assert pv2.connected + + +def test_motor_init_strips_suffixes(): + m1 = Motor("TEST:SIM:M1.VAL") + assert m1._prefix == "TEST:SIM:M1" + + m2 = Motor("TEST:SIM:M2.") + assert m2._prefix == "TEST:SIM:M2" + + m3 = Motor("TEST:SIM:M3") + assert m3._prefix == "TEST:SIM:M3" def test_motor_invalid_name_raises(): - # Should raise MotorException if name is none with pytest.raises(MotorException): Motor(None) -def test_motor_init_list_has_no_disabled(): - assert "disabled" not in Motor._init_list - assert "disabled" not in Motor._extras -def test_motor_initializes_device_attrs(): - # Check that Motor correctly initializes all Device PVs from _init_list - m = Motor("TEST:MOTOR") - - - print("_init_list contents:", m._init_list) - print("All attributes on motor:", dir(m)) - +def test_motor_init_list_and_extras(): + m = Motor("TEST:SIM:M4") + # disabled doit avoir été retiré + assert "disabled" not in m._init_list + assert "disabled" not in m._extras + + # Vérifier qu'ils ne sont pas vides + assert len(m._init_list) > 0 + assert len(m._extras) > 0 + + # Tous les PV de _init_list existent et sont des PV for attr in m._init_list: - print(f"\nChecking attribute: {attr}") - assert hasattr(m, attr), f"Motor is missing attribute: {attr}" pv = getattr(m, attr) - - print(f"PV object: {pv}") - print("Type:", type(pv)) - if pv is not None: - print(f"PV name: {pv.pvname}") - print(f"PV connected: {pv.connected}") - - assert isinstance(pv, epics.pv.PV), f"{attr} is not a valid PV object" - assert not pv.connected, f"{attr} should not be connected yet" - + assert isinstance(pv, PV) + assert not pv.connected -def test_motor_pv_connection_skipped_by_default(): - # PVs should not be connected right after Motor creation - m = Motor("TEST:MOTOR") - for key in m._extras: - pv = m.PV(key) - # Explicitly check that no connection was forced during init - assert not pv.connected, f"PV {key} was connected unexpectedly" - -def test_motor_adds_all_extras_pvs(): - # Given a test motor name - m = Motor("TEST:MOTOR") - - # For each key in _extras, the motor should have an attribute with that name + # Tous les PV de _extras existent et sont corrects for attr_name, suffix in m._extras.items(): - # The attribute must exist - assert hasattr(m, attr_name), f"Missing attribute: {attr_name}" - - # The attribute must be a PV pv = getattr(m, attr_name) - assert isinstance(pv, PV), f"{attr_name} is not a PV" + assert isinstance(pv, PV) + assert pv.pvname == f"TEST:SIM:M4{suffix}" - # The PV's name should match the expected pattern - expected_name = f"TEST:MOTOR{suffix}" - assert pv.pvname == expected_name, f"PV name mismatch for {attr_name}" - assert not pv.connected, f"{attr_name} PV should not be connected immediately" +def test_motor_pv_method_connects_and_reads(): + m = Motor("TEST:SIM:M5") + pv_val = m.PV("VAL", connect=True, timeout=1.0) + assert pv_val.connected + pv_val.put(123.0) + time.sleep(0.1) + assert pv_val.get() == 123.0 -def test_motor_PV_method_exists_and_works(): - # Motor.PV(): should call super().PV and return a PV object - m = Motor("TEST:MOTOR") - pv = m.PV("RBV", connect=False) - assert pv is not None - assert isinstance(pv, epics.pv.PV) + +def test_motor_reads_enum_status(): + m = Motor("TEST:SIM:M6") + pv_status = m.PV("STATUS", connect=True) + allowed = pv_status.enum_strs + assert allowed == ["IDLE", "MOVING", "DONE"] + + +def test_speedup_get_pv(): + """Test que get_pv optimisé est plus rapide que epics.get_pv.""" + t0 = time.perf_counter() + pv_fast = get_pv("TEST:SIM:M1.VAL", connect=False) + t1 = time.perf_counter() + + t2 = time.perf_counter() + pv_slow = epics.get_pv("TEST:SIM:M1.VAL", connect=False) + t3 = time.perf_counter() + + fast_time = t1 - t0 + slow_time = t3 - t2 + + assert fast_time < slow_time, ( + f"get_pv optimisé ({fast_time:.6f}s) plus lent que EPICS ({slow_time:.6f}s)" + ) + + +def test_speedup_motor_instantiation(): + """Test que Motor optimisé est plus rapide à instancier que epics.Motor.""" + t0 = time.perf_counter() + m_fast = Motor("TEST:SIM:M2") + t1 = time.perf_counter() + + t2 = time.perf_counter() + m_slow = epics.Motor("TEST:SIM:M3") + t3 = time.perf_counter() + + fast_time = t1 - t0 + slow_time = t3 - t2 + + assert fast_time < slow_time, ( + f"Motor optimisé ({fast_time:.6f}s) plus lent que EPICS.Motor ({slow_time:.6f}s)" + ) + + +def test_speedup_motor_PV(): + """Test que PV optimisé est plus rapide que PV avec connect=True.""" + m_slow = Motor("TEST:SIM:M4") + t0 = time.perf_counter() + pv_slow = m_slow.PV("VAL", connect=True, timeout=1.0) + t1 = time.perf_counter() + + m_fast = Motor("TEST:SIM:M5") + t2 = time.perf_counter() + pv_fast = m_fast.PV("VAL", connect=False, timeout=1.0) + t3 = time.perf_counter() + + slow_time = t1 - t0 + fast_time = t3 - t2 + + assert pv_epics.connected + assert pv_fast.connected + assert fast_time < slow_time, ( + f"Motor.PV optimisé ({fast_time:.6f}s) plus lent que EPICS.Motor.PV ({slow_time:.6f}s)" + ) + +def test_motor_init_list_attrs_created(): + m = Motor("TEST:SIM:M10") + for attr in m._init_list: + pv = getattr(m, attr) + assert isinstance(pv, epics.pv.PV), f"{attr} n'est pas un PV" + assert not pv.connected, f"{attr} devrait être non connecté au départ" + +def test_motor_extras_attrs_correct(): + m = Motor("TEST:SIM:M11") + for key, suffix in m._extras.items(): + assert hasattr(m, key), f"{key} manquant dans Motor" + pv = getattr(m, key) + assert isinstance(pv, epics.pv.PV) + assert pv.pvname == f"TEST:SIM:M11{suffix}" + +def test_motor_callbacks_empty(): + m = Motor("TEST:SIM:M12") + assert isinstance(m._callbacks, dict) + assert m._callbacks == {} \ No newline at end of file