Update tests/test_utils_opmsg.py
Run CI Tests / test (push) Successful in 1m15s

This commit is contained in:
2025-08-08 14:49:56 +02:00
parent 206adb1c5e
commit 780f2f2add
+64 -135
View File
@@ -1,49 +1,36 @@
import time
import threading
import pytest
from epics import PV
from morbidissimo import MorIOC
from slic.utils.opmsg import *
# -----------------------------
# IOC: start all prefixes used by the SUT
# -----------------------------
import time
import threading
import pytest
from morbidissimo import MorIOC
from slic.utils.opmsg import IDS, BEAMLINES, N_MSG_HISTORY
def ioc():
"""
Start MorIOC servers for ALL PVs used by:
- OperationMessageStatus (STATUS, STATUS-DATE)
- OperationMessageEntry (OP-DATEi, OP-MSGi)
- OperationMessage (OP-MSG-tmp)
- MachineStatus (CATEGORY, DOWNTIME)
One MorIOC per prefix:
- For each ID in IDS: 'SF-OP:{ID}-MSG'
- For each beamline in BEAMLINES:'SF-STATUS-{beamline}'
Important decisions:
- Enums are declared with pcaspy-style dicts so enum_strs are present.
- OP-MSG-tmp is hosted EXACTLY as the production code expects (lowercase),
and aliased to OP-MSG-TMP to be robust.
- We seed values ONCE; inside the loop we only call mor.serve() to
process CA traffic, without clobbering test writes.
IOC de test pour:
- STATUS / STATUS-DATE
- OP-DATEi / OP-MSGi
- OP-MSG-tmp (exactement ce que ton code utilise)
- CATEGORY / DOWNTIME
"""
from morbidissimo import MorIOC # imported here to avoid test env surprises
def run_op_prefix(prefix: str):
with MorIOC(prefix) as mor:
pvdb = {
# true enum (pcaspy dict) so enum_strs is populated
"STATUS": {"type": "enum", "enums": ["OFFLINE", "PREPARATION", "REMOTE", "ATTENDED"]},
"STATUS-DATE": str,
mor.host(
STATUS={"type": "enum", "enums": ["OFFLINE", "PREPARATION", "REMOTE", "ATTENDED"]},
**{"STATUS-DATE": str},
**{f"OP-DATE{i}": str for i in range(N_MSG_HISTORY)},
**{f"OP-MSG{i}": str for i in range(N_MSG_HISTORY)},
**{"OP-MSG-TMP": ""},
}
mor.host(**pvdb)
**{"OP-MSG-TMP": str},
)
# Seed ONCE
# Seed une seule fois
mor.serve(
STATUS="OFFLINE",
**{"STATUS-DATE": "2024-01-01 00:00:00"},
@@ -53,7 +40,6 @@ def ioc():
)
while True:
# process CA traffic without overwriting user/test writes
mor.serve()
time.sleep(0.02)
@@ -63,15 +49,12 @@ def ioc():
CATEGORY={"type": "enum", "enums": ["USER", "MD", "SD", "ACCESS", "DOWN"]},
DOWNTIME=str,
)
# Seed ONCE
mor.serve(CATEGORY="USER", DOWNTIME="00:00:00")
while True:
mor.serve()
time.sleep(0.02)
threads = []
for ID in IDS.values():
prefix = f"SF-OP:{ID}-MSG"
t = threading.Thread(target=run_op_prefix, args=(prefix,), daemon=True)
@@ -85,81 +68,62 @@ def ioc():
return threads
@pytest.fixture(scope="module", autouse=True)
def run_all_iocs():
threads = ioc()
time.sleep(0.5)
yield
@pytest.fixture(scope="module", autouse=True)
def run_all_iocs():
threads = ioc()
# Slightly longer boot time to ensure all servers are listening
time.sleep(1.0)
# Short boot delay to ensure IOCs are listening
time.sleep(0.3)
yield
# Daemon threads exit with pytest
# -----------------------------
# Helpers for tests
# -----------------------------
def _ensure_enum_metadata(pv: PV):
"""
Ensure pyepics has fetched enum metadata; if not, force a read once.
"""
allowed = getattr(pv, "enum_strs", None)
if not allowed:
_ = pv.get(as_string=True)
allowed = getattr(pv, "enum_strs", None)
return allowed
# Autouse fixture to start IOC once per module
@pytest.fixture(scope="module", autouse=True)
def run_all_iocs():
threads = ioc()
# Small boot delay so all IOCs are ready before the tests run
time.sleep(0.3)
yield
# Daemon threads end with pytest
def _reset_operation_message_to_defaults(om: OperationMessage):
"""
Reset a full OperationMessage object to the default IOC seeded values.
"""
om.status.pv_status.put("OFFLINE", wait=True)
om.status.pv_date.put("2024-01-01 00:00:00", wait=True)
for i, e in enumerate(om.entries):
e.pv_date.put(f"2024-01-01 00:00:0{i}", wait=True)
e.pv_msg.put(f"Initial message {i}", wait=True)
om.pv_send.put("", wait=True)
# -----------------------------
# Tests: OperationMessageStatus
# -----------------------------
class TestOperationMessageStatus:
@pytest.fixture
def status(self):
s = OperationMessageStatus("SF-OP:CR-MSG")
assert s.pv_date.wait_for_connection(timeout=2.0), "PV STATUS-DATE not connected"
assert s.pv_status.wait_for_connection(timeout=2.0), "PV STATUS not connected"
# ensure enum metadata present
_ensure_enum_metadata(s.pv_status)
return s
def test_initialization_and_enum(self, status):
# Ensure PVs are created and connected, and allowed values match expectations
assert status.pv_date is not None
assert status.pv_status is not None
assert status.pv_date.connected
assert status.pv_status.connected
allowed = status.get_allowed()
if not allowed:
_ = status.pv_status.get(as_string=True)
allowed = status.get_allowed()
assert set(["OFFLINE", "PREPARATION", "REMOTE", "ATTENDED"]).issubset(set(allowed))
def test_properties_date_status(self, status):
# Verify the .date and .status properties return correct values
status.pv_date.put("2025-08-08 10:00:00", wait=True)
status.pv_status.put("REMOTE", wait=True)
assert status.date == "2025-08-08 10:00:00"
assert status.status == "REMOTE"
def test_repr_uses_properties(self, status):
# Check that __repr__ includes both date and status
status.pv_date.put("2025-08-08 10:01:02", wait=True)
status.pv_status.put("ATTENDED", wait=True)
r = repr(status)
assert "2025-08-08 10:01:02 ATTENDED" in r
@@ -170,28 +134,20 @@ class TestOperationMessageStatus:
("set_attended", "ATTENDED"),
])
def test_set_methods_change_status(self, status, method, expected):
# Make sure enum metadata is ready
_ensure_enum_metadata(status.pv_status)
getattr(status, method)()
assert status.status == expected
def test_update_direct_valid(self, status):
_ensure_enum_metadata(status.pv_status)
status.update("REMOTE")
assert status.status == "REMOTE"
def test_update_invalid_raises_valueerror(self, status):
_ensure_enum_metadata(status.pv_status)
with pytest.raises(ValueError) as exc:
status.update("TOTALLY_INVALID_STATUS")
assert "not from allowed values" in str(exc.value)
# -----------------------------
# Tests: OperationMessageEntry
# -----------------------------
class TestOperationMessageEntry:
@pytest.fixture(params=range(N_MSG_HISTORY))
def entry(self, request):
@@ -222,30 +178,29 @@ class TestOperationMessageEntry:
msg_val = "System maintenance"
entry.pv_date.put(date_val, wait=True)
entry.pv_msg.put(msg_val, wait=True)
expected_repr = f"{date_val} {msg_val}"
assert repr(entry) == expected_repr
# -----------------------------
# Tests: OperationMessage
# -----------------------------
class TestOperationMessage:
def test_init_with_name(self):
om = OperationMessage(name="Control Room")
# connections
assert om.name == "Control Room"
assert om.ID == "CR"
assert om.prefix == "SF-OP:CR-MSG"
assert om.pv_send.wait_for_connection(timeout=2.0)
assert om.status.pv_status.wait_for_connection(timeout=2.0)
assert len(om.entries) == N_MSG_HISTORY
for i, entry in enumerate(om.entries):
assert entry.pv_date.wait_for_connection(timeout=2.0), f"OP-DATE{i} not connected"
assert entry.pv_msg.wait_for_connection(timeout=2.0), f"OP-MSG{i} not connected"
assert len(om.entries) == N_MSG_HISTORY
def test_init_with_id(self):
om = OperationMessage(ID="cr")
om = OperationMessage(ID="cr") # case-insensitive
assert om.ID == "CR"
assert om.name is not None # from IDS_INVERSE
assert om.name == IDS_INVERSE["CR"] # "Control Room"
assert om.prefix == "SF-OP:CR-MSG"
def test_init_error_when_missing_both(self):
@@ -254,16 +209,15 @@ class TestOperationMessage:
def test_getitem_returns_entry(self):
om = OperationMessage(name="Control Room")
for idx in range(N_MSG_HISTORY):
assert isinstance(om[idx], OperationMessageEntry)
for i in range(N_MSG_HISTORY):
e = om[i]
assert isinstance(e, OperationMessageEntry)
def test_update_puts_to_tmp_channel(self):
om = OperationMessage(name="Control Room")
assert om.pv_send.wait_for_connection(timeout=2.0)
msg = "Beam down at 14:00"
om.update(msg)
got = om.pv_send.get(as_string=True)
if isinstance(got, bytes):
got = got.decode("utf-8", errors="ignore")
@@ -271,18 +225,15 @@ class TestOperationMessage:
def test_status_object_is_wired(self):
om = OperationMessage(name="Control Room")
_ensure_enum_metadata(om.status.pv_status)
assert isinstance(om.status, OperationMessageStatus)
om.status.update("REMOTE")
assert om.status.status == "REMOTE"
def test_repr_contains_header_and_entries(self):
om = OperationMessage(name="Control Room")
_reset_operation_message_to_defaults(om)
om.status.pv_status.put("ATTENDED", wait=True)
om.entries[0].pv_date.put("2025-08-08 12:00:00", wait=True)
om.entries[0].pv_msg.put("Slot A ok", wait=True)
r = repr(om)
assert "Control Room (CR)" in r
assert "ATTENDED" in r
@@ -290,15 +241,15 @@ class TestOperationMessage:
assert "Slot A ok" in r
# -----------------------------
# Tests: OperationMessages
# -----------------------------
# Tests: OperationMessages
class TestOperationMessages:
def test_initialization_populates_entries_and_items(self):
oms = OperationMessages()
# One entry per human-readable name
assert set(oms.entries.keys()) == set(IDS.keys())
# Each value is a wired/connected OperationMessage
for name, om in oms.entries.items():
assert isinstance(om, OperationMessage)
assert om.pv_send.wait_for_connection(timeout=2.0)
@@ -308,8 +259,9 @@ class TestOperationMessages:
assert entry.pv_date.wait_for_connection(timeout=2.0), f"{om.prefix}:OP-DATE{i} not connected"
assert entry.pv_msg.wait_for_connection(timeout=2.0), f"{om.prefix}:OP-MSG{i} not connected"
# Dynamic attributes exist (cleaned names → attributes)
for name in IDS.keys():
attr = name.lower().replace(" ", "_")
attr = clean_name(name)
assert hasattr(oms, attr), f"Missing attribute: {attr}"
assert getattr(oms, attr) is oms.entries[name]
@@ -318,7 +270,7 @@ class TestOperationMessages:
name = "Control Room"
ID = IDS[name]
cleaned_name = "control_room"
cleaned_name = "control_room"
cleaned_id = "cr"
om_by_name = oms[cleaned_name]
@@ -329,7 +281,7 @@ class TestOperationMessages:
assert isinstance(om_by_id, OperationMessage)
assert om_by_id is om_by_name
om_by_original_name = oms["Control Room"]
om_by_original_name = oms["Control Room"]
assert om_by_original_name is om_by_name
with pytest.raises(KeyError):
@@ -337,16 +289,11 @@ class TestOperationMessages:
def test_iter_yields_operationmessage_instances(self):
oms = OperationMessages()
# Reset everything to defaults to avoid cross-test leakage
for om in oms:
_reset_operation_message_to_defaults(om)
vals = list(iter(oms))
assert len(vals) == len(IDS)
assert all(isinstance(v, OperationMessage) for v in vals)
# Check default contents
# All prefixes serve identical initial data; verify contents quickly
for om in vals:
assert om.status.status == "OFFLINE"
assert om.status.date == "2024-01-01 00:00:00"
@@ -357,61 +304,47 @@ class TestOperationMessages:
def test_key_completions_exposes_all_cleaned_keys(self):
oms = OperationMessages()
keys = set(oms._ipython_key_completions_())
expected_names = {n.lower().replace(" ", "_") for n in IDS.keys()}
expected_ids = {id_.lower() for id_ in IDS.values()}
expected_names = {clean_name(n) for n in IDS.keys()}
expected_ids = {clean_name(ID) for ID in IDS.values()}
assert expected_names.issubset(keys)
assert expected_ids.issubset(keys)
def test_repr_includes_each_submessage_repr(self):
oms = OperationMessages()
r = repr(oms)
assert "Control Room (CR)" in r # header from one submessage
om_cr = oms["control_room"]
# ensure baseline then set two lines
_reset_operation_message_to_defaults(om_cr)
# Modify two consecutive entries
om_cr.entries[0].pv_date.put("2025-08-08 12:00:00", wait=True)
om_cr.entries[0].pv_msg.put("Test message 1", wait=True)
om_cr.entries[1].pv_date.put("2025-08-08 12:05:00", wait=True)
om_cr.entries[1].pv_msg.put("Test message 2", wait=True)
r2 = repr(oms)
assert "Control Room (CR)" in r2
assert "2025-08-08 12:00:00" in r2
assert "Test message 1" in r2
assert "2025-08-08 12:05:00" in r2
assert "Test message 2" in r2
# -----------------------------
# Tests: MachineStatus
# -----------------------------
class TestMachineStatus:
@pytest.mark.parametrize("beamline", BEAMLINES)
def test_init_and_connections(self, beamline):
from slic.utils.opmsg import MachineStatus
ms = MachineStatus(beamline)
assert ms.beamline == beamline.upper()
assert ms.pv_category.wait_for_connection(timeout=2.0), f"{beamline}: CATEGORY not connected"
assert ms.pv_downtime.wait_for_connection(timeout=2.0), f"{beamline}: DOWNTIME not connected"
def test_init_invalid_beamline_raises(self):
from slic.utils.opmsg import MachineStatus
with pytest.raises(ValueError) as exc:
MachineStatus("NOT_A_BEAMLINE")
assert 'must be from:' in str(exc.value)
assert 'beamline "NOT_A_BEAMLINE" must be from:' in str(exc.value)
@pytest.mark.parametrize("beamline", BEAMLINES)
def test_properties_category_and_downtime(self, beamline):
from slic.utils.opmsg import MachineStatus
ms = MachineStatus(beamline)
# ensure enum metadata
_ensure_enum_metadata(ms.pv_category)
ms.pv_category.put("MD", wait=True)
ms.pv_downtime.put("01:23:45", wait=True)
assert ms.category == "MD"
@@ -424,13 +357,9 @@ class TestMachineStatus:
@pytest.mark.parametrize("beamline", BEAMLINES)
def test_repr_contains_header_and_values(self, beamline):
from slic.utils.opmsg import MachineStatus
ms = MachineStatus(beamline)
_ensure_enum_metadata(ms.pv_category)
ms.pv_category.put("ACCESS", wait=True)
ms.pv_downtime.put("00:05:00", wait=True)
r = repr(ms)
assert beamline in r
assert "ACCESS" in r