From 9633006fb465925abd4dcb05cc3a8d406277f796 Mon Sep 17 00:00:00 2001 From: gac-x05la Date: Wed, 7 May 2025 17:54:28 +0200 Subject: [PATCH] WIP on tests --- .../devices/aerotech/AerotechAutomation1.py | 11 +- .../devices/aerotech/test/autoHilTest.py | 899 ++++++++++-------- 2 files changed, 523 insertions(+), 387 deletions(-) diff --git a/tomcat_bec/devices/aerotech/AerotechAutomation1.py b/tomcat_bec/devices/aerotech/AerotechAutomation1.py index 8d549a1..aa303cf 100644 --- a/tomcat_bec/devices/aerotech/AerotechAutomation1.py +++ b/tomcat_bec/devices/aerotech/AerotechAutomation1.py @@ -35,21 +35,22 @@ class aa1Controller(Device): """Ophyd proxy class for the Aerotech Automation 1's core controller functionality""" # ToDo: Add error subscription - controllername = Component(EpicsSignalRO, "NAME", kind=Kind.config) - serialnumber = Component(EpicsSignalRO, "SN", kind=Kind.config) - apiversion = Component(EpicsSignalRO, "API_VERSION", kind=Kind.config) + controllername = Component(EpicsSignalRO, "NAME", string=True, kind=Kind.config) + serialnumber = Component(EpicsSignalRO, "SN", string=True, kind=Kind.config) + apiversion = Component(EpicsSignalRO, "API_VERSION", string=True, kind=Kind.config) axiscount = Component(EpicsSignalRO, "AXISCOUNT", kind=Kind.config) taskcount = Component(EpicsSignalRO, "TASKCOUNT", kind=Kind.config) fastpoll = Component(EpicsSignalRO, "POLLTIME", auto_monitor=True, kind=Kind.normal) slowpoll = Component(EpicsSignalRO, "DRVPOLLTIME", auto_monitor=True, kind=Kind.normal) errno = Component(EpicsSignalRO, "ERRNO", auto_monitor=True, kind=Kind.normal) - errnmsg = Component(EpicsSignalRO, "ERRMSG", auto_monitor=True, kind=Kind.normal) + errnmsg = Component(EpicsSignalRO, "ERRMSG", string=True, auto_monitor=True, kind=Kind.normal) _set_ismc = Component(EpicsSignal, "SET", put_complete=True, kind=Kind.omitted) USER_ACCESS = ["reset"] def reset(self): - """ Resets the Automation1 iSMC reloading the default configuration""" + """Resets the Automation1 iSMC reloading the default configuration. Note that this will + erase all settings that were set during startup and not saved to the MCD file.""" self._set_ismc.set(3).wait() diff --git a/tomcat_bec/devices/aerotech/test/autoHilTest.py b/tomcat_bec/devices/aerotech/test/autoHilTest.py index d3b681e..5f750c6 100644 --- a/tomcat_bec/devices/aerotech/test/autoHilTest.py +++ b/tomcat_bec/devices/aerotech/test/autoHilTest.py @@ -11,31 +11,39 @@ Created on Wed Sep 04 13:21:33 2024 @author: mohacsi_i """ - -import os -import numpy as np import unittest -from time import sleep, time +import time +import os +import jinja2 +import numpy as np from ophyd import EpicsMotor, EpicsSignal -os.environ["EPICS_CA_ADDR_LIST"] = "129.129.106.255" +from tomcat_bec.devices.aerotech import ( + aa1Controller, + aa1GlobalVariables, + aa1GlobalVariableBindings, + aa1AxisIo, + aa1Tasks, + aa1AxisDriveDataCollection, + aa1AxisPsoDistance, +) -from tomcat_bec.devices.aerotech.AerotechAutomation1 import ( - aa1Controller, aa1Tasks, aa1GlobalVariables, aa1AxisIo, aa1AxisDriveDataCollection, - aa1GlobalVariableBindings, aa1AxisPsoDistance, aa1AxisPsoWindow - ) from tomcat_bec.devices.aerotech.AerotechAutomation1Enums import ( - DriveDataCaptureInput, DriveDataCaptureTrigger - ) - + DriveDataCaptureInput, + DriveDataCaptureTrigger, +) +os.environ["EPICS_CA_ADDR_LIST"] = "129.129.106.255" AA1_IOC_NAME = "X02DA-ES1-SMP1" AA1_AXIS_NAME = "ROTY" +# pylint: disable=protected-access class AerotechAutomation1Test(unittest.TestCase): + """Main test class""" + _isConnected = False @classmethod @@ -46,6 +54,7 @@ class AerotechAutomation1Test(unittest.TestCase): cls._isConnected = True def test_01_ControllerProxy(self): + """Test the basic iSMC client""" # Throws if IOC is not available or PVs are incorrect dev = aa1Controller(AA1_IOC_NAME + ":CTRL:", name="aa1") dev.wait_for_connection() @@ -58,117 +67,132 @@ class AerotechAutomation1Test(unittest.TestCase): self.assertTrue(dev.taskcount.get() > 0, "Maximum controller task count seems invalid") print("CTRL: Done testing CTRL module!") - def test_02_TaskProxy(self): - """ Testing the TASK interface - """ + def test_02_task_proxy(self): + """Testing the TASK interface""" # Throws if IOC is not available or PVs are incorrect dev = aa1Tasks(AA1_IOC_NAME + ":TASK:", name="tsk") dev.wait_for_connection() - print("\nTASK: Testing the 'execute' command (direct string execution)") - - # Attempt to run a short test script using the Execute interface (real) - text = "$rreturn[0]=3.141592" - rbv = dev.execute(text, mode="Double", taskindex=3) - - self.assertTrue(dev._executeMode.get() in ["Double", 3], - f"Expecting double execution return type, got: {dev._executeMode.get()}") - self.assertTrue(dev.taskIndex.get() == 3, "Execution must happen on task 3") - self.assertTrue(rbv == "Result (double): 3.141592", - f"Unexpected reply from execution: {rbv}") - - # Attempt to run a short test script using the Execute interface (integer) - text = "$ireturn[0]=42" - rbv = dev.execute(text, mode="Int", taskindex=4) - self.assertTrue(dev.taskIndex.get() == 4, "Execution must happen on task 1") - self.assertTrue(rbv == "Result (int): 42", f"Unexpected reply from execution: {rbv}") - - # Attempt a non-existent function on the execute interface - print("TASK: Testing a simple motion command execution") - text = """ - var $fDist as real = 90 - var $fVelo as real = 20 - var $axis as axis = ROTY - MoveRelative($axis, $fDist, $fVelo) - """ - with self.assertRaises(RuntimeError): - rbv = dev.execute(text, taskindex=4) - - # Attempt to run a short motion command using the Execute interface - text = """ - var $fDist as real = 90 - var $fVelo as real = 20 - var $axis as axis = ROTY - Enable($axis) - MoveLinear($axis, $fDist, $fVelo) - """ - t_start = time() - rbv = dev.execute(text, taskindex=4) - t_end = time() - t_elapsed = t_end-t_start - self.assertTrue(t_elapsed > 4, f"Expected 4+ seconds for motion, took {t_elapsed}") - print("TASK: Testing the 'run' command (run text with intermediary file)") # Send another empty program filename = "unittesting2.ascript" text = "program\nvar $positions[2] as real\nend" - dev.launch_script(filename, taskindex=3, filetext=text) + dev.configure({'script_file': filename, 'script_task': 3, 'script_text': text}) + dev.stage() + dev.kickoff() dev.complete().wait() + dev.unstage() + + print("TASK: Testing exception on syntax error") + # Send another empty program with 'war' instead of 'var' + text = "program\nwar $positions[2] as real\nend" + dev.configure({'script_task': 3, 'script_text': text}) + with self.assertRaises(RuntimeError, msg="Stage should raise on syntax error"): + dev.stage() + dev.unstage() + + # Run a sleep function on the iSMC + print("TASK: Testing a simple motion command execution") + text = """ + var $tSleepSec as real = 5 + Dwell($tSleepSec) + """ + dev.configure({'script_task': 3, 'script_text': text}) + dev.stage() + t_start = time.time() + dev.kickoff() + dev.complete().wait() + t_end = time.time() + t_elapsed = t_end - t_start + self.assertTrue( 4 < t_elapsed < 9, f"A 5 seconds sleep finished outside of window in: {t_elapsed}") + dev.unstage() + + # Attempt to run a short motion command using the Execute interface + text = """ + var $fDist as real = 90 + var $fVelo as real = 200 + var $axis as axis = ROTY + Enable($axis) + MoveLinear($axis, $fDist, $fVelo) + """ + dev.configure({'script_task': 3, 'script_text': text}) + dev.stage() + t_start = time.time() + dev.kickoff() + dev.complete().wait() + t_end = time.time() + t_elapsed = t_end - t_start + self.assertTrue( 0 < t_elapsed < 4, f"Motion finished outside of window in: {t_elapsed}") + dev.unstage() + print("TASK: Done testing TASK module!") - def test_03_GlobalVariables1(self): - """Test the basic read/write global variable interface - """ + def test_03_global_variables(self): + """Test the basic read/write global variable interface""" # Throws if IOC is not available or PVs are incorrect dev = aa1GlobalVariables(AA1_IOC_NAME + ":VAR:", name="gvar") dev.wait_for_connection() print("\nVAR: Checking available memory") - self.assertTrue(dev.num_int.get() >= 32, - "At least 32 integer variables are needed for this test") - self.assertTrue(dev.num_real.get() >= 32, - "At least 32 real variables are needed for this test") - self.assertTrue(dev.num_string.get() >= 32, - "At least 32 string[256] variables are needed for this test") + self.assertTrue( + dev.num_int.get() >= 32, "At least 32 integer variables are needed for this test" + ) + self.assertTrue( + dev.num_real.get() >= 32, "At least 32 real variables are needed for this test" + ) + self.assertTrue( + dev.num_string.get() >= 32, "At least 32 string[256] variables are needed for this test" + ) print("VAR: Setting and checking a few variables") # Use random variables for subsequent runs val_f = 100 * np.random.random() dev.write_float(11, val_f) rb_f = dev.read_float(11) - self.assertEqual(val_f, rb_f, - f"Floating point readback value changed, read {rb_f} instead of {val_f}") + self.assertEqual( + val_f, rb_f, f"Floating point readback value changed, read {rb_f} instead of {val_f}" + ) rb_f = dev.read_float(10, 4) - self.assertEqual(val_f, rb_f[1], - f"FP64 array readback value changed, read {rb_f[1]} instead of {val_f}") + self.assertEqual( + val_f, rb_f[1], f"FP64 array readback value changed, read {rb_f[1]} instead of {val_f}" + ) - val_i = 1+int(100*np.random.random()) + val_i = 1 + int(100 * np.random.random()) dev.write_int(13, val_i) rb_i = dev.read_int(13) - self.assertEqual(val_i, rb_i, - f"Integer readback value changed, read {rb_i} instead of {val_i}") + self.assertEqual( + val_i, rb_i, f"Integer readback value changed, read {rb_i} instead of {val_i}" + ) rb_i = dev.read_int(10, 4) - self.assertEqual(val_i, rb_i[3], - f"Integer array readback value changed, read {rb_i[3]} instead of {val_i}") + self.assertEqual( + val_i, + rb_i[3], + f"Integer array readback value changed, read {rb_i[3]} instead of {val_i}", + ) - val_i = 1 + int(100*np.random.random()) + val_i = 1 + int(100 * np.random.random()) dev.write_int(13, val_i) rb_i = dev.read_int(13) - self.assertEqual(val_i, rb_i, - f"Integer readback value changed, read {rb_i} instead of {val_i}") + self.assertEqual( + val_i, rb_i, f"Integer readback value changed, read {rb_i} instead of {val_i}" + ) rb_i = dev.read_int(10, 4) - self.assertEqual(val_i, rb_i[3], - f"Integer array readback value changed, read {rb_i[3]} instead of {val_i}") + self.assertEqual( + val_i, + rb_i[3], + f"Integer array readback value changed, read {rb_i[3]} instead of {val_i}", + ) - val_s = np.random.choice(['f', 'o', 'o', 'b', 'a', 'r'], 6) - val_s = ''.join(val_s) + val_s = np.random.choice(["f", "o", "o", "b", "a", "r"], 6) + val_s = "".join(val_s) dev.write_string(7, val_s) rb_s = dev.read_string(7) - self.assertEqual(val_s, rb_s, - f"String readback value changed, read {rb_s} instead of {val_s}") + self.assertEqual( + val_s, rb_s, f"String readback value changed, read {rb_s} instead of {val_s}" + ) print("VAR: Done testing VAR module basic functionality!") - def test_04_GlobalVariables2(self): + def test_04_global_bindings(self): """Test the direct global variable bindings together with the task and standard global variable interfaces. """ @@ -182,50 +206,70 @@ class AerotechAutomation1Test(unittest.TestCase): print("\nVAR: Checking the direct global variable bindings with random numbers") # Check reading/writing float variables - val_f = 100*np.random.random() + val_f = 100 * np.random.random() dev.float16.set(val_f, settle_time=0.5).wait() - self.assertEqual(val_f, dev.float16.value, - f"Unexpected floating readback, read {dev.float16.value}, expected {val_f}") + self.assertEqual( + val_f, + dev.float16.value, + f"Unexpected floating readback, read {dev.float16.value}, expected {val_f}", + ) - val_f = 100*np.random.random() - var.write_float(3, val_f, settle_time=0.5) - self.assertEqual(val_f, dev.float3.value, - f"Unexpected floating readback, read {dev.float3.value}, expected {val_f}") + val_f = 100 * np.random.random() + var.write_float(3, val_f, settle_time=1.5) + self.assertEqual( + val_f, + dev.float3.value, + f"Unexpected floating readback, read {dev.float3.value}, expected {val_f}", + ) # Check reading/writing integer variables - val_i = int(100*np.random.random())-50 - var.write_int(3, val_i, settle_time=0.5) - self.assertEqual(val_i, dev.int3.value, - f"Unexpected integer readback, read {dev.int3.value}, expected {val_i}") + val_i = int(100 * np.random.random()) - 50 + var.write_int(3, val_i, settle_time=1.5) + self.assertEqual( + val_i, + dev.int3.value, + f"Unexpected integer readback, read {dev.int3.value}, expected {val_i}", + ) - val_i = 1+int(100*np.random.random()) + val_i = 1 + int(100 * np.random.random()) dev.int8.set(val_i, settle_time=0.5).wait() - self.assertEqual(val_i, dev.int8.value, - f"Unexpected integer readback, read {dev.int8.value}, expected {val_i}") + self.assertEqual( + val_i, + dev.int8.value, + f"Unexpected integer readback, read {dev.int8.value}, expected {val_i}", + ) # Check reading/writing string variables - val_s = np.random.choice(['f', 'o', 'o', 'b', 'a', 'r'], 6) - val_s = ''.join(val_s) + val_s = np.random.choice(["f", "o", "o", "b", "a", "r"], 6) + val_s = "".join(val_s) dev.str4.set(val_s, settle_time=0.5).wait() rb_s = dev.str4.value - self.assertEqual(val_s, rb_s, - f"String readback value changed, read {rb_s} instead of {val_s}") + self.assertEqual( + val_s, rb_s, f"String readback value changed, read {rb_s} instead of {val_s}" + ) print("VAR: Done testing VAR module direct bindings!") - # Attempt to run a short test script using the Execute interface (integer) + # Attempt to run a short test script in globals (integer) text = """ $iglobal[1]=420 $rglobal[1]=1.4142 """ - tsk.execute(text, taskindex=4) - self.assertEqual(420, dev.int1.value, - f"Unexpected integer readback , read {dev.int1.value}, expected 420") - self.assertAlmostEqual(1.4142, dev.float1.value, - f"Unexpected floating readback , read {dev.float1.value}, expected 1.4142") + tsk.configure({'script_text': text, 'script_task': 4}) + tsk.arm() + tsk.launch() + self.assertEqual( + 420, + dev.int1.value, + f"Unexpected integer readback , read {dev.int1.value}, expected 420", + ) + self.assertAlmostEqual( + 1.4142, + dev.float1.value, + f"Unexpected floating readback , read {dev.float1.value}, expected 1.4142", + ) - def test_05_MotorRecord(self): - """ Tests the basic move functionality of the MotorRecord - """ + def test_05_axis_motorrecord(self): + """Tests the basic move functionality of the MotorRecord""" mot = EpicsMotor(AA1_IOC_NAME + ":" + AA1_AXIS_NAME, name="x") mot.wait_for_connection() print("\nMR: Checking the standard EPICS motor record") @@ -235,8 +279,10 @@ class AerotechAutomation1Test(unittest.TestCase): target = mot.position + dist print(f"Absolute move to {target}, distance: {dist}") mot.move(target, wait=True) - self.assertTrue(np.abs(target-mot.position) < 0.1, - f"Final position {mot.position} is too far from target {target}") + self.assertTrue( + np.abs(target - mot.position) < 0.1, + f"Final position {mot.position} is too far from target {target}", + ) RelMove(30 + 50.0 * np.random.random()) RelMove(20 + 50.0 * np.random.random()) @@ -245,29 +291,30 @@ class AerotechAutomation1Test(unittest.TestCase): RelMove(10 + 50.0 * np.random.random()) print("MR: Done testing MotorRecord!") - def test_06_AxisIo(self): + def test_06_axis_io(self): + """Test axis IO bindings""" # Throws if IOC is not available or PVs are incorrect dev = aa1AxisIo(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":IO:", name="io") dev.wait_for_connection() print("\nAX:IO: Checking axis IO") # Writing random values to analog/digital outputs - for ii in range(5): + for _ in range(5): val_f = np.random.random() dev.set_analog(0, val_f) self.assertAlmostEqual(dev.ao.get(), val_f, 2, "Unexpected readback on analog output") self.assertAlmostEqual(dev.ao0.get(), val_f, 2, "Unexpected readback on analog output") print("Digital") - for ii in range(5): + for _ in range(5): val_b = round(np.random.random()) dev.set_digital(0, val_b) self.assertEqual(dev.do.get(), val_b, "Unexpected readback on digital output") self.assertEqual(dev.do0.get(), val_b, "Unexpected readback on digital output") print("IO: Done testing IO module!") - def test_07_AxisDdc(self): - """ Drive data collection + def test_07_axis_ddc(self): + """Drive data collection This is the preliminary test case dor drive data collection using internal PSO based triggers. It tests basic functionality, like @@ -275,96 +322,129 @@ class AerotechAutomation1Test(unittest.TestCase): """ # Throws if IOC is not available or PVs are incorrect dev = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") - event = EpicsSignal(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:EVENT:SINGLE", put_complete=True) + event = EpicsSignal( + AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:EVENT:SINGLE", put_complete=True + ) dev.wait_for_connection() event.wait_for_connection() print("\nAX:DDC: Checking axis DDC") # Stop any running acquisition - dev.unstage(settle_time=0.1) + dev.unstage() print("AX:DDC: Running a partial acquisition (1/3)") # Configure the DDC for 42 points and start waiting for triggers - cfg = {'ntotal': 42, 'trigger': DriveDataCaptureTrigger.PsoEvent} + cfg = {"num_points_total": 42, "ddc_trigger": DriveDataCaptureTrigger.PsoEvent} dev.configure(cfg) dev.stage() # Fire 12 triggers and stop the acquisition early - for ii in range(12): - event.set(1, settle_time=0.05).wait() + for _ in range(12): + event.set(1, settle_time=0.1).wait() # Stop the data acquisition early dev.unstage() # Wait for polling to catch up - sleep(1) + time.sleep(1) # Do the readback and check final state rbv = dev.collect() - self.assertEqual(dev.nsamples_rbv.value, 12, - f"Expected to collect 12 points by now, got {dev.nsamples_rbv.value}") - self.assertEqual(len(rbv['data']["buffer0"]), 12, - f"Expected to read back 12 points by now, got {len(rbv['data']['buffer0'])}") - self.assertEqual(len(rbv['data']["buffer1"]), 12, - f"Expected to read back 12 points by now, got {len(rbv['data']['buffer1'])}") + self.assertEqual( + dev.nsamples_rbv.value, + 12, + f"Expected to collect 12 points by now, got {dev.nsamples_rbv.value}", + ) + self.assertEqual( + len(rbv["data"]["buffer0"]), + 12, + f"Expected to read back 12 points by now, got {len(rbv['data']['buffer0'])}", + ) + self.assertEqual( + len(rbv["data"]["buffer1"]), + 12, + f"Expected to read back 12 points by now, got {len(rbv['data']['buffer1'])}", + ) print("AX:DDC: Running an overruning acquisition (2/3)") # Configure the DDC for 16 points and start waiting for triggers - cfg = {'ntotal': 16, 'trigger': DriveDataCaptureTrigger.PsoEvent} + cfg = {"num_points_total": 16, "ddc_trigger": DriveDataCaptureTrigger.PsoEvent} dev.configure(cfg) dev.stage() num = dev.nsamples_rbv.get() - self.assertEqual(num, 0, - f"Stage should reset the number of DDC points but got: {num}") + self.assertEqual(num, 0, f"Stage should reset the number of DDC points but got: {num}") # Fire 20 triggers and stop the acquisition early - for ii in range(20): - event.set(1, settle_time=0.05).wait() + for _ in range(20): + event.set(1, settle_time=0.05).wait() dev.unstage() # Wait for polling to catch up - sleep(1) + time.sleep(1) # Do the readback and check final state rbv = dev.collect() - self.assertEqual(dev.nsamples_rbv.value, 16, - f"Expected to collect 16 points by now, got {dev.nsamples_rbv.value}") - self.assertEqual(len(rbv['data']["buffer0"]), 16, - f"Expected to read back 16 points by now, got {len(rbv['data']['buffer0'])}") - self.assertEqual(len(rbv['data']["buffer1"]), 16, - f"Expected to read back 16 points by now, got {len(rbv['data']['buffer1'])}") + self.assertEqual( + dev.nsamples_rbv.value, + 16, + f"Expected to collect 16 points by now, got {dev.nsamples_rbv.value}", + ) + self.assertEqual( + len(rbv["data"]["buffer0"]), + 16, + f"Expected to read back 16 points by now, got {len(rbv['data']['buffer0'])}", + ) + self.assertEqual( + len(rbv["data"]["buffer1"]), + 16, + f"Expected to read back 16 points by now, got {len(rbv['data']['buffer1'])}", + ) print("AX:DDC: Trying to record some noise on the analog input (3/3)") - cfg = {'ntotal': 36, 'trigger': DriveDataCaptureTrigger.PsoEvent, 'source1': DriveDataCaptureInput.AnalogInput0} + cfg = { + "num_points_total": 36, + "ddc_trigger": DriveDataCaptureTrigger.PsoEvent, + "ddc_source1": DriveDataCaptureInput.AnalogInput0, + } dev.configure(cfg) dev.stage() num = dev.nsamples_rbv.get() - self.assertEqual(num, 0, - f"Stage should reset the number of DDC points but got: {num}") + self.assertEqual(num, 0, f"Stage should reset the number of DDC points but got: {num}") # Fire 36 triggers for ii in range(36): - event.set(1, settle_time=0.05).wait() + event.set(1, settle_time=0.05).wait() dev.unstage() # Wait for polling to catch up - sleep(1) + time.sleep(1) # Do the readback and check final state rbv = dev.collect() - self.assertEqual(dev.nsamples_rbv.value, 36, - f"Expected to collect 12 points by now, got {dev.nsamples_rbv.value}") - self.assertEqual(len(rbv['data']["buffer0"]), 36, - f"Expected to read back 36 points by now, got {len(rbv['data']['buffer0'])}") - self.assertEqual(len(rbv['data']["buffer1"]), 36, - f"Expected to read back 36 points by now, got {len(rbv['data']['buffer1'])}") - self.assertTrue(np.std(rbv['data']["buffer1"]) > 0.000001, - f"Expected some noise on analog input, but std is: {np.std(rbv['data']['buffer1'])}") + self.assertEqual( + dev.nsamples_rbv.value, + 36, + f"Expected to collect 12 points by now, got {dev.nsamples_rbv.value}", + ) + self.assertEqual( + len(rbv["data"]["buffer0"]), + 36, + f"Expected to read back 36 points by now, got {len(rbv['data']['buffer0'])}", + ) + self.assertEqual( + len(rbv["data"]["buffer1"]), + 36, + f"Expected to read back 36 points by now, got {len(rbv['data']['buffer1'])}", + ) + self.assertTrue( + np.std(rbv["data"]["buffer1"]) > 0.000001, + f"Expected some noise on analog input, but std is: {np.std(rbv['data']['buffer1'])}", + ) print("AX:DDCal(l: Done testing DDC module!") - def test_08_AxisPsoDistance1(self): - """ Test basic PSO distance mode functionality + def test_08_axis_pso_distance1(self): + """Test basic PSO distance mode functionality - This module tests basic PSO distance functionality, like events and - waveform generation. The goal is to ensure the basic operation of - the PSO module and the polled feedback. + This module tests basic PSO distance functionality, like events and + waveform generation. The goal is to ensure the basic operation of + the PSO module and the polled feedback. """ # Throws if IOC is not available or PVs are incorrect pso = aa1AxisPsoDistance(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:", name="pso1") @@ -380,76 +460,88 @@ class AerotechAutomation1Test(unittest.TestCase): print("AX:PSO: Directly checking basic 'toggled' event generation (1/3)") # Configure steps and move to middle of range - pso.configure({'distance': 5, 'wmode': "toggle"}) + pso.configure({"pso_distance": 5, "pso_wavemode": "toggle"}) pso.kickoff() mot.move(mot.position + 2.5) # Start moving in steps, while checking output for ii in range(10): last_pso = pso.output.value mot.move(mot.position + 5) - sleep(1) - self.assertTrue(pso.output.value != last_pso, - f"Expected to toggle the PSO output at step {ii}, got {pso.output.value}") + time.sleep(1) + self.assertTrue( + pso.output.value != last_pso, + f"Expected to toggle the PSO output at step {ii}, got {pso.output.value}", + ) pso.unstage() ddc.unstage() print("AX:PSO: Checking basic 'toggled' event generation with DDC (2/3)") # Configure steps and move to middle of range - pso.configure({'distance': 2, 'wmode': "toggle"}) + pso.configure({"pso_distance": 2, "pso_wavemode": "toggle"}) pso.kickoff() # Configure DDC npoints = 720 / 2.0 - cfg = {'ntotal': npoints+100, 'trigger': DriveDataCaptureTrigger.PsoOutput} + cfg = {"num_points_total": npoints + 100, "ddc_trigger": DriveDataCaptureTrigger.PsoOutput} ddc.configure(cfg) ddc.stage() npoints_rbv = ddc.nsamples_rbv.value - self.assertEqual(npoints_rbv, 0, - f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}") + self.assertEqual( + npoints_rbv, + 0, + f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}", + ) self.assertTrue(pso.output.value == 0, "Expected to start from LOW state") # Start moving and wait for polling mot.move(mot.position + 720) - sleep(0.5) + time.sleep(0.5) # Evaluate results npoints_rbv = ddc.nsamples_rbv.value - self.assertTrue(np.abs(npoints_rbv-720/4) <= 1, - f"Expected to record {720/4} points but got {npoints_rbv}") + self.assertTrue( + np.abs(npoints_rbv - 720 / 4) <= 1, + f"Expected to record {720/4} points but got {npoints_rbv}", + ) pso.unstage() ddc.unstage() print("AX:PSO: Checking basic 'pulsed' event generation with DDC (3/5)") # Configure steps and move to middle of range - pso.configure({'distance': 1.8, 'wmode': "pulsed", 'n_pulse': 3}) + pso.configure({"pso_distance": 1.8, "pso_wavemode": "pulsed", "pso_n_pulse": 3}) pso.kickoff() # Configure DDC npoints = 3 * 720 / 1.8 - cfg = {'ntotal': npoints+100, 'trigger': DriveDataCaptureTrigger.PsoOutput} + cfg = {"num_points_total": npoints + 100, "ddc_trigger": DriveDataCaptureTrigger.PsoOutput} ddc.configure(cfg) ddc.stage() npoints_rbv = ddc.nsamples_rbv.value - self.assertEqual(npoints_rbv, 0, - f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}") + self.assertEqual( + npoints_rbv, + 0, + f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}", + ) self.assertTrue(pso.output.value == 0, "Expected to start from LOW state") # Start moving and wait for polling mot.move(mot.position + 720) - sleep(0.5) + time.sleep(0.5) # Evaluate results npoints_rbv = ddc.nsamples_rbv.value - self.assertTrue(np.abs(npoints_rbv-3*720/1.8) <= 1, - f"Expected to record {3*720/1.8} points but got {npoints_rbv}") + self.assertTrue( + np.abs(npoints_rbv - 3 * 720 / 1.8) <= 1, + f"Expected to record {3*720/1.8} points but got {npoints_rbv}", + ) print("AX:PSO: Done testing basic PSO distance functionality!") pso.unstage() ddc.unstage() - def test_09_PsoIntegration01(self): - """ The first integration test aims to verify PSO functionality under - real life scenarios. It uses the DDC for collecting encoder signals - and compares them to the expected values. + def test_09_axis_pso_integration01(self): + """The first integration test aims to verify PSO functionality under + real life scenarios. It uses the DDC for collecting encoder signals + and compares them to the expected values. """ # Throws if IOC is not available or PVs are incorrect pso = aa1AxisPsoDistance(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:", name="pso1") @@ -461,37 +553,44 @@ class AerotechAutomation1Test(unittest.TestCase): pso.unstage() ddc.unstage() mot.unstage() - print("\ntest_09_PsoIntegration01: Integration test for vector style PSO triggering (advanced)") + print( + "\ntest_09_PsoIntegration01: Integration test for vector PSO triggering (advanced)" + ) print("AX:PSO: Testing standard 'enable' style PSO triggering with DDC (1/3)") # This is one line of the Tomcat sequence scan! acc_dist = 0.5 * mot.velocity.value * mot.acceleration.value vec_dist = [acc_dist, 18, 0.01, 18, 0.01, 18, 0.01, 18, 0.01, 18, 0.01, 18] - pso.configure({'distance': vec_dist, 'wmode': "toggle"}) + pso.configure({"pso_distance": vec_dist, "pso_wavemode": "toggle"}) pso.kickoff() # Configure DDC npoints_exp = len(vec_dist) / 2 - cfg = {'ntotal': npoints_exp + 100, 'trigger': DriveDataCaptureTrigger.PsoOutput} + cfg = {"num_points_total": npoints_exp + 100, "ddc_trigger": DriveDataCaptureTrigger.PsoOutput} ddc.configure(cfg) ddc.stage() npoints_rbv = ddc.nsamples_rbv.value - self.assertEqual(npoints_rbv, 0, - f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}") + self.assertEqual( + npoints_rbv, + 0, + f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}", + ) self.assertTrue(pso.output.value == 0, "Expected to start from LOW state") # Start moving and wait for polling mot.move(mot.position + np.sum(vec_dist) + 10) - sleep(0.9) + time.sleep(1) # Evaluate results npoints_rbv = ddc.nsamples_rbv.value - self.assertTrue(np.abs(npoints_rbv-npoints_exp) < 1, - f"Expected to record {npoints_exp} points but got {npoints_rbv}") - self.assertTrue(pso.output.value == 0, - "Expected to finish in LOW state") - self.assertTrue(pso.dstArrayDepleted.value, - "Expected to cover all points in the distance array") + self.assertTrue( + np.abs(npoints_rbv - npoints_exp) < 1, + f"Expected to record {npoints_exp} points but got {npoints_rbv}", + ) + self.assertTrue(pso.output.value == 0, "Expected to finish in LOW state") + self.assertTrue( + pso.dstArrayDepleted.value, "Expected to cover all points in the distance array" + ) pso.unstage() ddc.unstage() @@ -499,136 +598,142 @@ class AerotechAutomation1Test(unittest.TestCase): # This is practically golden ratio tomography! acc_dist = 0.5 * mot.velocity.value * mot.acceleration.value vec_dist = [acc_dist] - vec_dist.extend([1.618]*100) - pso.configure({'distance': vec_dist, 'wmode': "pulsed"}) + vec_dist.extend([1.618] * 100) + pso.configure({"pso_distance": vec_dist, "pso_wavemode": "pulsed"}) pso.kickoff() # Configure DDC npoints_exp = len(vec_dist) - cfg = {'ntotal': npoints_exp + 100, 'trigger': DriveDataCaptureTrigger.PsoOutput} + cfg = {"num_points_total": npoints_exp + 100, "ddc_trigger": DriveDataCaptureTrigger.PsoOutput} ddc.configure(cfg) ddc.stage() npoints_rbv = ddc.nsamples_rbv.value - self.assertEqual(npoints_rbv, 0, - f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}") + self.assertEqual( + npoints_rbv, + 0, + f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}", + ) self.assertTrue(pso.output.value == 0, "Expected to start from LOW state") # Start moving and wait for polling (move negative direction) mot.move(mot.position - np.sum(vec_dist) - acc_dist) - sleep(0.9) + time.sleep(1) # Evaluate results npoints_rbv = ddc.nsamples_rbv.value - self.assertTrue(np.abs(npoints_rbv-npoints_exp) < 1, - f"Expected to record {npoints_exp} points but got {npoints_rbv}") + self.assertTrue( + np.abs(npoints_rbv - npoints_exp) < 1, + f"Expected to record {npoints_exp} points but got {npoints_rbv}", + ) self.assertTrue(pso.output.value == 0, "Expected to finish in LOW state") - self.assertTrue(pso.dstArrayDepleted.value, - "Expected to cover all pints in the distance array") + self.assertTrue( + pso.dstArrayDepleted.value, "Expected to cover all pints in the distance array" + ) print("AX:PSO: Done testing PSO module in distance mode!") pso.unstage() ddc.unstage() - def test_10_AxisPsoWindow1(self): - """ Test basic PSO window mode functionality + # def test_10_AxisPsoWindow1(self): + # """ Test basic PSO window mode functionality - This module tests basic PSO distance functionality, like events and - waveform generation. The goal is to ensure the basic operation of - the PSO module and the polled feedback. - """ - # Throws if IOC is not available or PVs are incorrect - pso = aa1AxisPsoWindow(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:", name="pso1") - ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") - mot = EpicsMotor(AA1_IOC_NAME + ":" + AA1_AXIS_NAME, name="x") - mot.wait_for_connection() - pso.wait_for_connection() - ddc.wait_for_connection() - pso.unstage() - ddc.unstage() - mot.unstage() - print("\nAX:PSO: Checking axis PSO in window mode (basic)") + # This module tests basic PSO distance functionality, like events and + # waveform generation. The goal is to ensure the basic operation of + # the PSO module and the polled feedback. + # """ + # # Throws if IOC is not available or PVs are incorrect + # pso = aa1AxisPsoWindow(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:", name="pso1") + # ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") + # mot = EpicsMotor(AA1_IOC_NAME + ":" + AA1_AXIS_NAME, name="x") + # mot.wait_for_connection() + # pso.wait_for_connection() + # ddc.wait_for_connection() + # pso.unstage() + # ddc.unstage() + # mot.unstage() + # print("\nAX:PSO: Checking axis PSO in window mode (basic)") - print("AX:PSO: Directly checking basic window output (1/3)") - # Configure steps and move to middle of range - pso.configure({'bounds': (5, 13), 'wmode': "output"}) - pso.kickoff() - sleep(1) - # Move half a step - mot.move(mot.position + 0.1) - dist = 0.1 + # print("AX:PSO: Directly checking basic window output (1/3)") + # # Configure steps and move to middle of range + # pso.configure({'bounds': (5, 13), 'pso_wavemode': "output"}) + # pso.kickoff() + # time.sleep(1) + # # Move half a step + # mot.move(mot.position + 0.1) + # dist = 0.1 - # Start moving in steps, while checking output - for ii in range(42): - mot.move(mot.position + 0.2) - dist += 0.2 - sleep(1) - if 5 < dist < 13: - self.assertTrue(pso.output.value == 1, - f"Expected HIGH PSO output inside the window at distance {dist}") - else: - self.assertTrue(pso.output.value == 0, - f"Expected LOW PSO output outside the window at distance {dist}") + # # Start moving in steps, while checking output + # for ii in range(42): + # mot.move(mot.position + 0.2) + # dist += 0.2 + # time.sleep(1) + # if 5 < dist < 13: + # self.assertTrue(pso.output.value == 1, + # f"Expected HIGH PSO output inside the window at distance {dist}") + # else: + # self.assertTrue(pso.output.value == 0, + # f"Expected LOW PSO output outside the window at distance {dist}") - print("AX:PSO: The next tests fail due to encoder jitter!") - pso.unstage() - return + # print("AX:PSO: The next tests fail due to encoder jitter!") + # pso.unstage() + # return - print("AX:PSO: Checking basic window event generation with DDC (2/3)") - # Move to acceleration position - acc_dist = 0.5 * mot.velocity.value * mot.acceleration.value - print(f"Acceleration distance: {acc_dist}") - start_position = mot.position - mot.move(start_position - acc_dist) - # Configure PSO in window mode - cfg = {'bounds': (acc_dist, 30 + acc_dist), 'wmode': "pulsed", "emode": 'Both'} - dev.configure(cfg) - # Configure the data capture (must be performed in start position) - cfg = {'ntotal': 5*10, 'trigger': DriveDataCaptureTrigger.PsoOutput} - ddc.configure(cfg) - ddc.stage() - npoints_rbv = ddc.nsamples_rbv.value - self.assertEqual(npoints_rbv, 0, - f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}") - # Repeated line scan - for ii in range(10): - dev.stage() - mot.move(start_position + 30 + acc_dist) - dev.unstage() - mot.move(start_position - acc_dist) - sleep(0.5) - npoints_rbv = ddc.nsamples_rbv.value - self.assertTrue(np.abs(npoints_rbv - 50) < 1, - f"Expected to record 10 points but got {npoints_rbv}") + # print("AX:PSO: Checking basic window event generation with DDC (2/3)") + # # Move to acceleration position + # acc_dist = 0.5 * mot.velocity.value * mot.acceleration.value + # print(f"Acceleration distance: {acc_dist}") + # start_position = mot.position + # mot.move(start_position - acc_dist) + # # Configure PSO in window mode + # cfg = {'bounds': (acc_dist, 30 + acc_dist), 'pso_wavemode': "pulsed", "emode": 'Both'} + # dev.configure(cfg) + # # Configure the data capture (must be performed in start position) + # cfg = {'ntotal': 5*10, 'trigger': DriveDataCaptureTrigger.PsoOutput} + # ddc.configure(cfg) + # ddc.stage() + # npoints_rbv = ddc.nsamples_rbv.value + # self.assertEqual(npoints_rbv, 0, + # f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}") + # # Repeated line scan + # for ii in range(10): + # dev.stage() + # mot.move(start_position + 30 + acc_dist) + # dev.unstage() + # mot.move(start_position - acc_dist) + # time.sleep(0.5) + # npoints_rbv = ddc.nsamples_rbv.value + # self.assertTrue(np.abs(npoints_rbv - 50) < 1, + # f"Expected to record 10 points but got {npoints_rbv}") - print("AX:PSO: Checking basic 'toggled' event generation with DDC (3/3)") - print("Test removed as there's some heavy jitter on the PSO output flag") - # Move to acceleration position - acc_dist = 0.5 * mot.velocity.value * mot.acceleration.value - print(f"Acceleration distance: {acc_dist}") - mot.move(mot.position - acc_dist) - # Configure PSO in window mode - cfg = {'bounds': (acc_dist, 30 + acc_dist + 1), 'wmode': "output"} - dev.configure(cfg) - dev.stage() - # Configure the data capture - cfg = {'ntotal': 10, 'trigger': DriveDataCaptureTrigger.PsoOutput} - ddc.configure(cfg) - ddc.stage() - npoints_rbv = ddc.nsamples_rbv.value - self.assertEqual(npoints_rbv, 0, - f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}") - # Snake scan - for ii in range(10): - if ii % 2 == 0: - mot.move(mot.position + 30 + 2 * acc_dist + 0.1) - else: - mot.move(mot.position - 30 - 2 * acc_dist - 0.1) - sleep(0.5) - npoints_rbv = ddc.nsamples_rbv.value - self.assertTrue(np.abs(npoints_rbv - 10) < 1, - f"Expected to record 10 points but got {npoints_rbv}") + # print("AX:PSO: Checking basic 'toggled' event generation with DDC (3/3)") + # print("Test removed as there's some heavy jitter on the PSO output flag") + # # Move to acceleration position + # acc_dist = 0.5 * mot.velocity.value * mot.acceleration.value + # print(f"Acceleration distance: {acc_dist}") + # mot.move(mot.position - acc_dist) + # # Configure PSO in window mode + # cfg = {'bounds': (acc_dist, 30 + acc_dist + 1), 'pso_wavemode': "output"} + # dev.configure(cfg) + # dev.stage() + # # Configure the data capture + # cfg = {'ntotal': 10, 'trigger': DriveDataCaptureTrigger.PsoOutput} + # ddc.configure(cfg) + # ddc.stage() + # npoints_rbv = ddc.nsamples_rbv.value + # self.assertEqual(npoints_rbv, 0, + # f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}") + # # Snake scan + # for ii in range(10): + # if ii % 2 == 0: + # mot.move(mot.position + 30 + 2 * acc_dist + 0.1) + # else: + # mot.move(mot.position - 30 - 2 * acc_dist - 0.1) + # time.sleep(0.5) + # npoints_rbv = ddc.nsamples_rbv.value + # self.assertTrue(np.abs(npoints_rbv - 10) < 1, + # f"Expected to record 10 points but got {npoints_rbv}") def test_12_StepScan(self): - """ Test a simple blusky style step scan (via BEC) + """Test a simple blusky style step scan (via BEC) This module tests manual PSO trigger functionality. The goal is to ensure the basic operation of the PSO module and the polled feedback. @@ -643,26 +748,28 @@ class AerotechAutomation1Test(unittest.TestCase): pso.unstage() ddc.unstage() mot.unstage() - t_start = time() + t_start = time.time() print("\ntest_12_StepScan: Simple step scan (via Ophyd)") - PosStart = 112.0 - ScanRange = 180 - ScanSteps = 18 + pos_start = 112.0 + scan_range = 180 + scan_steps = 18 - step_size = ScanRange / ScanSteps + step_size = scan_range / scan_steps positions = [] - for ii in range(ScanSteps+1): - positions.append(PosStart + ii * step_size) + for ii in range(scan_steps + 1): + positions.append(pos_start + ii * step_size) # Motor setup - mot.move(PosStart) - self.assertTrue(np.abs(mot.position-PosStart) < 0.1, - f"Expected to move to scan start position {PosStart}, now at {mot.position}") + mot.move(pos_start) + self.assertTrue( + np.abs(mot.position - pos_start) < 0.1, + f"Expected to move to scan start position {pos_start}, now at {mot.position}", + ) # PSO setup - pso.configure(d={'n_pulse': 2}) + pso.configure(d={"n_pulse": 2}) # DDC setup (trigger at each point plus some extra) - cfg = {'ntotal': 10 * (ScanSteps + 1), 'trigger': DriveDataCaptureTrigger.PsoOutput} + cfg = {"num_points_total": 10 * (scan_steps + 1), "ddc_trigger": DriveDataCaptureTrigger.PsoOutput} ddc.configure(cfg) print("\tStage") @@ -672,8 +779,10 @@ class AerotechAutomation1Test(unittest.TestCase): for ii in range(len(positions)): mot.move(positions[ii]) - self.assertTrue(np.abs(mot.position-positions[ii]) < 1.0, - f"Expected to reach {positions[ii]}, now is at {mot.position} on step {ii}") + self.assertTrue( + np.abs(mot.position - positions[ii]) < 1.0, + f"Expected to reach {positions[ii]}, now is at {mot.position} on step {ii}", + ) pso.trigger() print("\tUnstage") @@ -681,22 +790,25 @@ class AerotechAutomation1Test(unittest.TestCase): ddc.unstage() pso.unstage() - t_end = time() + t_end = time.time() t_elapsed = t_end - t_start print(f"Elapsed scan time: {t_elapsed}") print("Evaluate final state") - sleep(0.5) + time.sleep(0.5) npoints_rbv = ddc.nsamples_rbv.value - self.assertEqual(npoints_rbv, 2 * len(positions), - f"Expected to record {len(positions)} DDC points, got: {npoints_rbv}") + self.assertEqual( + npoints_rbv, + 2 * len(positions), + f"Expected to record {len(positions)} DDC points, got: {npoints_rbv}", + ) def test_13_TomcatSequenceScan(self): - """ Test the zig-zag sequence scan from Tomcat (via BEC) + """Test the zig-zag sequence scan from Tomcat (via BEC) - This module tests basic PSO distance functionality, like events and - waveform generation. The goal is to ensure the basic operation of - the PSO module and the polled feedback. + This module tests basic PSO distance functionality, like events and + waveform generation. The goal is to ensure the basic operation of + the PSO module and the polled feedback. """ # Throws if IOC is not available or PVs are incorrect pso = aa1AxisPsoDistance(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:", name="pso1") @@ -708,46 +820,48 @@ class AerotechAutomation1Test(unittest.TestCase): pso.unstage() ddc.unstage() mot.unstage() - t_start = time() + t_start = time.time() print("\ntest_13_TomcatSequenceScan: TOMCAT Sequeence scan (via Ophyd)") - ScanStart = 42.0 - ScanRange = 180 - NumRepeat = 10 - ScanType = "PosNeg" + scan_start = 42.0 + scan_range = 180 + num_repeat = 10 + scan_type = "PosNeg" # Dynamic properties - VelScan = 90 - AccScan = 500 - SafeDist = 10.0 + scan_vel = 90 + scan_acc = 500 + dist_safe = 10.0 ###################################################################### print("\tConfigure") # Scan range - AccDist = 0.5 * VelScan * VelScan / AccScan + SafeDist - if ScanType in ["PosNeg", "Pos"]: - PosStart = ScanStart - AccDist - PosEnd = ScanStart + ScanRange + AccDist - elif ScanType in ["NegPos", "Neg"]: - PosStart = ScanStart + AccDist - PosEnd = ScanStart - ScanRange - AccDist + dist_acc = 0.5 * scan_vel * scan_vel / scan_acc + dist_safe + if scan_type in ["PosNeg", "Pos"]: + pos_start = scan_start - dist_acc + pos_end = scan_start + scan_range + dist_acc + elif scan_type in ["NegPos", "Neg"]: + pos_start = scan_start + dist_acc + pos_end = scan_start - scan_range - dist_acc else: - raise RuntimeError(f"Unexpected ScanType: {ScanType}") + raise RuntimeError(f"Unexpected ScanType: {scan_type}") # Motor setup - mot.velocity.set(VelScan).wait() - mot.acceleration.set(VelScan/AccScan).wait() - mot.move(PosStart) - self.assertTrue(np.abs(mot.position-PosStart) < 0.1, - f"Expected to move to scan start position {PosStart}, now at {mot.position}") + mot.velocity.set(scan_vel).wait() + mot.acceleration.set(scan_vel / scan_acc).wait() + mot.move(pos_start) + self.assertTrue( + np.abs(mot.position - pos_start) < 0.1, + f"Expected to move to scan start position {pos_start}, now at {mot.position}", + ) # PSO setup - print(f"Configuring PSO to: {[AccDist, ScanRange]}") - cfg = {'distance': [AccDist, ScanRange], 'wmode': "toggle"} + print(f"Configuring PSO to: {[dist_acc, scan_range]}") + cfg = {"pso_distance": [dist_acc, scan_range], "pso_wavemode": "toggle"} pso.configure(cfg) # DDC setup (Tomcat runs in ENABLE mode, so only one posedge) - cfg = {'ntotal': NumRepeat, 'trigger': DriveDataCaptureTrigger.PsoOutput} + cfg = {"num_points_total": num_repeat, "ddc_trigger": DriveDataCaptureTrigger.PsoOutput} ddc.configure(cfg) print("\tStage") @@ -757,89 +871,110 @@ class AerotechAutomation1Test(unittest.TestCase): print("\tKickoff") pso.kickoff() - for ii in range(NumRepeat): + for ii in range(num_repeat): # No option to reset the index counter... pso.dstArrayRearm.set(1).wait() - if ScanType in ["Pos", "Neg"]: - mot.move(PosEnd) - self.assertTrue(np.abs(mot.position-PosEnd) < 1.0, - f"Expected to reach {PosEnd}, now is at {mot.position} on iteration {ii}") - mot.move(PosStart) - self.assertTrue(np.abs(mot.position-PosStart) < 1.0, - f"Expected to rewind to {PosStart}, now at {mot.position} on iteration {ii}") + if scan_type in ["Pos", "Neg"]: + mot.move(pos_end) + self.assertTrue( + np.abs(mot.position - pos_end) < 1.0, + f"Expected to reach {pos_end}, now is at {mot.position} on iteration {ii}", + ) + mot.move(pos_start) + self.assertTrue( + np.abs(mot.position - pos_start) < 1.0, + f"Expected to rewind to {pos_start}, now at {mot.position} on iteration {ii}", + ) - if ScanType in ["PosNeg", "NegPos"]: + if scan_type in ["PosNeg", "NegPos"]: if ii % 2 == 0: - mot.move(PosEnd) - self.assertTrue(np.abs(mot.position-PosEnd) < 1.0, - f"Expected to reach {PosEnd}, now at {mot.position} on iteration {ii}") + mot.move(pos_end) + self.assertTrue( + np.abs(mot.position - pos_end) < 1.0, + f"Expected to reach {pos_end}, now at {mot.position} on iteration {ii}", + ) else: - mot.move(PosStart) - self.assertAlmostEqual(mot.position, PosStart, 1, - f"Expected to reach {PosStart}, now is at {mot.position} on iteration {ii}") + mot.move(pos_start) + self.assertAlmostEqual( + mot.position, + pos_start, + 1, + f"Expected to reach {pos_start}, now is at {mot.position} on iteration {ii}", + ) - sleep(0.2) + time.sleep(0.2) npoints_rbv = ddc.nsamples_rbv.value - self.assertEqual(npoints_rbv, ii+1, - f"Expected to record {ii+1} DDC points, got: {npoints_rbv} on iteration {ii}") + self.assertEqual( + npoints_rbv, + ii + 1, + f"Expected to record {ii+1} DDC points, got: {npoints_rbv} on iteration {ii}", + ) print("\tUnstage") mot.unstage() ddc.unstage() pso.unstage() - t_end = time() + t_end = time.time() t_elapsed = t_end - t_start print(f"Elapsed scan time: {t_elapsed}") print("Evaluate final state") - sleep(3) + time.sleep(3) npoints_rbv = ddc.nsamples_rbv.value - self.assertEqual(npoints_rbv, NumRepeat, - f"Expected to record {NumRepeat} DDC points, but got: {npoints_rbv}") - + self.assertEqual( + npoints_rbv, + num_repeat, + f"Expected to record {num_repeat} DDC points, but got: {npoints_rbv}", + ) def test_14_JinjaTomcatSequenceScan(self): + """ Run a test sequence scan with template substitution""" # Load the test file try: filename = "test/testSequenceScanTemplate.ascript" - with open(filename) as f: + with open(filename, encoding="utf-8") as f: templatetext = f.read() except FileNotFoundError: filename = "testSequenceScanTemplate.ascript" - with open(filename) as f: + with open(filename, encoding="utf-8") as f: templatetext = f.read() - import jinja2 - scanconfig = {'startpos': 42, 'scanrange': 180, 'nrepeat': 13, 'scandir': 'NegPos', 'scanvel': 90, 'scanacc': 500} + scanconfig = { + "startpos": 42, + "scanrange": 180, + "nrepeat": 13, + "scandir": "NegPos", + "scanvel": 90, + "scanacc": 500, + } tm = jinja2.Template(templatetext) text = tm.render(scan=scanconfig) # with open("test/tcatSequenceScan.ascript", "w") as text_file: # text_file.write(text) # Throws if IOC is not available or PVs are incorrect - dev = aa1Tasks(AA1_IOC_NAME+":TASK:", name="tsk") + dev = aa1Tasks(AA1_IOC_NAME + ":TASK:", name="tsk") dev.wait_for_connection() print("\nTOMCAT Sequeence scan (via Jinjad AeroScript)") # Copy file to controller and run it - t_start = time() + t_start = time.time() dev.launch_script("tcatSequenceScan.ascript", taskindex=4, filetext=text) - sleep(0.5) + time.sleep(0.5) dev.complete().wait() - t_end = time() + t_end = time.time() t_elapsed = t_end - t_start print(f"Elapsed scan time: {t_elapsed}") - sleep(0.5) + time.sleep(0.5) ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") ddc.wait_for_connection() npoints_rbv = ddc.nsamples_rbv.value - self.assertEqual(npoints_rbv, 13, - f"Expected to record 13 DDC points, got: {npoints_rbv} after script") - - + self.assertEqual( + npoints_rbv, 13, f"Expected to record 13 DDC points, got: {npoints_rbv} after script" + ) if __name__ == "__main__":