Task tests passing

This commit is contained in:
gac-x05la
2025-05-13 19:13:50 +02:00
committed by marone
parent 44b93c529e
commit 739811959e
2 changed files with 16 additions and 12 deletions

View File

@@ -5,6 +5,7 @@ interface.
@author: mohacsi_i
"""
import time
from ophyd import Device, Component, EpicsSignal, EpicsSignalRO, Kind
from ophyd.status import SubscriptionStatus
@@ -168,18 +169,19 @@ class aa1Tasks(PSIDeviceBase, Device):
if len(self.fileName.get())==0:
self.fileName.set("bec.ascript", settle_time=0.1).wait()
# NOTE: old_value can either be initialized or set to UNSET_VALUE
def wait_until_new(*_, old_value, value, **__):
result = str(old_value) not in ("None", "UNSET_VALUE")
print(f"FAILURE: {old_value}\t{value}\t{type(old_value)}\t{type(value)}\t{result}")
self._failure.read()
ts_old = float(self._failure.timestamp)
def wait_until_new(*_, old_value, value, timestamp, **__):
nonlocal ts_old
result = bool(ts_old != timestamp) and (value!=-1)
# print(f"{old_value}\t{value}\t{ts_old}\t{timestamp}\t{result}")
return result
# Subscribe and wait for update
status = SubscriptionStatus(self._failure, wait_until_new, settle_time=0.1)
# Load and check success
self.switch.set("Load", settle_time=0.2).wait()
self.switch.set("Load", settle_time=0.0).wait()
status.wait()
if self._failure.value:
raise RuntimeError("Failed to load task, please check the Aerotech IOC")
@@ -200,12 +202,14 @@ class aa1Tasks(PSIDeviceBase, Device):
def complete(self) -> SubscriptionStatus:
"""Wait for a RUNNING task"""
# Sleep for foll period
time.sleep(1)
timestamp_ = 0
task_idx = int(self.taskIndex.get())
def not_running(*, value, timestamp, **_):
nonlocal timestamp_
print(value)
# print(f"State {value[task_idx]} in {value}")
result = value[task_idx] not in ["Running", 4]
timestamp_ = timestamp
return result

View File

@@ -93,10 +93,11 @@ class AerotechAutomation1Test(unittest.TestCase):
text = "program\nvar $positions[2] as roll\neeeend"
dev.configure({'script_task': 3, 'script_text': text})
# Should raise when Load
dev.stage()
with self.assertRaises(RuntimeError):
dev.stage()
dev.unstage()
# Run a sleep function on the iSMC
# # Run a sleep function on the iSMC
print("TASK: Testing a blocking command execution")
text = """
$rglobal[4]=1.234
@@ -114,9 +115,8 @@ class AerotechAutomation1Test(unittest.TestCase):
self.assertTrue( 4 < t_elapsed < 9, f"A 5 seconds sleep finished in: {t_elapsed}")
dev.unstage()
# Attempt to run a short motion command using the Execute interface
# # Attempt to run a short motion command using the Execute interface
print("TASK: Testing a short motion command")
text = """
var $fDist as real = 90
var $fVelo as real = 200