This commit is contained in:
+338
-154
@@ -1,182 +1,366 @@
|
||||
import time
|
||||
import threading
|
||||
import pytest
|
||||
import epics
|
||||
from epics.pv import PV
|
||||
from slic.utils.hastyepics import *
|
||||
from morbidissimo import MorIOC
|
||||
|
||||
# IOC simulation
|
||||
@pytest.fixture(scope="module", autouse=True)
|
||||
def morioc_server():
|
||||
"""Démarre un IOC simulé pour tous les tests."""
|
||||
def run_ioc():
|
||||
with MorIOC("TEST:SIM:") as mor:
|
||||
current_val = 0.0
|
||||
mor.host(
|
||||
VAL=float,
|
||||
RBV=float,
|
||||
STATUS={"type": "enum", "enums": ["IDLE", "MOVING", "DONE"]},
|
||||
disabled=int, # pour tester suppression
|
||||
)
|
||||
while True:
|
||||
val_in = mor.get("VAL")
|
||||
if val_in is not None:
|
||||
current_val = val_in
|
||||
mor.serve(
|
||||
VAL=current_val,
|
||||
RBV=current_val,
|
||||
STATUS=0,
|
||||
disabled=0,
|
||||
RTYP="motor" # pour que epics.Motor original accepte
|
||||
)
|
||||
time.sleep(0.05)
|
||||
from slic.utils.opmsg import *
|
||||
|
||||
t = threading.Thread(target=run_ioc, daemon=True)
|
||||
t.start()
|
||||
|
||||
import time
|
||||
import threading
|
||||
import pytest
|
||||
from morbidissimo import MorIOC
|
||||
from slic.utils.opmsg import IDS, BEAMLINES, N_MSG_HISTORY
|
||||
|
||||
def ioc():
|
||||
"""
|
||||
IOC de test pour:
|
||||
- STATUS / STATUS-DATE
|
||||
- OP-DATEi / OP-MSGi
|
||||
- OP-MSG-tmp (exactement ce que ton code utilise)
|
||||
- CATEGORY / DOWNTIME
|
||||
"""
|
||||
def run_op_prefix(prefix: str):
|
||||
with MorIOC(prefix) as mor:
|
||||
mor.host(
|
||||
STATUS={"type": "enum", "enums": ["OFFLINE", "PREPARATION", "REMOTE", "ATTENDED"]},
|
||||
**{"STATUS-DATE": str},
|
||||
**{f"OP-DATE{i}": str for i in range(N_MSG_HISTORY)},
|
||||
**{f"OP-MSG{i}": str for i in range(N_MSG_HISTORY)},
|
||||
**{"OP-MSG-TMP": str},
|
||||
)
|
||||
|
||||
# Seed une seule fois
|
||||
mor.serve(
|
||||
STATUS="OFFLINE",
|
||||
**{"STATUS-DATE": "2024-01-01 00:00:00"},
|
||||
**{f"OP-DATE{i}": f"2024-01-01 00:00:0{i}" for i in range(N_MSG_HISTORY)},
|
||||
**{f"OP-MSG{i}": f"Initial message {i}" for i in range(N_MSG_HISTORY)},
|
||||
**{"OP-MSG-TMP": ""},
|
||||
)
|
||||
|
||||
while True:
|
||||
mor.serve()
|
||||
time.sleep(0.02)
|
||||
|
||||
def run_status_prefix(beamline: str):
|
||||
with MorIOC(f"SF-STATUS-{beamline}") as mor:
|
||||
mor.host(
|
||||
CATEGORY={"type": "enum", "enums": ["USER", "MD", "SD", "ACCESS", "DOWN"]},
|
||||
DOWNTIME=str,
|
||||
)
|
||||
mor.serve(CATEGORY="USER", DOWNTIME="00:00:00")
|
||||
while True:
|
||||
mor.serve()
|
||||
time.sleep(0.02)
|
||||
|
||||
threads = []
|
||||
for ID in IDS.values():
|
||||
prefix = f"SF-OP:{ID}-MSG"
|
||||
t = threading.Thread(target=run_op_prefix, args=(prefix,), daemon=True)
|
||||
t.start()
|
||||
threads.append(t)
|
||||
|
||||
for bl in BEAMLINES:
|
||||
t = threading.Thread(target=run_status_prefix, args=(bl,), daemon=True)
|
||||
t.start()
|
||||
threads.append(t)
|
||||
|
||||
return threads
|
||||
|
||||
@pytest.fixture(scope="module", autouse=True)
|
||||
def run_all_iocs():
|
||||
threads = ioc()
|
||||
time.sleep(0.5)
|
||||
yield
|
||||
|
||||
|
||||
def test_get_pv_connect_false_and_true():
|
||||
pv = get_pv("TEST:SIM:VAL", connect=False)
|
||||
assert isinstance(pv, PV)
|
||||
assert not pv.connected
|
||||
|
||||
pv2 = get_pv("TEST:SIM:VAL", connect=True, timeout=2.0)
|
||||
assert pv2.connected
|
||||
@pytest.fixture(scope="module", autouse=True)
|
||||
def run_all_iocs():
|
||||
threads = ioc()
|
||||
# Short boot delay to ensure IOCs are listening
|
||||
time.sleep(0.3)
|
||||
yield
|
||||
# Daemon threads exit with pytest
|
||||
|
||||
|
||||
def test_motor_init_strips_suffixes():
|
||||
m1 = Motor("TEST:SIM:M1.VAL")
|
||||
assert m1._prefix == "TEST:SIM:M1"
|
||||
|
||||
m2 = Motor("TEST:SIM:M2.")
|
||||
assert m2._prefix == "TEST:SIM:M2"
|
||||
|
||||
m3 = Motor("TEST:SIM:M3")
|
||||
assert m3._prefix == "TEST:SIM:M3"
|
||||
# Autouse fixture to start IOC once per module
|
||||
@pytest.fixture(scope="module", autouse=True)
|
||||
def run_all_iocs():
|
||||
threads = ioc()
|
||||
# Small boot delay so all IOCs are ready before the tests run
|
||||
time.sleep(0.3)
|
||||
yield
|
||||
# Daemon threads end with pytest
|
||||
|
||||
|
||||
def test_motor_invalid_name_raises():
|
||||
with pytest.raises(MotorException):
|
||||
Motor(None)
|
||||
# Tests: OperationMessageStatus
|
||||
class TestOperationMessageStatus:
|
||||
@pytest.fixture
|
||||
def status(self):
|
||||
s = OperationMessageStatus("SF-OP:CR-MSG")
|
||||
assert s.pv_date.wait_for_connection(timeout=2.0), "PV STATUS-DATE not connected"
|
||||
assert s.pv_status.wait_for_connection(timeout=2.0), "PV STATUS not connected"
|
||||
return s
|
||||
|
||||
def test_initialization_and_enum(self, status):
|
||||
# Ensure PVs are created and connected, and allowed values match expectations
|
||||
assert status.pv_date is not None
|
||||
assert status.pv_status is not None
|
||||
assert status.pv_date.connected
|
||||
assert status.pv_status.connected
|
||||
|
||||
allowed = status.get_allowed()
|
||||
assert set(["OFFLINE", "PREPARATION", "REMOTE", "ATTENDED"]).issubset(set(allowed))
|
||||
|
||||
def test_properties_date_status(self, status):
|
||||
# Verify the .date and .status properties return correct values
|
||||
status.pv_date.put("2025-08-08 10:00:00", wait=True)
|
||||
status.pv_status.put("REMOTE", wait=True)
|
||||
assert status.date == "2025-08-08 10:00:00"
|
||||
assert status.status == "REMOTE"
|
||||
|
||||
def test_repr_uses_properties(self, status):
|
||||
# Check that __repr__ includes both date and status
|
||||
status.pv_date.put("2025-08-08 10:01:02", wait=True)
|
||||
status.pv_status.put("ATTENDED", wait=True)
|
||||
r = repr(status)
|
||||
assert "2025-08-08 10:01:02 ATTENDED" in r
|
||||
|
||||
@pytest.mark.parametrize("method,expected", [
|
||||
("set_offline", "OFFLINE"),
|
||||
("set_preparation", "PREPARATION"),
|
||||
("set_remote", "REMOTE"),
|
||||
("set_attended", "ATTENDED"),
|
||||
])
|
||||
def test_set_methods_change_status(self, status, method, expected):
|
||||
getattr(status, method)()
|
||||
assert status.status == expected
|
||||
|
||||
def test_update_direct_valid(self, status):
|
||||
status.update("REMOTE")
|
||||
assert status.status == "REMOTE"
|
||||
|
||||
def test_update_invalid_raises_valueerror(self, status):
|
||||
with pytest.raises(ValueError) as exc:
|
||||
status.update("TOTALLY_INVALID_STATUS")
|
||||
assert "not from allowed values" in str(exc.value)
|
||||
|
||||
|
||||
def test_motor_init_list_and_extras():
|
||||
m = Motor("TEST:SIM:M4")
|
||||
assert "disabled" not in m._init_list
|
||||
assert "disabled" not in m._extras
|
||||
assert len(m._init_list) > 0
|
||||
assert len(m._extras) > 0
|
||||
# Tests: OperationMessageEntry
|
||||
class TestOperationMessageEntry:
|
||||
@pytest.fixture(params=range(N_MSG_HISTORY))
|
||||
def entry(self, request):
|
||||
idx = request.param
|
||||
e = OperationMessageEntry("SF-OP:CR-MSG", idx)
|
||||
assert e.pv_date.wait_for_connection(timeout=2.0), f"PV OP-DATE{idx} not connected"
|
||||
assert e.pv_msg.wait_for_connection(timeout=2.0), f"PV OP-MSG{idx} not connected"
|
||||
return e
|
||||
|
||||
for attr in m._init_list:
|
||||
pv = getattr(m, attr)
|
||||
assert isinstance(pv, PV)
|
||||
assert not pv.connected
|
||||
def test_initialization(self, entry):
|
||||
assert entry.pv_date is not None
|
||||
assert entry.pv_msg is not None
|
||||
assert entry.pv_date.connected
|
||||
assert entry.pv_msg.connected
|
||||
|
||||
for attr_name, suffix in m._extras.items():
|
||||
pv = getattr(m, attr_name)
|
||||
assert isinstance(pv, PV)
|
||||
assert pv.pvname == f"TEST:SIM:M4{suffix}"
|
||||
def test_date_property(self, entry):
|
||||
test_val = "2025-08-08 12:34:56"
|
||||
entry.pv_date.put(test_val, wait=True)
|
||||
assert entry.date == test_val
|
||||
|
||||
def test_msg_property(self, entry):
|
||||
test_val = "Hello from test"
|
||||
entry.pv_msg.put(test_val, wait=True)
|
||||
assert entry.msg == test_val
|
||||
|
||||
def test_repr(self, entry):
|
||||
date_val = "2025-08-08 13:00:00"
|
||||
msg_val = "System maintenance"
|
||||
entry.pv_date.put(date_val, wait=True)
|
||||
entry.pv_msg.put(msg_val, wait=True)
|
||||
expected_repr = f"{date_val} {msg_val}"
|
||||
assert repr(entry) == expected_repr
|
||||
|
||||
|
||||
def test_motor_pv_method_connects_and_reads():
|
||||
m = Motor("TEST:SIM:M5")
|
||||
pv_val = m.PV("VAL", connect=True, timeout=1.0)
|
||||
assert pv_val.connected
|
||||
pv_val.put(123.0)
|
||||
time.sleep(0.1)
|
||||
assert pv_val.get() == 123.0
|
||||
# Tests: OperationMessage
|
||||
class TestOperationMessage:
|
||||
def test_init_with_name(self):
|
||||
om = OperationMessage(name="Control Room")
|
||||
assert om.name == "Control Room"
|
||||
assert om.ID == "CR"
|
||||
assert om.prefix == "SF-OP:CR-MSG"
|
||||
|
||||
assert om.pv_send.wait_for_connection(timeout=2.0)
|
||||
assert om.status.pv_status.wait_for_connection(timeout=2.0)
|
||||
assert len(om.entries) == N_MSG_HISTORY
|
||||
for i, entry in enumerate(om.entries):
|
||||
assert entry.pv_date.wait_for_connection(timeout=2.0), f"OP-DATE{i} not connected"
|
||||
assert entry.pv_msg.wait_for_connection(timeout=2.0), f"OP-MSG{i} not connected"
|
||||
|
||||
def test_init_with_id(self):
|
||||
om = OperationMessage(ID="cr") # case-insensitive
|
||||
assert om.ID == "CR"
|
||||
assert om.name == IDS_INVERSE["CR"] # "Control Room"
|
||||
assert om.prefix == "SF-OP:CR-MSG"
|
||||
|
||||
def test_init_error_when_missing_both(self):
|
||||
with pytest.raises(ValueError):
|
||||
OperationMessage()
|
||||
|
||||
def test_getitem_returns_entry(self):
|
||||
om = OperationMessage(name="Control Room")
|
||||
for i in range(N_MSG_HISTORY):
|
||||
e = om[i]
|
||||
assert isinstance(e, OperationMessageEntry)
|
||||
|
||||
def test_update_puts_to_tmp_channel(self):
|
||||
om = OperationMessage(name="Control Room")
|
||||
assert om.pv_send.wait_for_connection(timeout=2.0)
|
||||
msg = "Beam down at 14:00"
|
||||
om.update(msg)
|
||||
got = om.pv_send.get(as_string=True)
|
||||
if isinstance(got, bytes):
|
||||
got = got.decode("utf-8", errors="ignore")
|
||||
assert got == msg
|
||||
|
||||
def test_status_object_is_wired(self):
|
||||
om = OperationMessage(name="Control Room")
|
||||
assert isinstance(om.status, OperationMessageStatus)
|
||||
om.status.update("REMOTE")
|
||||
assert om.status.status == "REMOTE"
|
||||
|
||||
def test_repr_contains_header_and_entries(self):
|
||||
om = OperationMessage(name="Control Room")
|
||||
om.status.pv_status.put("ATTENDED", wait=True)
|
||||
om.entries[0].pv_date.put("2025-08-08 12:00:00", wait=True)
|
||||
om.entries[0].pv_msg.put("Slot A ok", wait=True)
|
||||
r = repr(om)
|
||||
assert "Control Room (CR)" in r
|
||||
assert "ATTENDED" in r
|
||||
assert "2025-08-08 12:00:00" in r
|
||||
assert "Slot A ok" in r
|
||||
|
||||
|
||||
def test_motor_reads_enum_status():
|
||||
m = Motor("TEST:SIM:M6")
|
||||
pv_status = m.PV("STATUS", connect=True)
|
||||
allowed = pv_status.enum_strs
|
||||
assert allowed == ["IDLE", "MOVING", "DONE"]
|
||||
# Tests: OperationMessages
|
||||
class TestOperationMessages:
|
||||
def test_initialization_populates_entries_and_items(self):
|
||||
oms = OperationMessages()
|
||||
|
||||
# One entry per human-readable name
|
||||
assert set(oms.entries.keys()) == set(IDS.keys())
|
||||
|
||||
# Each value is a wired/connected OperationMessage
|
||||
for name, om in oms.entries.items():
|
||||
assert isinstance(om, OperationMessage)
|
||||
assert om.pv_send.wait_for_connection(timeout=2.0)
|
||||
assert om.status.pv_status.wait_for_connection(timeout=2.0)
|
||||
for i, entry in enumerate(om.entries):
|
||||
assert isinstance(entry, OperationMessageEntry)
|
||||
assert entry.pv_date.wait_for_connection(timeout=2.0), f"{om.prefix}:OP-DATE{i} not connected"
|
||||
assert entry.pv_msg.wait_for_connection(timeout=2.0), f"{om.prefix}:OP-MSG{i} not connected"
|
||||
|
||||
# Dynamic attributes exist (cleaned names → attributes)
|
||||
for name in IDS.keys():
|
||||
attr = clean_name(name)
|
||||
assert hasattr(oms, attr), f"Missing attribute: {attr}"
|
||||
assert getattr(oms, attr) is oms.entries[name]
|
||||
|
||||
def test_getitem_supports_cleaned_name_and_cleaned_id(self):
|
||||
oms = OperationMessages()
|
||||
|
||||
name = "Control Room"
|
||||
ID = IDS[name]
|
||||
cleaned_name = "control_room"
|
||||
cleaned_id = "cr"
|
||||
|
||||
om_by_name = oms[cleaned_name]
|
||||
assert isinstance(om_by_name, OperationMessage)
|
||||
assert om_by_name.ID == ID
|
||||
|
||||
om_by_id = oms[cleaned_id]
|
||||
assert isinstance(om_by_id, OperationMessage)
|
||||
assert om_by_id is om_by_name
|
||||
|
||||
om_by_original_name = oms["Control Room"]
|
||||
assert om_by_original_name is om_by_name
|
||||
|
||||
with pytest.raises(KeyError):
|
||||
_ = oms["does_not_exist"]
|
||||
|
||||
def test_iter_yields_operationmessage_instances(self):
|
||||
oms = OperationMessages()
|
||||
vals = list(iter(oms))
|
||||
assert len(vals) == len(IDS)
|
||||
assert all(isinstance(v, OperationMessage) for v in vals)
|
||||
|
||||
# All prefixes serve identical initial data; verify contents quickly
|
||||
for om in vals:
|
||||
assert om.status.status == "OFFLINE"
|
||||
assert om.status.date == "2024-01-01 00:00:00"
|
||||
for i, entry in enumerate(om.entries):
|
||||
assert entry.date == f"2024-01-01 00:00:0{i}"
|
||||
assert entry.msg == f"Initial message {i}"
|
||||
|
||||
def test_key_completions_exposes_all_cleaned_keys(self):
|
||||
oms = OperationMessages()
|
||||
keys = set(oms._ipython_key_completions_())
|
||||
expected_names = {clean_name(n) for n in IDS.keys()}
|
||||
expected_ids = {clean_name(ID) for ID in IDS.values()}
|
||||
assert expected_names.issubset(keys)
|
||||
assert expected_ids.issubset(keys)
|
||||
|
||||
def test_repr_includes_each_submessage_repr(self):
|
||||
oms = OperationMessages()
|
||||
r = repr(oms)
|
||||
assert "Control Room (CR)" in r # header from one submessage
|
||||
|
||||
om_cr = oms["control_room"]
|
||||
# Modify two consecutive entries
|
||||
om_cr.entries[0].pv_date.put("2025-08-08 12:00:00", wait=True)
|
||||
om_cr.entries[0].pv_msg.put("Test message 1", wait=True)
|
||||
om_cr.entries[1].pv_date.put("2025-08-08 12:05:00", wait=True)
|
||||
om_cr.entries[1].pv_msg.put("Test message 2", wait=True)
|
||||
|
||||
r2 = repr(oms)
|
||||
assert "2025-08-08 12:00:00" in r2
|
||||
assert "Test message 1" in r2
|
||||
assert "2025-08-08 12:05:00" in r2
|
||||
assert "Test message 2" in r2
|
||||
|
||||
|
||||
def test_speedup_get_pv():
|
||||
"""Test que get_pv optimisé est plus rapide que epics.get_pv."""
|
||||
N = 10
|
||||
t0 = time.perf_counter()
|
||||
for _ in range(N):
|
||||
get_pv("TEST:SIM:M1.VAL", connect=False)
|
||||
t1 = time.perf_counter()
|
||||
# Tests: MachineStatus
|
||||
class TestMachineStatus:
|
||||
@pytest.mark.parametrize("beamline", BEAMLINES)
|
||||
def test_init_and_connections(self, beamline):
|
||||
ms = MachineStatus(beamline)
|
||||
assert ms.beamline == beamline.upper()
|
||||
assert ms.pv_category.wait_for_connection(timeout=2.0), f"{beamline}: CATEGORY not connected"
|
||||
assert ms.pv_downtime.wait_for_connection(timeout=2.0), f"{beamline}: DOWNTIME not connected"
|
||||
|
||||
t2 = time.perf_counter()
|
||||
for _ in range(N):
|
||||
epics.get_pv("TEST:SIM:M1.VAL", connect=False)
|
||||
t3 = time.perf_counter()
|
||||
def test_init_invalid_beamline_raises(self):
|
||||
with pytest.raises(ValueError) as exc:
|
||||
MachineStatus("NOT_A_BEAMLINE")
|
||||
assert 'beamline "NOT_A_BEAMLINE" must be from:' in str(exc.value)
|
||||
|
||||
fast_time = t1 - t0
|
||||
slow_time = t3 - t2
|
||||
assert fast_time < slow_time, (
|
||||
f"get_pv optimisé ({fast_time:.6f}s) plus lent que EPICS ({slow_time:.6f}s)"
|
||||
)
|
||||
@pytest.mark.parametrize("beamline", BEAMLINES)
|
||||
def test_properties_category_and_downtime(self, beamline):
|
||||
ms = MachineStatus(beamline)
|
||||
ms.pv_category.put("MD", wait=True)
|
||||
ms.pv_downtime.put("01:23:45", wait=True)
|
||||
assert ms.category == "MD"
|
||||
assert ms.downtime == "01:23:45"
|
||||
|
||||
ms.pv_category.put("SD", wait=True)
|
||||
ms.pv_downtime.put("00:10:00", wait=True)
|
||||
assert ms.category == "SD"
|
||||
assert ms.downtime == "00:10:00"
|
||||
|
||||
def test_speedup_motor_instantiation():
|
||||
"""Test que Motor optimisé est plus rapide à instancier que epics.Motor."""
|
||||
N = 10
|
||||
t0 = time.perf_counter()
|
||||
for _ in range(N):
|
||||
Motor("TEST:SIM:M2")
|
||||
t1 = time.perf_counter()
|
||||
|
||||
t2 = time.perf_counter()
|
||||
for _ in range(N):
|
||||
epics.Motor("TEST:SIM:M3")
|
||||
t3 = time.perf_counter()
|
||||
|
||||
fast_time = t1 - t0
|
||||
slow_time = t3 - t2
|
||||
assert fast_time < slow_time, (
|
||||
f"Motor optimisé ({fast_time:.6f}s) plus lent que EPICS.Motor ({slow_time:.6f}s)"
|
||||
)
|
||||
|
||||
|
||||
def test_speedup_motor_PV():
|
||||
"""Test que Motor.PV optimisé est plus rapide avec connect=False."""
|
||||
m_epics = epics.Motor("TEST:SIM:M4")
|
||||
t0 = time.perf_counter()
|
||||
pv_epics = m_epics.PV("VAL", connect=True, timeout=1.0)
|
||||
t1 = time.perf_counter()
|
||||
|
||||
m_fast = Motor("TEST:SIM:M5")
|
||||
t2 = time.perf_counter()
|
||||
pv_fast = m_fast.PV("VAL", connect=False, timeout=1.0)
|
||||
t3 = time.perf_counter()
|
||||
|
||||
assert pv_epics.connected
|
||||
assert pv_fast.connected
|
||||
|
||||
slow_time = t1 - t0
|
||||
fast_time = t3 - t2
|
||||
assert fast_time < slow_time, (
|
||||
f"Motor.PV optimisé ({fast_time:.6f}s) plus lent que EPICS.Motor.PV ({slow_time:.6f}s)"
|
||||
)
|
||||
|
||||
|
||||
def test_motor_init_list_attrs_created():
|
||||
m = Motor("TEST:SIM:M10")
|
||||
for attr in m._init_list:
|
||||
pv = getattr(m, attr)
|
||||
assert isinstance(pv, PV), f"{attr} n'est pas un PV"
|
||||
assert not pv.connected, f"{attr} devrait être non connecté au départ"
|
||||
|
||||
|
||||
def test_motor_extras_attrs_correct():
|
||||
m = Motor("TEST:SIM:M11")
|
||||
for key, suffix in m._extras.items():
|
||||
assert hasattr(m, key), f"{key} manquant dans Motor"
|
||||
pv = getattr(m, key)
|
||||
assert isinstance(pv, PV)
|
||||
assert pv.pvname == f"TEST:SIM:M11{suffix}"
|
||||
|
||||
|
||||
def test_motor_callbacks_empty():
|
||||
m = Motor("TEST:SIM:M12")
|
||||
assert isinstance(m._callbacks, dict)
|
||||
assert m._callbacks == {}
|
||||
@pytest.mark.parametrize("beamline", BEAMLINES)
|
||||
def test_repr_contains_header_and_values(self, beamline):
|
||||
ms = MachineStatus(beamline)
|
||||
ms.pv_category.put("ACCESS", wait=True)
|
||||
ms.pv_downtime.put("00:05:00", wait=True)
|
||||
r = repr(ms)
|
||||
assert beamline in r
|
||||
assert "ACCESS" in r
|
||||
assert "00:05:00" in r
|
||||
|
||||
Reference in New Issue
Block a user