365 lines
13 KiB
Python
365 lines
13 KiB
Python
import patch_put
|
|
import time
|
|
import threading
|
|
import pytest
|
|
from morbidissimo import MorIOC
|
|
import os
|
|
|
|
'''
|
|
os.environ.setdefault("EPICS_CA_ADDR_LIST", "127.0.0.1")
|
|
os.environ.setdefault("EPICS_CA_AUTO_ADDR_LIST", "NO")
|
|
|
|
import sys
|
|
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
|
from slic.utils.opmsg import *
|
|
|
|
@pytest.fixture(scope="module", autouse=True)
|
|
def run_all_iocs():
|
|
# Run a simulated IOC for all PVs used in these tests
|
|
stop = threading.Event()
|
|
|
|
def run():
|
|
with MorIOC("") as mor: # Single IOC → no port clashes
|
|
# OPMSG PVs for all IDs
|
|
for ID in IDS.values():
|
|
base = f"SF-OP:{ID}-MSG"
|
|
mor.host(
|
|
**{f"{base}:STATUS": {"type": "enum", "enums": ["OFFLINE", "PREPARATION", "REMOTE", "ATTENDED"]}},
|
|
**{f"{base}:STATUS-DATE": str},
|
|
**{f"{base}:OP-MSG-TMP": str},
|
|
**{f"{base}:OP-DATE{i}": str for i in range(N_MSG_HISTORY)},
|
|
**{f"{base}:OP-MSG{i}": str for i in range(N_MSG_HISTORY)},
|
|
)
|
|
mor.serve(
|
|
**{f"{base}:STATUS": "OFFLINE"},
|
|
**{f"{base}:STATUS-DATE": "2024-01-01 00:00:00"},
|
|
**{f"{base}:OP-MSG-TMP": ""},
|
|
**{f"{base}:OP-DATE{i}": f"2024-01-01 00:00:0{i}" for i in range(N_MSG_HISTORY)},
|
|
**{f"{base}:OP-MSG{i}": f"Initial message {i}" for i in range(N_MSG_HISTORY)},
|
|
)
|
|
|
|
# STATUS PVs for all beamlines
|
|
for bl in BEAMLINES:
|
|
base = f"SF-STATUS-{bl}"
|
|
mor.host(
|
|
**{f"{base}:CATEGORY": {"type": "enum", "enums": ["USER", "MD", "SD", "ACCESS", "DOWN"]}},
|
|
**{f"{base}:DOWNTIME": str},
|
|
)
|
|
mor.serve(**{f"{base}:CATEGORY": "USER", f"{base}:DOWNTIME": "00:00:00"})
|
|
|
|
# Serve loop
|
|
while not stop.is_set():
|
|
mor.serve()
|
|
time.sleep(0.05)
|
|
|
|
t = threading.Thread(target=run)
|
|
t.start()
|
|
time.sleep(3.0) # allow CA to announce PVs
|
|
|
|
yield
|
|
|
|
stop.set()
|
|
t.join(timeout=2.0)
|
|
time.sleep(0.2)
|
|
|
|
import socket
|
|
|
|
# Check UDP ports used by EPICS Channel Access
|
|
for port in (5064, 5065):
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
try:
|
|
sock.bind(("127.0.0.1", port))
|
|
print(f"[OK] Port {port} free")
|
|
except OSError as e:
|
|
print(f"[IN USE] Port {port} already occupied: {e}")
|
|
finally:
|
|
sock.close()
|
|
|
|
|
|
# -------- OperationMessageStatus --------
|
|
class TestOperationMessageStatus:
|
|
@pytest.fixture
|
|
def status(self):
|
|
s = OperationMessageStatus("SF-OP:CR-MSG")
|
|
assert s.pv_date.wait_for_connection(timeout=2.0), "PV STATUS-DATE not connected"
|
|
assert s.pv_status.wait_for_connection(timeout=2.0), "PV STATUS not connected"
|
|
return s
|
|
|
|
# Test that initialization works and enums are correct
|
|
def test_initialization_and_enum(self, status):
|
|
assert status.pv_date.connected
|
|
assert status.pv_status.connected
|
|
allowed = status.get_allowed()
|
|
assert set(["OFFLINE", "PREPARATION", "REMOTE", "ATTENDED"]).issubset(set(allowed))
|
|
|
|
# Test properties .date and .status
|
|
def test_properties_date_status(self, status):
|
|
status.pv_date.put("2025-08-08 10:00:00", wait=True)
|
|
status.pv_status.put("REMOTE", wait=True)
|
|
assert status.date == "2025-08-08 10:00:00"
|
|
assert status.status == "REMOTE"
|
|
|
|
# Test __repr__ uses properties correctly
|
|
def test_repr_uses_properties(self, status):
|
|
status.pv_date.put("2025-08-08 10:01:02", wait=True)
|
|
status.pv_status.put("ATTENDED", wait=True)
|
|
r = repr(status)
|
|
assert "2025-08-08 10:01:02 ATTENDED" in r
|
|
|
|
@pytest.mark.parametrize("method,expected", [
|
|
("set_offline", "OFFLINE"),
|
|
("set_preparation", "PREPARATION"),
|
|
("set_remote", "REMOTE"),
|
|
("set_attended", "ATTENDED"),
|
|
])
|
|
# Test set_* methods
|
|
def test_set_methods_change_status(self, status, method, expected):
|
|
getattr(status, method)()
|
|
assert status.status == expected
|
|
|
|
# Test update() with a valid value
|
|
def test_update_direct_valid(self, status):
|
|
status.update("REMOTE")
|
|
assert status.status == "REMOTE"
|
|
|
|
# Test update() with an invalid value should raise
|
|
def test_update_invalid_raises_valueerror(self, status):
|
|
with pytest.raises(ValueError) as exc:
|
|
status.update("TOTALLY_INVALID_STATUS")
|
|
assert "not from allowed values" in str(exc.value)
|
|
|
|
|
|
# -------- OperationMessageEntry --------
|
|
class TestOperationMessageEntry:
|
|
@pytest.fixture(params=range(N_MSG_HISTORY))
|
|
def entry(self, request):
|
|
idx = request.param
|
|
e = OperationMessageEntry("SF-OP:CR-MSG", idx)
|
|
assert e.pv_date.wait_for_connection(timeout=2.0), f"PV OP-DATE{idx} not connected"
|
|
assert e.pv_msg.wait_for_connection(timeout=2.0), f"PV OP-MSG{idx} not connected"
|
|
return e
|
|
|
|
# Test initialization of entry PVs
|
|
def test_initialization(self, entry):
|
|
assert entry.pv_date.connected
|
|
assert entry.pv_msg.connected
|
|
|
|
# Test .date property
|
|
def test_date_property(self, entry):
|
|
test_val = "2025-08-08 12:34:56"
|
|
entry.pv_date.put(test_val, wait=True)
|
|
assert entry.date == test_val
|
|
|
|
# Test .msg property
|
|
def test_msg_property(self, entry):
|
|
test_val = "Hello from test"
|
|
entry.pv_msg.put(test_val, wait=True)
|
|
assert entry.msg == test_val
|
|
|
|
# Test __repr__ outputs correctly
|
|
def test_repr(self, entry):
|
|
date_val = "2025-08-08 13:00:00"
|
|
msg_val = "System maintenance"
|
|
entry.pv_date.put(date_val, wait=True)
|
|
entry.pv_msg.put(msg_val, wait=True)
|
|
# Small time margin for CA read
|
|
time.sleep(0.05)
|
|
assert repr(entry) == f"{date_val} {msg_val}"
|
|
|
|
|
|
# -------- OperationMessage --------
|
|
class TestOperationMessage:
|
|
# Test initialization with name
|
|
def test_init_with_name(self):
|
|
om = OperationMessage(name="Control Room")
|
|
assert om.name == "Control Room"
|
|
assert om.ID == "CR"
|
|
assert om.prefix == "SF-OP:CR-MSG"
|
|
|
|
assert om.pv_send.wait_for_connection(timeout=10.0)
|
|
assert om.status.pv_status.wait_for_connection(timeout=10.0)
|
|
assert len(om.entries) == N_MSG_HISTORY
|
|
for i, entry in enumerate(om.entries):
|
|
assert entry.pv_date.wait_for_connection(timeout=10.0), f"OP-DATE{i} not connected"
|
|
|
|
# Test initialization with ID
|
|
def test_init_with_id(self):
|
|
om = OperationMessage(ID="cr") # case-insensitive
|
|
assert om.ID == "CR"
|
|
assert om.name == IDS_INVERSE["CR"]
|
|
assert om.prefix == "SF-OP:CR-MSG"
|
|
|
|
# Test error when missing both name and ID
|
|
def test_init_error_when_missing_both(self):
|
|
with pytest.raises(ValueError):
|
|
OperationMessage()
|
|
|
|
# Test __getitem__ returns entry
|
|
def test_getitem_returns_entry(self):
|
|
om = OperationMessage(name="Control Room")
|
|
for i in range(N_MSG_HISTORY):
|
|
e = om[i]
|
|
assert isinstance(e, OperationMessageEntry)
|
|
|
|
# Test update() puts into temporary PV channel
|
|
def test_update_puts_to_tmp_channel(self):
|
|
om = OperationMessage(name="Control Room")
|
|
assert om.pv_send.wait_for_connection(timeout=2.0)
|
|
msg = "Beam down at 14:00"
|
|
om.update(msg)
|
|
# Allow IOC serve() loop to cycle
|
|
time.sleep(0.2)
|
|
got = om.pv_send.get(as_string=True)
|
|
if isinstance(got, bytes):
|
|
got = got.decode("utf-8", errors="ignore")
|
|
assert got == msg
|
|
|
|
# Test status object is correctly wired
|
|
def test_status_object_is_wired(self):
|
|
om = OperationMessage(name="Control Room")
|
|
om.status.update("REMOTE")
|
|
assert om.status.pv_status == "REMOTE"
|
|
|
|
# Test __repr__ contains header and entries
|
|
def test_repr_contains_header_and_entries(self):
|
|
om = OperationMessage(name="Control Room")
|
|
om.status.pv_status.put("ATTENDED")
|
|
om.entries[0].pv_date.put("2025-08-08 12:00:00")
|
|
om.entries[0].pv_msg.put("Slot A ok")
|
|
r = repr(om)
|
|
assert "Control Room (CR)" in r
|
|
assert "ATTENDED" in r
|
|
assert "2025-08-08 12:00:00" in r
|
|
assert "Slot A ok" in r
|
|
|
|
|
|
# -------- OperationMessages --------
|
|
class TestOperationMessages:
|
|
# Test initialization populates entries and items
|
|
def test_initialization_populates_entries_and_items(self):
|
|
oms = OperationMessages()
|
|
|
|
assert set(oms.entries.keys()) == set(IDS.keys())
|
|
|
|
for name, om in oms.entries.items():
|
|
assert isinstance(om, OperationMessage)
|
|
assert om.pv_send.wait_for_connection(timeout=2.0)
|
|
assert om.status.pv_status.wait_for_connection(timeout=2.0)
|
|
for i, entry in enumerate(om.entries):
|
|
assert isinstance(entry, OperationMessageEntry)
|
|
assert entry.pv_date.wait_for_connection(timeout=2.0), f"{om.prefix}:OP-DATE{i} not connected"
|
|
assert entry.pv_msg.wait_for_connection(timeout=2.0), f"{om.prefix}:OP-MSG{i} not connected"
|
|
|
|
for name in IDS.keys():
|
|
attr = clean_name(name)
|
|
assert hasattr(oms, attr), f"Missing attribute: {attr}"
|
|
assert getattr(oms, attr) is oms.entries[name]
|
|
|
|
# Test __getitem__ supports cleaned name and ID
|
|
def test_getitem_supports_cleaned_name_and_cleaned_id(self):
|
|
oms = OperationMessages()
|
|
|
|
name = "Control Room"
|
|
ID = IDS[name]
|
|
cleaned_name = "control_room"
|
|
cleaned_id = "cr"
|
|
|
|
om_by_name = oms[cleaned_name]
|
|
assert isinstance(om_by_name, OperationMessage)
|
|
assert om_by_name.ID == ID
|
|
|
|
om_by_id = oms[cleaned_id]
|
|
assert isinstance(om_by_id, OperationMessage)
|
|
assert om_by_id is om_by_name
|
|
|
|
om_by_original_name = oms["Control Room"]
|
|
assert om_by_original_name is om_by_name
|
|
|
|
with pytest.raises(KeyError):
|
|
_ = oms["does_not_exist"]
|
|
|
|
# Test __iter__ yields OperationMessage instances
|
|
def test_iter_yields_operationmessage_instances(self):
|
|
oms = OperationMessages()
|
|
vals = list(iter(oms))
|
|
assert len(vals) == len(IDS)
|
|
assert all(isinstance(v, OperationMessage) for v in vals)
|
|
|
|
for om in vals:
|
|
# Initial state injected by the IOC
|
|
assert om.status.status == "OFFLINE"
|
|
assert om.status.date == "2024-01-01 00:00:00"
|
|
for i, entry in enumerate(om.entries):
|
|
assert entry.date == f"2024-01-01 00:00:0{i}"
|
|
assert entry.msg == f"Initial message {i}"
|
|
|
|
# Test IPython key completions
|
|
def test_key_completions_exposes_all_cleaned_keys(self):
|
|
oms = OperationMessages()
|
|
keys = set(oms._ipython_key_completions_())
|
|
expected_names = {clean_name(n) for n in IDS.keys()}
|
|
expected_ids = {clean_name(ID) for ID in IDS.values()}
|
|
assert expected_names.issubset(keys)
|
|
assert expected_ids.issubset(keys)
|
|
|
|
# Test __repr__ includes each sub-message repr
|
|
def test_repr_includes_each_submessage_repr(self):
|
|
oms = OperationMessages()
|
|
r = repr(oms)
|
|
assert "Control Room (CR)" in r
|
|
|
|
om_cr = oms["control_room"]
|
|
om_cr.entries[0].pv_date.put("2025-08-08 12:00:00", wait=True)
|
|
om_cr.entries[0].pv_msg.put("Test message 1", wait=True)
|
|
om_cr.entries[1].pv_date.put("2025-08-08 12:05:00", wait=True)
|
|
om_cr.entries[1].pv_msg.put("Test message 2", wait=True)
|
|
|
|
time.sleep(0.05)
|
|
r2 = repr(oms)
|
|
assert "2025-08-08 12:00:00" in r2
|
|
assert "Test message 1" in r2
|
|
assert "2025-08-08 12:05:00" in r2
|
|
assert "Test message 2" in r2
|
|
|
|
|
|
# -------- MachineStatus --------
|
|
class TestMachineStatus:
|
|
# Test initialization and connections for all beamlines
|
|
@pytest.mark.parametrize("beamline", BEAMLINES)
|
|
def test_init_and_connections(self, beamline):
|
|
ms = MachineStatus(beamline)
|
|
assert ms.beamline == beamline.upper()
|
|
assert ms.pv_category.wait_for_connection(timeout=2.0), f"{beamline}: CATEGORY not connected"
|
|
assert ms.pv_downtime.wait_for_connection(timeout=2.0), f"{beamline}: DOWNTIME not connected"
|
|
|
|
# Test invalid beamline raises ValueError
|
|
def test_init_invalid_beamline_raises(self):
|
|
with pytest.raises(ValueError) as exc:
|
|
MachineStatus("NOT_A_BEAMLINE")
|
|
assert 'beamline "NOT_A_BEAMLINE" must be from:' in str(exc.value)
|
|
|
|
# Test .category and .downtime properties
|
|
@pytest.mark.parametrize("beamline", BEAMLINES)
|
|
def test_properties_category_and_downtime(self, beamline):
|
|
ms = MachineStatus(beamline)
|
|
ms.pv_category.put("MD", wait=True)
|
|
ms.pv_downtime.put("01:23:45", wait=True)
|
|
assert ms.category == "MD"
|
|
assert ms.downtime == "01:23:45"
|
|
ms.pv_category.put("SD", wait=True)
|
|
ms.pv_downtime.put("00:10:00", wait=True)
|
|
assert ms.category == "SD"
|
|
assert ms.downtime == "00:10:00"
|
|
|
|
# Test __repr__ contains header and values
|
|
@pytest.mark.parametrize("beamline", BEAMLINES)
|
|
def test_repr_contains_header_and_values(self, beamline):
|
|
ms = MachineStatus(beamline)
|
|
ms.pv_category.put("ACCESS", wait=True)
|
|
ms.pv_downtime.put("00:05:00", wait=True)
|
|
r = repr(ms)
|
|
assert beamline in r
|
|
assert "ACCESS" in r
|
|
assert "00:05:00" in r
|
|
'''
|