164 lines
4.8 KiB
Python
164 lines
4.8 KiB
Python
import patch_put
|
|
import pytest
|
|
import time
|
|
import threading
|
|
import builtins
|
|
from io import StringIO
|
|
import colorama
|
|
from contextlib import contextmanager
|
|
|
|
from slic.utils.pv import PV
|
|
from morbidissimo import MorIOC
|
|
'''
|
|
# === IOC simulé avec MorIOC ===
|
|
@pytest.fixture(scope="module", autouse=True)
|
|
def morioc_server():
|
|
stop = threading.Event()
|
|
|
|
def run_ioc():
|
|
with MorIOC("TEST") as mor:
|
|
# Explicit initialization to 0.0
|
|
current_val = 0.0
|
|
mor.host(VAL=float)
|
|
mor.serve(VAL=current_val)
|
|
|
|
# Serve loop until 'stop' is triggered
|
|
while not stop.is_set():
|
|
# Check for new value
|
|
input_val = mor.get("VAL")
|
|
if input_val is not None:
|
|
current_val = input_val
|
|
|
|
# Serve the current value
|
|
mor.serve(VAL=current_val)
|
|
time.sleep(0.05) # Small delay to avoid busy loop
|
|
|
|
# Start IOC in a normal thread (not daemon so we can join later)
|
|
t = threading.Thread(target=run_ioc)
|
|
t.start()
|
|
time.sleep(0.8) # Give Channel Access time to announce PVs
|
|
|
|
yield # <<< tests in this module run here >>>
|
|
|
|
# --- Teardown: stop IOC and free ports ---
|
|
stop.set()
|
|
t.join(timeout=2.0)
|
|
time.sleep(0.2) # Small delay to let OS release sockets
|
|
'''
|
|
# === Utils ===
|
|
@pytest.fixture
def capture_stdout(monkeypatch):
    """Replace ``sys.stdout`` with an in-memory text buffer.

    The swap is installed through the ``monkeypatch`` fixture that pytest
    passes in, so the patch is tied to that fixture's lifetime.

    Returns:
        The ``StringIO`` object that now stands in for ``sys.stdout``.
    """
    captured = StringIO()
    monkeypatch.setattr("sys.stdout", captured)
    return captured
|
|
|
|
@contextmanager
def use_callback(pv, callback):
    """Keep ``callback`` registered on ``pv`` for the duration of a ``with`` block.

    Args:
        pv: Object exposing ``add_callback(callback)`` (returning an id) and
            ``remove_callback(id)``.
        callback: Callable handed to ``pv.add_callback``.

    Yields:
        None. The body of the ``with`` statement runs while the callback is
        registered.
    """
    handle = pv.add_callback(callback)
    try:
        yield
    finally:
        # Always unregister, even when the with-body raises.
        pv.remove_callback(handle)
|
|
|
|
import re
|
|
|
|
# Matches any ANSI CSI escape sequence: ESC '[', optional parameter bytes
# (0x30-0x3F), optional intermediate bytes (0x20-0x2F) and one final byte
# (0x40-0x7E). This covers colour codes (final byte 'm') as well as the
# cursor-movement / erase-line sequences that progress bars commonly emit.
# Compiled once at import time instead of on every call.
_ANSI_CSI_RE = re.compile(r'\x1b\[[0-?]*[ -/]*[@-~]')


def strip_ansi(text):
    """Return ``text`` with ANSI CSI escape sequences removed.

    Args:
        text: String that may contain terminal escape sequences.

    Returns:
        A copy of ``text`` in which every CSI sequence (colours, cursor moves,
        line erasure, ...) has been deleted; all other characters are kept.
    """
    return _ANSI_CSI_RE.sub('', text)
|
|
|
|
# === Tests ===
|
|
@pytest.mark.parametrize("value_new, value_before, show_bar, expected_color", [
    (25, 0, True, colorama.Fore.GREEN),
    (50, 25, True, colorama.Fore.GREEN),
    (75, 50, False, colorama.Fore.GREEN),
    (100, 75, True, colorama.Fore.GREEN),
    (150, 100, False, colorama.Fore.GREEN),
    (-50, 150, True, colorama.Fore.GREEN),
])
def test_put_with_progress_and_repr(value_new, value_before, show_bar, expected_color):
    """Check what ``PV.put`` prints when moving from ``value_before`` to ``value_new``.

    The PV is first set to ``value_before``; then ``put(value_new,
    show_progress=show_bar)`` runs while ``builtins.print`` is replaced by a
    recorder.

    * With ``show_bar`` true, the recorded output must contain an empty bar,
      a full bar, ``expected_color`` in at least one raw line, and both the
      old and the new value in every non-blank line.
    * With ``show_bar`` false, nothing non-blank may be printed.

    In both cases the PV must finally read back ``value_new``.

    NOTE(review): presumably requires an IOC serving ``TEST:VAL`` to be
    running already — confirm how it is started.
    """
    pv = PV("TEST:VAL")

    pv.put(value_before)
    assert pv.get() == value_before

    # Collect everything passed to print() in a list.
    printed_lines = []

    def fake_print(*args, **kwargs):
        # Only the positional arguments are recorded; sep/end/file are ignored.
        printed_lines.append(" ".join(str(a) for a in args))

    # Replace print only for the duration of the put() call.
    original_print = builtins.print
    builtins.print = fake_print
    try:
        pv.put(value_new, show_progress=show_bar)
    finally:
        builtins.print = original_print

    # Drop blank lines, then build a colour-free copy for the bar checks.
    printed_lines = [line for line in printed_lines if line.strip()]
    cleaned_lines = [strip_ansi(line) for line in printed_lines]

    if show_bar:
        # The initial (empty) bar must have been shown at least once.
        # NOTE(review): this literal has a single space between the pipes;
        # confirm it matches what the bar prints at 0 % (the full bar below
        # is much wider).
        matches = [line for line in cleaned_lines if "| |" in line]
        assert matches, "Expected bar not found in:\n" + "\n".join(printed_lines)

        # The completely filled bar must have been shown at least once.
        matches = [line for line in cleaned_lines if "|██████████████████████████████|" in line]
        assert matches, "Expected bar not found in:\n" + "\n".join(printed_lines)

        # The colour code must appear in at least one of the raw printed lines.
        color_matches = [line for line in printed_lines if expected_color in line]
        assert color_matches, "Expected color code not found"

        # Both endpoints must be mentioned in every printed line.
        new_text = str(value_new)
        old_text = str(value_before)
        assert all(new_text in line for line in printed_lines), "new value not in all lines"
        assert all(old_text in line for line in printed_lines), "old value not in all lines"
    else:
        assert not printed_lines

    # The final value must be the one that was put.
    assert pv.get() == value_new
|
|
|
|
def test_use_callback_context_manager():
    """Check that ``pv.use_callback`` delivers values put inside its ``with`` block.

    The same recording callback is attached three separate times; after each
    ``with`` block the value that was put must be among the values recorded.
    """
    pv = PV("TEST:VAL")
    pv.put(0.0)

    observed = []

    def record(value=None, **kwargs):
        # Called with the new value as the ``value`` keyword argument.
        observed.append(value)

    for target in (42.0, 24.0, 75.0):
        with pv.use_callback(record):
            pv.put(target)

        # The callback must have been invoked with the value just written.
        assert target in observed
|
|
|
|
|
|
def test_orig_repr_is_not_custom_repr():
    """Compare the text from ``repr(pv)`` with the text from ``pv.orig_repr()``.

    The two strings must differ, ``orig_repr()`` must start with ``<PV `` and
    mention the PV name, and ``repr(pv)`` must contain ``PV "TEST:VAL" at``.
    """
    pv = PV("TEST:VAL", connection_timeout=2.0)
    patched_text = repr(pv)
    stock_text = pv.orig_repr()

    # The two representations must not be the same string.
    assert patched_text != stock_text

    # Expected shape of the text returned by orig_repr().
    assert stock_text.startswith("<PV ")
    assert "TEST:VAL" in stock_text

    # Expected fragment of the text returned by repr().
    assert 'PV "TEST:VAL" at' in patched_text