From 363f1335e050a6a946b4b7ecea20fe4fea4264eb Mon Sep 17 00:00:00 2001 From: tligui_y Date: Sat, 9 Aug 2025 01:41:15 +0200 Subject: [PATCH] Update tests/test_utils_opmsg.py --- tests/test_utils_opmsg.py | 50 +++++++++++++++++++-------------------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/tests/test_utils_opmsg.py b/tests/test_utils_opmsg.py index 2c65d8541..eafc36df5 100644 --- a/tests/test_utils_opmsg.py +++ b/tests/test_utils_opmsg.py @@ -10,14 +10,11 @@ from slic.utils.opmsg import ( clean_name, IDS_INVERSE ) -def ioc(): - """ - IOC de test pour: - - STATUS / STATUS-DATE - - OP-DATEi / OP-MSGi - - OP-MSG-tmp / OP-MSG-TMP - - CATEGORY / DOWNTIME - """ +@pytest.fixture(scope="module", autouse=True) +def run_all_iocs(): + stop = threading.Event() + threads = [] + def run_op_prefix(prefix: str): with MorIOC(prefix) as mor: mor.host( @@ -36,41 +33,44 @@ def ioc(): **{"OP-MSG-tmp": ""}, **{"OP-MSG-TMP": ""}, ) - while True: + while not stop.is_set(): mor.serve() time.sleep(0.1) def run_status_prefix(beamline: str): - # IMPORTANT: le code utilise SF-STATUS-{beamline} with MorIOC(f"SF-STATUS-{beamline}") as mor: mor.host( CATEGORY={"type": "enum", "enums": ["USER", "MD", "SD", "ACCESS", "DOWN"]}, DOWNTIME=str, ) mor.serve(CATEGORY="USER", DOWNTIME="00:00:00") - while True: + while not stop.is_set(): mor.serve() time.sleep(0.1) - threads = [] - # IOCs pour opmsg (un par ID) + # Start all OPMSG IOCs for ID in IDS.values(): prefix = f"SF-OP:{ID}-MSG" - t = threading.Thread(target=run_op_prefix, args=(prefix,), daemon=True) + t = threading.Thread(target=run_op_prefix, args=(prefix,)) t.start() threads.append(t) - # IOCs pour machinestatus - for bl in BEAMLINES: - t = threading.Thread(target=run_status_prefix, args=(bl,), daemon=True) - t.start() - threads.append(t) - return threads -@pytest.fixture(scope="module", autouse=True) -def run_all_iocs(): - threads = ioc() - time.sleep(2.0) # laisser le temps aux serveurs de démarrer - yield + # Start all MachineStatus IOCs + for bl in BEAMLINES: + t = threading.Thread(target=run_status_prefix, args=(bl,)) + t.start() + threads.append(t) + + time.sleep(2.0) # Allow time for all PVs to be announced + + yield # <<< all tests in this module run here >>> + + # --- Teardown: stop all IOCs and wait for threads to finish --- + stop.set() + for t in threads: + t.join(timeout=2.0) + time.sleep(0.2) # Give OS a moment to release the sockets + # -------- OperationMessageStatus --------