diff --git a/tests/test_utils_hastyepics.py b/tests/test_utils_hastyepics.py index 9dd395b74..61af359d3 100644 --- a/tests/test_utils_hastyepics.py +++ b/tests/test_utils_hastyepics.py @@ -1,182 +1,366 @@ import time import threading import pytest -import epics -from epics.pv import PV -from slic.utils.hastyepics import * from morbidissimo import MorIOC -# IOC simulation -@pytest.fixture(scope="module", autouse=True) -def morioc_server(): - """Démarre un IOC simulé pour tous les tests.""" - def run_ioc(): - with MorIOC("TEST:SIM:") as mor: - current_val = 0.0 - mor.host( - VAL=float, - RBV=float, - STATUS={"type": "enum", "enums": ["IDLE", "MOVING", "DONE"]}, - disabled=int, # pour tester suppression - ) - while True: - val_in = mor.get("VAL") - if val_in is not None: - current_val = val_in - mor.serve( - VAL=current_val, - RBV=current_val, - STATUS=0, - disabled=0, - RTYP="motor" # pour que epics.Motor original accepte - ) - time.sleep(0.05) +from slic.utils.opmsg import * - t = threading.Thread(target=run_ioc, daemon=True) - t.start() + +import time +import threading +import pytest +from morbidissimo import MorIOC +from slic.utils.opmsg import IDS, BEAMLINES, N_MSG_HISTORY + +def ioc(): + """ + IOC de test pour: + - STATUS / STATUS-DATE + - OP-DATEi / OP-MSGi + - OP-MSG-tmp (exactement ce que ton code utilise) + - CATEGORY / DOWNTIME + """ + def run_op_prefix(prefix: str): + with MorIOC(prefix) as mor: + mor.host( + STATUS={"type": "enum", "enums": ["OFFLINE", "PREPARATION", "REMOTE", "ATTENDED"]}, + **{"STATUS-DATE": str}, + **{f"OP-DATE{i}": str for i in range(N_MSG_HISTORY)}, + **{f"OP-MSG{i}": str for i in range(N_MSG_HISTORY)}, + **{"OP-MSG-TMP": str}, + ) + + # Seed une seule fois + mor.serve( + STATUS="OFFLINE", + **{"STATUS-DATE": "2024-01-01 00:00:00"}, + **{f"OP-DATE{i}": f"2024-01-01 00:00:0{i}" for i in range(N_MSG_HISTORY)}, + **{f"OP-MSG{i}": f"Initial message {i}" for i in range(N_MSG_HISTORY)}, + **{"OP-MSG-TMP": ""}, + ) + + while True: + mor.serve() + time.sleep(0.02) + + def run_status_prefix(beamline: str): + with MorIOC(f"SF-STATUS-{beamline}") as mor: + mor.host( + CATEGORY={"type": "enum", "enums": ["USER", "MD", "SD", "ACCESS", "DOWN"]}, + DOWNTIME=str, + ) + mor.serve(CATEGORY="USER", DOWNTIME="00:00:00") + while True: + mor.serve() + time.sleep(0.02) + + threads = [] + for ID in IDS.values(): + prefix = f"SF-OP:{ID}-MSG" + t = threading.Thread(target=run_op_prefix, args=(prefix,), daemon=True) + t.start() + threads.append(t) + + for bl in BEAMLINES: + t = threading.Thread(target=run_status_prefix, args=(bl,), daemon=True) + t.start() + threads.append(t) + + return threads + +@pytest.fixture(scope="module", autouse=True) +def run_all_iocs(): + threads = ioc() time.sleep(0.5) yield -def test_get_pv_connect_false_and_true(): - pv = get_pv("TEST:SIM:VAL", connect=False) - assert isinstance(pv, PV) - assert not pv.connected - - pv2 = get_pv("TEST:SIM:VAL", connect=True, timeout=2.0) - assert pv2.connected +@pytest.fixture(scope="module", autouse=True) +def run_all_iocs(): + threads = ioc() + # Short boot delay to ensure IOCs are listening + time.sleep(0.3) + yield + # Daemon threads exit with pytest -def test_motor_init_strips_suffixes(): - m1 = Motor("TEST:SIM:M1.VAL") - assert m1._prefix == "TEST:SIM:M1" - - m2 = Motor("TEST:SIM:M2.") - assert m2._prefix == "TEST:SIM:M2" - - m3 = Motor("TEST:SIM:M3") - assert m3._prefix == "TEST:SIM:M3" +# Autouse fixture to start IOC once per module +@pytest.fixture(scope="module", autouse=True) +def run_all_iocs(): + threads = ioc() + # Small boot delay so all IOCs are ready before the tests run + time.sleep(0.3) + yield + # Daemon threads end with pytest -def test_motor_invalid_name_raises(): - with pytest.raises(MotorException): - Motor(None) +# Tests: OperationMessageStatus +class TestOperationMessageStatus: + @pytest.fixture + def status(self): + s = OperationMessageStatus("SF-OP:CR-MSG") + assert s.pv_date.wait_for_connection(timeout=2.0), "PV STATUS-DATE not connected" + assert s.pv_status.wait_for_connection(timeout=2.0), "PV STATUS not connected" + return s + + def test_initialization_and_enum(self, status): + # Ensure PVs are created and connected, and allowed values match expectations + assert status.pv_date is not None + assert status.pv_status is not None + assert status.pv_date.connected + assert status.pv_status.connected + + allowed = status.get_allowed() + assert set(["OFFLINE", "PREPARATION", "REMOTE", "ATTENDED"]).issubset(set(allowed)) + + def test_properties_date_status(self, status): + # Verify the .date and .status properties return correct values + status.pv_date.put("2025-08-08 10:00:00", wait=True) + status.pv_status.put("REMOTE", wait=True) + assert status.date == "2025-08-08 10:00:00" + assert status.status == "REMOTE" + + def test_repr_uses_properties(self, status): + # Check that __repr__ includes both date and status + status.pv_date.put("2025-08-08 10:01:02", wait=True) + status.pv_status.put("ATTENDED", wait=True) + r = repr(status) + assert "2025-08-08 10:01:02 ATTENDED" in r + + @pytest.mark.parametrize("method,expected", [ + ("set_offline", "OFFLINE"), + ("set_preparation", "PREPARATION"), + ("set_remote", "REMOTE"), + ("set_attended", "ATTENDED"), + ]) + def test_set_methods_change_status(self, status, method, expected): + getattr(status, method)() + assert status.status == expected + + def test_update_direct_valid(self, status): + status.update("REMOTE") + assert status.status == "REMOTE" + + def test_update_invalid_raises_valueerror(self, status): + with pytest.raises(ValueError) as exc: + status.update("TOTALLY_INVALID_STATUS") + assert "not from allowed values" in str(exc.value) -def test_motor_init_list_and_extras(): - m = Motor("TEST:SIM:M4") - assert "disabled" not in m._init_list - assert "disabled" not in m._extras - assert len(m._init_list) > 0 - assert len(m._extras) > 0 +# Tests: OperationMessageEntry +class TestOperationMessageEntry: + @pytest.fixture(params=range(N_MSG_HISTORY)) + def entry(self, request): + idx = request.param + e = OperationMessageEntry("SF-OP:CR-MSG", idx) + assert e.pv_date.wait_for_connection(timeout=2.0), f"PV OP-DATE{idx} not connected" + assert e.pv_msg.wait_for_connection(timeout=2.0), f"PV OP-MSG{idx} not connected" + return e - for attr in m._init_list: - pv = getattr(m, attr) - assert isinstance(pv, PV) - assert not pv.connected + def test_initialization(self, entry): + assert entry.pv_date is not None + assert entry.pv_msg is not None + assert entry.pv_date.connected + assert entry.pv_msg.connected - for attr_name, suffix in m._extras.items(): - pv = getattr(m, attr_name) - assert isinstance(pv, PV) - assert pv.pvname == f"TEST:SIM:M4{suffix}" + def test_date_property(self, entry): + test_val = "2025-08-08 12:34:56" + entry.pv_date.put(test_val, wait=True) + assert entry.date == test_val + + def test_msg_property(self, entry): + test_val = "Hello from test" + entry.pv_msg.put(test_val, wait=True) + assert entry.msg == test_val + + def test_repr(self, entry): + date_val = "2025-08-08 13:00:00" + msg_val = "System maintenance" + entry.pv_date.put(date_val, wait=True) + entry.pv_msg.put(msg_val, wait=True) + expected_repr = f"{date_val} {msg_val}" + assert repr(entry) == expected_repr -def test_motor_pv_method_connects_and_reads(): - m = Motor("TEST:SIM:M5") - pv_val = m.PV("VAL", connect=True, timeout=1.0) - assert pv_val.connected - pv_val.put(123.0) - time.sleep(0.1) - assert pv_val.get() == 123.0 +# Tests: OperationMessage +class TestOperationMessage: + def test_init_with_name(self): + om = OperationMessage(name="Control Room") + assert om.name == "Control Room" + assert om.ID == "CR" + assert om.prefix == "SF-OP:CR-MSG" + + assert om.pv_send.wait_for_connection(timeout=2.0) + assert om.status.pv_status.wait_for_connection(timeout=2.0) + assert len(om.entries) == N_MSG_HISTORY + for i, entry in enumerate(om.entries): + assert entry.pv_date.wait_for_connection(timeout=2.0), f"OP-DATE{i} not connected" + assert entry.pv_msg.wait_for_connection(timeout=2.0), f"OP-MSG{i} not connected" + + def test_init_with_id(self): + om = OperationMessage(ID="cr") # case-insensitive + assert om.ID == "CR" + assert om.name == IDS_INVERSE["CR"] # "Control Room" + assert om.prefix == "SF-OP:CR-MSG" + + def test_init_error_when_missing_both(self): + with pytest.raises(ValueError): + OperationMessage() + + def test_getitem_returns_entry(self): + om = OperationMessage(name="Control Room") + for i in range(N_MSG_HISTORY): + e = om[i] + assert isinstance(e, OperationMessageEntry) + + def test_update_puts_to_tmp_channel(self): + om = OperationMessage(name="Control Room") + assert om.pv_send.wait_for_connection(timeout=2.0) + msg = "Beam down at 14:00" + om.update(msg) + got = om.pv_send.get(as_string=True) + if isinstance(got, bytes): + got = got.decode("utf-8", errors="ignore") + assert got == msg + + def test_status_object_is_wired(self): + om = OperationMessage(name="Control Room") + assert isinstance(om.status, OperationMessageStatus) + om.status.update("REMOTE") + assert om.status.status == "REMOTE" + + def test_repr_contains_header_and_entries(self): + om = OperationMessage(name="Control Room") + om.status.pv_status.put("ATTENDED", wait=True) + om.entries[0].pv_date.put("2025-08-08 12:00:00", wait=True) + om.entries[0].pv_msg.put("Slot A ok", wait=True) + r = repr(om) + assert "Control Room (CR)" in r + assert "ATTENDED" in r + assert "2025-08-08 12:00:00" in r + assert "Slot A ok" in r -def test_motor_reads_enum_status(): - m = Motor("TEST:SIM:M6") - pv_status = m.PV("STATUS", connect=True) - allowed = pv_status.enum_strs - assert allowed == ["IDLE", "MOVING", "DONE"] +# Tests: OperationMessages +class TestOperationMessages: + def test_initialization_populates_entries_and_items(self): + oms = OperationMessages() + + # One entry per human-readable name + assert set(oms.entries.keys()) == set(IDS.keys()) + + # Each value is a wired/connected OperationMessage + for name, om in oms.entries.items(): + assert isinstance(om, OperationMessage) + assert om.pv_send.wait_for_connection(timeout=2.0) + assert om.status.pv_status.wait_for_connection(timeout=2.0) + for i, entry in enumerate(om.entries): + assert isinstance(entry, OperationMessageEntry) + assert entry.pv_date.wait_for_connection(timeout=2.0), f"{om.prefix}:OP-DATE{i} not connected" + assert entry.pv_msg.wait_for_connection(timeout=2.0), f"{om.prefix}:OP-MSG{i} not connected" + + # Dynamic attributes exist (cleaned names → attributes) + for name in IDS.keys(): + attr = clean_name(name) + assert hasattr(oms, attr), f"Missing attribute: {attr}" + assert getattr(oms, attr) is oms.entries[name] + + def test_getitem_supports_cleaned_name_and_cleaned_id(self): + oms = OperationMessages() + + name = "Control Room" + ID = IDS[name] + cleaned_name = "control_room" + cleaned_id = "cr" + + om_by_name = oms[cleaned_name] + assert isinstance(om_by_name, OperationMessage) + assert om_by_name.ID == ID + + om_by_id = oms[cleaned_id] + assert isinstance(om_by_id, OperationMessage) + assert om_by_id is om_by_name + + om_by_original_name = oms["Control Room"] + assert om_by_original_name is om_by_name + + with pytest.raises(KeyError): + _ = oms["does_not_exist"] + + def test_iter_yields_operationmessage_instances(self): + oms = OperationMessages() + vals = list(iter(oms)) + assert len(vals) == len(IDS) + assert all(isinstance(v, OperationMessage) for v in vals) + + # All prefixes serve identical initial data; verify contents quickly + for om in vals: + assert om.status.status == "OFFLINE" + assert om.status.date == "2024-01-01 00:00:00" + for i, entry in enumerate(om.entries): + assert entry.date == f"2024-01-01 00:00:0{i}" + assert entry.msg == f"Initial message {i}" + + def test_key_completions_exposes_all_cleaned_keys(self): + oms = OperationMessages() + keys = set(oms._ipython_key_completions_()) + expected_names = {clean_name(n) for n in IDS.keys()} + expected_ids = {clean_name(ID) for ID in IDS.values()} + assert expected_names.issubset(keys) + assert expected_ids.issubset(keys) + + def test_repr_includes_each_submessage_repr(self): + oms = OperationMessages() + r = repr(oms) + assert "Control Room (CR)" in r # header from one submessage + + om_cr = oms["control_room"] + # Modify two consecutive entries + om_cr.entries[0].pv_date.put("2025-08-08 12:00:00", wait=True) + om_cr.entries[0].pv_msg.put("Test message 1", wait=True) + om_cr.entries[1].pv_date.put("2025-08-08 12:05:00", wait=True) + om_cr.entries[1].pv_msg.put("Test message 2", wait=True) + + r2 = repr(oms) + assert "2025-08-08 12:00:00" in r2 + assert "Test message 1" in r2 + assert "2025-08-08 12:05:00" in r2 + assert "Test message 2" in r2 -def test_speedup_get_pv(): - """Test que get_pv optimisé est plus rapide que epics.get_pv.""" - N = 10 - t0 = time.perf_counter() - for _ in range(N): - get_pv("TEST:SIM:M1.VAL", connect=False) - t1 = time.perf_counter() +# Tests: MachineStatus +class TestMachineStatus: + @pytest.mark.parametrize("beamline", BEAMLINES) + def test_init_and_connections(self, beamline): + ms = MachineStatus(beamline) + assert ms.beamline == beamline.upper() + assert ms.pv_category.wait_for_connection(timeout=2.0), f"{beamline}: CATEGORY not connected" + assert ms.pv_downtime.wait_for_connection(timeout=2.0), f"{beamline}: DOWNTIME not connected" - t2 = time.perf_counter() - for _ in range(N): - epics.get_pv("TEST:SIM:M1.VAL", connect=False) - t3 = time.perf_counter() + def test_init_invalid_beamline_raises(self): + with pytest.raises(ValueError) as exc: + MachineStatus("NOT_A_BEAMLINE") + assert 'beamline "NOT_A_BEAMLINE" must be from:' in str(exc.value) - fast_time = t1 - t0 - slow_time = t3 - t2 - assert fast_time < slow_time, ( - f"get_pv optimisé ({fast_time:.6f}s) plus lent que EPICS ({slow_time:.6f}s)" - ) + @pytest.mark.parametrize("beamline", BEAMLINES) + def test_properties_category_and_downtime(self, beamline): + ms = MachineStatus(beamline) + ms.pv_category.put("MD", wait=True) + ms.pv_downtime.put("01:23:45", wait=True) + assert ms.category == "MD" + assert ms.downtime == "01:23:45" + ms.pv_category.put("SD", wait=True) + ms.pv_downtime.put("00:10:00", wait=True) + assert ms.category == "SD" + assert ms.downtime == "00:10:00" -def test_speedup_motor_instantiation(): - """Test que Motor optimisé est plus rapide à instancier que epics.Motor.""" - N = 10 - t0 = time.perf_counter() - for _ in range(N): - Motor("TEST:SIM:M2") - t1 = time.perf_counter() - - t2 = time.perf_counter() - for _ in range(N): - epics.Motor("TEST:SIM:M3") - t3 = time.perf_counter() - - fast_time = t1 - t0 - slow_time = t3 - t2 - assert fast_time < slow_time, ( - f"Motor optimisé ({fast_time:.6f}s) plus lent que EPICS.Motor ({slow_time:.6f}s)" - ) - - -def test_speedup_motor_PV(): - """Test que Motor.PV optimisé est plus rapide avec connect=False.""" - m_epics = epics.Motor("TEST:SIM:M4") - t0 = time.perf_counter() - pv_epics = m_epics.PV("VAL", connect=True, timeout=1.0) - t1 = time.perf_counter() - - m_fast = Motor("TEST:SIM:M5") - t2 = time.perf_counter() - pv_fast = m_fast.PV("VAL", connect=False, timeout=1.0) - t3 = time.perf_counter() - - assert pv_epics.connected - assert pv_fast.connected - - slow_time = t1 - t0 - fast_time = t3 - t2 - assert fast_time < slow_time, ( - f"Motor.PV optimisé ({fast_time:.6f}s) plus lent que EPICS.Motor.PV ({slow_time:.6f}s)" - ) - - -def test_motor_init_list_attrs_created(): - m = Motor("TEST:SIM:M10") - for attr in m._init_list: - pv = getattr(m, attr) - assert isinstance(pv, PV), f"{attr} n'est pas un PV" - assert not pv.connected, f"{attr} devrait être non connecté au départ" - - -def test_motor_extras_attrs_correct(): - m = Motor("TEST:SIM:M11") - for key, suffix in m._extras.items(): - assert hasattr(m, key), f"{key} manquant dans Motor" - pv = getattr(m, key) - assert isinstance(pv, PV) - assert pv.pvname == f"TEST:SIM:M11{suffix}" - - -def test_motor_callbacks_empty(): - m = Motor("TEST:SIM:M12") - assert isinstance(m._callbacks, dict) - assert m._callbacks == {} + @pytest.mark.parametrize("beamline", BEAMLINES) + def test_repr_contains_header_and_values(self, beamline): + ms = MachineStatus(beamline) + ms.pv_category.put("ACCESS", wait=True) + ms.pv_downtime.put("00:05:00", wait=True) + r = repr(ms) + assert beamline in r + assert "ACCESS" in r + assert "00:05:00" in r