diff --git a/tests/test_utils_opmsg.py b/tests/test_utils_opmsg.py index a0c529c5f..61af359d3 100644 --- a/tests/test_utils_opmsg.py +++ b/tests/test_utils_opmsg.py @@ -1,49 +1,36 @@ import time import threading import pytest - -from epics import PV +from morbidissimo import MorIOC from slic.utils.opmsg import * -# ----------------------------- -# IOC: start all prefixes used by the SUT -# ----------------------------- + +import time +import threading +import pytest +from morbidissimo import MorIOC +from slic.utils.opmsg import IDS, BEAMLINES, N_MSG_HISTORY def ioc(): """ - Start MorIOC servers for ALL PVs used by: - - OperationMessageStatus (STATUS, STATUS-DATE) - - OperationMessageEntry (OP-DATEi, OP-MSGi) - - OperationMessage (OP-MSG-tmp) - - MachineStatus (CATEGORY, DOWNTIME) - - One MorIOC per prefix: - - For each ID in IDS: 'SF-OP:{ID}-MSG' - - For each beamline in BEAMLINES:'SF-STATUS-{beamline}' - - Important decisions: - - Enums are declared with pcaspy-style dicts so enum_strs are present. - - OP-MSG-tmp is hosted EXACTLY as the production code expects (lowercase), - and aliased to OP-MSG-TMP to be robust. - - We seed values ONCE; inside the loop we only call mor.serve() to - process CA traffic, without clobbering test writes. + IOC de test pour: + - STATUS / STATUS-DATE + - OP-DATEi / OP-MSGi + - OP-MSG-tmp (exactement ce que ton code utilise) + - CATEGORY / DOWNTIME """ - from morbidissimo import MorIOC # imported here to avoid test env surprises - def run_op_prefix(prefix: str): with MorIOC(prefix) as mor: - pvdb = { - # true enum (pcaspy dict) so enum_strs is populated - "STATUS": {"type": "enum", "enums": ["OFFLINE", "PREPARATION", "REMOTE", "ATTENDED"]}, - "STATUS-DATE": str, + mor.host( + STATUS={"type": "enum", "enums": ["OFFLINE", "PREPARATION", "REMOTE", "ATTENDED"]}, + **{"STATUS-DATE": str}, **{f"OP-DATE{i}": str for i in range(N_MSG_HISTORY)}, **{f"OP-MSG{i}": str for i in range(N_MSG_HISTORY)}, - **{"OP-MSG-TMP": ""}, - } - mor.host(**pvdb) + **{"OP-MSG-TMP": str}, + ) - # Seed ONCE + # Seed une seule fois mor.serve( STATUS="OFFLINE", **{"STATUS-DATE": "2024-01-01 00:00:00"}, @@ -53,7 +40,6 @@ def ioc(): ) while True: - # process CA traffic without overwriting user/test writes mor.serve() time.sleep(0.02) @@ -63,15 +49,12 @@ def ioc(): CATEGORY={"type": "enum", "enums": ["USER", "MD", "SD", "ACCESS", "DOWN"]}, DOWNTIME=str, ) - # Seed ONCE mor.serve(CATEGORY="USER", DOWNTIME="00:00:00") - while True: mor.serve() time.sleep(0.02) threads = [] - for ID in IDS.values(): prefix = f"SF-OP:{ID}-MSG" t = threading.Thread(target=run_op_prefix, args=(prefix,), daemon=True) @@ -85,81 +68,62 @@ def ioc(): return threads +@pytest.fixture(scope="module", autouse=True) +def run_all_iocs(): + threads = ioc() + time.sleep(0.5) + yield + @pytest.fixture(scope="module", autouse=True) def run_all_iocs(): threads = ioc() - # Slightly longer boot time to ensure all servers are listening - time.sleep(1.0) + # Short boot delay to ensure IOCs are listening + time.sleep(0.3) yield # Daemon threads exit with pytest -# ----------------------------- -# Helpers for tests -# ----------------------------- - -def _ensure_enum_metadata(pv: PV): - """ - Ensure pyepics has fetched enum metadata; if not, force a read once. - """ - allowed = getattr(pv, "enum_strs", None) - if not allowed: - _ = pv.get(as_string=True) - allowed = getattr(pv, "enum_strs", None) - return allowed +# Autouse fixture to start IOC once per module +@pytest.fixture(scope="module", autouse=True) +def run_all_iocs(): + threads = ioc() + # Small boot delay so all IOCs are ready before the tests run + time.sleep(0.3) + yield + # Daemon threads end with pytest -def _reset_operation_message_to_defaults(om: OperationMessage): - """ - Reset a full OperationMessage object to the default IOC seeded values. - """ - om.status.pv_status.put("OFFLINE", wait=True) - om.status.pv_date.put("2024-01-01 00:00:00", wait=True) - for i, e in enumerate(om.entries): - e.pv_date.put(f"2024-01-01 00:00:0{i}", wait=True) - e.pv_msg.put(f"Initial message {i}", wait=True) - om.pv_send.put("", wait=True) - - -# ----------------------------- # Tests: OperationMessageStatus -# ----------------------------- - class TestOperationMessageStatus: @pytest.fixture def status(self): s = OperationMessageStatus("SF-OP:CR-MSG") assert s.pv_date.wait_for_connection(timeout=2.0), "PV STATUS-DATE not connected" assert s.pv_status.wait_for_connection(timeout=2.0), "PV STATUS not connected" - # ensure enum metadata present - _ensure_enum_metadata(s.pv_status) return s def test_initialization_and_enum(self, status): + # Ensure PVs are created and connected, and allowed values match expectations assert status.pv_date is not None assert status.pv_status is not None assert status.pv_date.connected assert status.pv_status.connected allowed = status.get_allowed() - if not allowed: - _ = status.pv_status.get(as_string=True) - allowed = status.get_allowed() - assert set(["OFFLINE", "PREPARATION", "REMOTE", "ATTENDED"]).issubset(set(allowed)) def test_properties_date_status(self, status): + # Verify the .date and .status properties return correct values status.pv_date.put("2025-08-08 10:00:00", wait=True) status.pv_status.put("REMOTE", wait=True) - assert status.date == "2025-08-08 10:00:00" assert status.status == "REMOTE" def test_repr_uses_properties(self, status): + # Check that __repr__ includes both date and status status.pv_date.put("2025-08-08 10:01:02", wait=True) status.pv_status.put("ATTENDED", wait=True) - r = repr(status) assert "2025-08-08 10:01:02 ATTENDED" in r @@ -170,28 +134,20 @@ class TestOperationMessageStatus: ("set_attended", "ATTENDED"), ]) def test_set_methods_change_status(self, status, method, expected): - # Make sure enum metadata is ready - _ensure_enum_metadata(status.pv_status) - getattr(status, method)() assert status.status == expected def test_update_direct_valid(self, status): - _ensure_enum_metadata(status.pv_status) status.update("REMOTE") assert status.status == "REMOTE" def test_update_invalid_raises_valueerror(self, status): - _ensure_enum_metadata(status.pv_status) with pytest.raises(ValueError) as exc: status.update("TOTALLY_INVALID_STATUS") assert "not from allowed values" in str(exc.value) -# ----------------------------- # Tests: OperationMessageEntry -# ----------------------------- - class TestOperationMessageEntry: @pytest.fixture(params=range(N_MSG_HISTORY)) def entry(self, request): @@ -222,30 +178,29 @@ class TestOperationMessageEntry: msg_val = "System maintenance" entry.pv_date.put(date_val, wait=True) entry.pv_msg.put(msg_val, wait=True) - expected_repr = f"{date_val} {msg_val}" assert repr(entry) == expected_repr -# ----------------------------- # Tests: OperationMessage -# ----------------------------- - class TestOperationMessage: def test_init_with_name(self): om = OperationMessage(name="Control Room") - # connections + assert om.name == "Control Room" + assert om.ID == "CR" + assert om.prefix == "SF-OP:CR-MSG" + assert om.pv_send.wait_for_connection(timeout=2.0) assert om.status.pv_status.wait_for_connection(timeout=2.0) + assert len(om.entries) == N_MSG_HISTORY for i, entry in enumerate(om.entries): assert entry.pv_date.wait_for_connection(timeout=2.0), f"OP-DATE{i} not connected" assert entry.pv_msg.wait_for_connection(timeout=2.0), f"OP-MSG{i} not connected" - assert len(om.entries) == N_MSG_HISTORY def test_init_with_id(self): - om = OperationMessage(ID="cr") + om = OperationMessage(ID="cr") # case-insensitive assert om.ID == "CR" - assert om.name is not None # from IDS_INVERSE + assert om.name == IDS_INVERSE["CR"] # "Control Room" assert om.prefix == "SF-OP:CR-MSG" def test_init_error_when_missing_both(self): @@ -254,16 +209,15 @@ class TestOperationMessage: def test_getitem_returns_entry(self): om = OperationMessage(name="Control Room") - for idx in range(N_MSG_HISTORY): - assert isinstance(om[idx], OperationMessageEntry) + for i in range(N_MSG_HISTORY): + e = om[i] + assert isinstance(e, OperationMessageEntry) def test_update_puts_to_tmp_channel(self): om = OperationMessage(name="Control Room") assert om.pv_send.wait_for_connection(timeout=2.0) - msg = "Beam down at 14:00" om.update(msg) - got = om.pv_send.get(as_string=True) if isinstance(got, bytes): got = got.decode("utf-8", errors="ignore") @@ -271,18 +225,15 @@ class TestOperationMessage: def test_status_object_is_wired(self): om = OperationMessage(name="Control Room") - _ensure_enum_metadata(om.status.pv_status) + assert isinstance(om.status, OperationMessageStatus) om.status.update("REMOTE") assert om.status.status == "REMOTE" def test_repr_contains_header_and_entries(self): om = OperationMessage(name="Control Room") - _reset_operation_message_to_defaults(om) - om.status.pv_status.put("ATTENDED", wait=True) om.entries[0].pv_date.put("2025-08-08 12:00:00", wait=True) om.entries[0].pv_msg.put("Slot A ok", wait=True) - r = repr(om) assert "Control Room (CR)" in r assert "ATTENDED" in r @@ -290,15 +241,15 @@ class TestOperationMessage: assert "Slot A ok" in r -# ----------------------------- -# Tests: OperationMessages -# ----------------------------- - +# Tests: OperationMessages class TestOperationMessages: def test_initialization_populates_entries_and_items(self): oms = OperationMessages() + + # One entry per human-readable name assert set(oms.entries.keys()) == set(IDS.keys()) + # Each value is a wired/connected OperationMessage for name, om in oms.entries.items(): assert isinstance(om, OperationMessage) assert om.pv_send.wait_for_connection(timeout=2.0) @@ -308,8 +259,9 @@ class TestOperationMessages: assert entry.pv_date.wait_for_connection(timeout=2.0), f"{om.prefix}:OP-DATE{i} not connected" assert entry.pv_msg.wait_for_connection(timeout=2.0), f"{om.prefix}:OP-MSG{i} not connected" + # Dynamic attributes exist (cleaned names → attributes) for name in IDS.keys(): - attr = name.lower().replace(" ", "_") + attr = clean_name(name) assert hasattr(oms, attr), f"Missing attribute: {attr}" assert getattr(oms, attr) is oms.entries[name] @@ -318,7 +270,7 @@ class TestOperationMessages: name = "Control Room" ID = IDS[name] - cleaned_name = "control_room" + cleaned_name = "control_room" cleaned_id = "cr" om_by_name = oms[cleaned_name] @@ -329,7 +281,7 @@ class TestOperationMessages: assert isinstance(om_by_id, OperationMessage) assert om_by_id is om_by_name - om_by_original_name = oms["Control Room"] + om_by_original_name = oms["Control Room"] assert om_by_original_name is om_by_name with pytest.raises(KeyError): @@ -337,16 +289,11 @@ class TestOperationMessages: def test_iter_yields_operationmessage_instances(self): oms = OperationMessages() - - # Reset everything to defaults to avoid cross-test leakage - for om in oms: - _reset_operation_message_to_defaults(om) - vals = list(iter(oms)) assert len(vals) == len(IDS) assert all(isinstance(v, OperationMessage) for v in vals) - # Check default contents + # All prefixes serve identical initial data; verify contents quickly for om in vals: assert om.status.status == "OFFLINE" assert om.status.date == "2024-01-01 00:00:00" @@ -357,61 +304,47 @@ class TestOperationMessages: def test_key_completions_exposes_all_cleaned_keys(self): oms = OperationMessages() keys = set(oms._ipython_key_completions_()) - - expected_names = {n.lower().replace(" ", "_") for n in IDS.keys()} - expected_ids = {id_.lower() for id_ in IDS.values()} - + expected_names = {clean_name(n) for n in IDS.keys()} + expected_ids = {clean_name(ID) for ID in IDS.values()} assert expected_names.issubset(keys) assert expected_ids.issubset(keys) def test_repr_includes_each_submessage_repr(self): oms = OperationMessages() + r = repr(oms) + assert "Control Room (CR)" in r # header from one submessage + om_cr = oms["control_room"] - - # ensure baseline then set two lines - _reset_operation_message_to_defaults(om_cr) - + # Modify two consecutive entries om_cr.entries[0].pv_date.put("2025-08-08 12:00:00", wait=True) om_cr.entries[0].pv_msg.put("Test message 1", wait=True) - om_cr.entries[1].pv_date.put("2025-08-08 12:05:00", wait=True) om_cr.entries[1].pv_msg.put("Test message 2", wait=True) r2 = repr(oms) - assert "Control Room (CR)" in r2 assert "2025-08-08 12:00:00" in r2 assert "Test message 1" in r2 assert "2025-08-08 12:05:00" in r2 assert "Test message 2" in r2 -# ----------------------------- # Tests: MachineStatus -# ----------------------------- - class TestMachineStatus: @pytest.mark.parametrize("beamline", BEAMLINES) def test_init_and_connections(self, beamline): - from slic.utils.opmsg import MachineStatus ms = MachineStatus(beamline) assert ms.beamline == beamline.upper() assert ms.pv_category.wait_for_connection(timeout=2.0), f"{beamline}: CATEGORY not connected" assert ms.pv_downtime.wait_for_connection(timeout=2.0), f"{beamline}: DOWNTIME not connected" def test_init_invalid_beamline_raises(self): - from slic.utils.opmsg import MachineStatus with pytest.raises(ValueError) as exc: MachineStatus("NOT_A_BEAMLINE") - assert 'must be from:' in str(exc.value) + assert 'beamline "NOT_A_BEAMLINE" must be from:' in str(exc.value) @pytest.mark.parametrize("beamline", BEAMLINES) def test_properties_category_and_downtime(self, beamline): - from slic.utils.opmsg import MachineStatus ms = MachineStatus(beamline) - - # ensure enum metadata - _ensure_enum_metadata(ms.pv_category) - ms.pv_category.put("MD", wait=True) ms.pv_downtime.put("01:23:45", wait=True) assert ms.category == "MD" @@ -424,13 +357,9 @@ class TestMachineStatus: @pytest.mark.parametrize("beamline", BEAMLINES) def test_repr_contains_header_and_values(self, beamline): - from slic.utils.opmsg import MachineStatus ms = MachineStatus(beamline) - - _ensure_enum_metadata(ms.pv_category) ms.pv_category.put("ACCESS", wait=True) ms.pv_downtime.put("00:05:00", wait=True) - r = repr(ms) assert beamline in r assert "ACCESS" in r