diff --git a/tomcat_bec/devices/aerotech/AerotechPso.py b/tomcat_bec/devices/aerotech/AerotechPso.py index 55e00df..3c3d8b6 100644 --- a/tomcat_bec/devices/aerotech/AerotechPso.py +++ b/tomcat_bec/devices/aerotech/AerotechPso.py @@ -39,6 +39,7 @@ class AerotechPsoBase(PSIDeviceBase, Device): output = Component(EpicsSignalRO, "OUTPUT-RBV", auto_monitor=True, kind=Kind.normal) address = Component(EpicsSignalRO, "ARRAY-ADDR", kind=Kind.config) _eventSingle = Component(EpicsSignal, "EVENT:SINGLE", put_complete=True, kind=Kind.omitted) + switch = Component(EpicsSignal, "SWITCH", put_complete=True, kind=Kind.omitted) # ######################################################################## # PSO Distance event module @@ -138,6 +139,8 @@ class AerotechPsoBase(PSIDeviceBase, Device): w_pulse = d.get("pso_w_pulse", 200) n_pulse = d.get("pso_n_pulse", 1) + self.switch.set("Manual").wait() + # Configure the pulsed/toggled waveform if wmode in ["toggle", "toggled"]: # Switching to simple toggle mode @@ -160,12 +163,6 @@ class AerotechPsoBase(PSIDeviceBase, Device): else: raise RuntimeError(f"Unsupported window mode: {wmode}") - # Set PSO output data source - # FIXME : This is essentially staging... - if wmode in ["toggle", "toggled", "pulse", "pulsed"]: - self.outSource.set("Waveform").wait() - elif wmode in ["output", "flag"]: - self.outSource.set("Window").wait() class aa1AxisPsoDistance(AerotechPsoBase): @@ -226,10 +223,6 @@ class aa1AxisPsoDistance(AerotechPsoBase): raise RuntimeError(f"Unsupported distace triggering mode: {pso_wavemode}") old = self.read_configuration() - # Disable everything - self.winEvents.set("Off").wait() - self.dstCounterEna.set("Off").wait() - self.dstEventsEna.set("Off").wait() # Configure distance generator (also resets counter to 0) self._distance_value = pso_distance @@ -291,29 +284,21 @@ class aa1AxisPsoDistance(AerotechPsoBase): return self.fire(settle_time) def arm(self) -> None: - """Bluesky style stage""" + """Bluesky style stage + + It re-arms the distance array and re-sets the distance counter at current position + """ # Stage the PSO distance module and zero counter if isinstance(self._distance_value, (np.ndarray, list, tuple)): self.dstArrayRearm.set(1).wait() - # Wait for polling - sleep(0.5) - # Start monitoring the counters if distance is valid - if self.dstDistanceVal.get() > 0: - self.dstEventsEna.set("On").wait() - self.dstCounterEna.set("On").wait() + # Set distance and wait for polling + self.switch.set("Distance", settle_time=0.2).wait() def disarm(self): """Standard bluesky unstage""" - # Ensure output is set to low - # if self.output.value: - # self.toggle() - # Turn off window mode - self.winOutput.set("Off").wait() - self.winEvents.set("Off").wait() - # Turn off distance mode - self.dstEventsEna.set("Off").wait() - self.dstCounterEna.set("Off").wait() - # Disable output - self.outSource.set("None").wait() - # Sleep for one poll period - sleep(0.2) + self.switch.set("Manual", settle_time=0.2).wait() + + def launch(self): + """Re-set the counters""" + if isinstance(self._distance_value, (np.ndarray, list, tuple)): + self.dstArrayRearm.set(1).wait() \ No newline at end of file diff --git a/tomcat_bec/devices/aerotech/test/autoHilTest.py b/tomcat_bec/devices/aerotech/test/autoHilTest.py index a616bfe..d976d0f 100644 --- a/tomcat_bec/devices/aerotech/test/autoHilTest.py +++ b/tomcat_bec/devices/aerotech/test/autoHilTest.py @@ -83,7 +83,7 @@ class AerotechAutomation1Test(unittest.TestCase): filename = "unittesting2.ascript" text = "program\nvar $positions[4] as real = [2, 4, 8, 16]\nend" dev.configure({'script_file': filename, 'script_task': 3, 'script_text': text}) - dev.stage() + dev.arm() dev.kickoff() dev.complete().wait() dev.unstage() @@ -94,7 +94,7 @@ class AerotechAutomation1Test(unittest.TestCase): dev.configure({'script_task': 3, 'script_text': text}) # Should raise when Load with self.assertRaises(RuntimeError): - dev.stage() + dev.arm() dev.unstage() # # Run a sleep function on the iSMC @@ -106,7 +106,7 @@ class AerotechAutomation1Test(unittest.TestCase): $rglobal[4]=8.765 """ dev.configure({'script_task': 3, 'script_text': text}) - dev.stage() + dev.arm() t_start = time.time() dev.launch() dev.complete().wait() @@ -125,7 +125,7 @@ class AerotechAutomation1Test(unittest.TestCase): MoveLinear($axis, $fDist, $fVelo) """ dev.configure({'script_task': 3, 'script_text': text}) - dev.stage() + dev.arm() t_start = time.time() dev.kickoff() dev.complete().wait() @@ -136,514 +136,520 @@ class AerotechAutomation1Test(unittest.TestCase): print("TASK: Done testing TASK module!") - # def test_03_global_variables(self): - # """Test the basic read/write global variable interface""" - # # Throws if IOC is not available or PVs are incorrect - # dev = aa1GlobalVariables(AA1_IOC_NAME + ":VAR:", name="gvar") - # dev.wait_for_connection() + def test_03_global_variables(self): + """Test the basic read/write global variable interface""" + # Throws if IOC is not available or PVs are incorrect + dev = aa1GlobalVariables(AA1_IOC_NAME + ":VAR:", name="gvar") + dev.wait_for_connection() - # print("\nVAR: Checking available memory") - # self.assertTrue( - # dev.num_int.get() >= 32, "At least 32 integer variables are needed for this test" - # ) - # self.assertTrue( - # dev.num_real.get() >= 32, "At least 32 real variables are needed for this test" - # ) - # self.assertTrue( - # dev.num_string.get() >= 8, "At least 8 string[256] variables are needed for this test" - # ) + print("\nVAR: Checking available memory") + self.assertTrue( + dev.num_int.get() >= 32, "At least 32 integer variables are needed for this test" + ) + self.assertTrue( + dev.num_real.get() >= 32, "At least 32 real variables are needed for this test" + ) + self.assertTrue( + dev.num_string.get() >= 8, "At least 8 string[256] variables are needed for this test" + ) - # print("VAR: Setting and checking a few variables") - # # Use random variables for subsequent runs - # val_f = 100 * np.random.random() - # dev.write_float(11, val_f) - # rb_f = dev.read_float(11) - # self.assertEqual( - # val_f, rb_f, f"Floating point readback value changed, read {rb_f} instead of {val_f}" - # ) - # rb_f = dev.read_float(10, 4) - # self.assertEqual( - # val_f, rb_f[1], f"FP64 array readback value changed, read {rb_f[1]} instead of {val_f}" - # ) + print("VAR: Setting and checking a few variables") + # Use random variables for subsequent runs + val_f = 100 * np.random.random() + dev.write_float(11, val_f) + rb_f = dev.read_float(11) + self.assertEqual( + val_f, rb_f, f"Floating point readback value changed, read {rb_f} instead of {val_f}" + ) + rb_f = dev.read_float(10, 4) + self.assertEqual( + val_f, rb_f[1], f"FP64 array readback value changed, read {rb_f[1]} instead of {val_f}" + ) - # val_i = 1 + int(100 * np.random.random()) - # dev.write_int(13, val_i) - # rb_i = dev.read_int(13) - # self.assertEqual( - # val_i, rb_i, f"Integer readback value changed, read {rb_i} instead of {val_i}" - # ) - # rb_i = dev.read_int(10, 4) - # self.assertEqual( - # val_i, - # rb_i[3], - # f"Integer array readback value changed, read {rb_i[3]} instead of {val_i}", - # ) + val_i = 1 + int(100 * np.random.random()) + dev.write_int(13, val_i) + rb_i = dev.read_int(13) + self.assertEqual( + val_i, rb_i, f"Integer readback value changed, read {rb_i} instead of {val_i}" + ) + rb_i = dev.read_int(10, 4) + self.assertEqual( + val_i, + rb_i[3], + f"Integer array readback value changed, read {rb_i[3]} instead of {val_i}", + ) - # val_i = 1 + int(100 * np.random.random()) - # dev.write_int(13, val_i) - # rb_i = dev.read_int(13) - # self.assertEqual( - # val_i, rb_i, f"Integer readback value changed, read {rb_i} instead of {val_i}" - # ) - # rb_i = dev.read_int(10, 4) - # self.assertEqual( - # val_i, - # rb_i[3], - # f"Integer array readback value changed, read {rb_i[3]} instead of {val_i}", - # ) + val_i = 1 + int(100 * np.random.random()) + dev.write_int(13, val_i) + rb_i = dev.read_int(13) + self.assertEqual( + val_i, rb_i, f"Integer readback value changed, read {rb_i} instead of {val_i}" + ) + rb_i = dev.read_int(10, 4) + self.assertEqual( + val_i, + rb_i[3], + f"Integer array readback value changed, read {rb_i[3]} instead of {val_i}", + ) - # val_s = np.random.choice(["f", "o", "o", "b", "a", "r"], 6) - # val_s = "".join(val_s) - # dev.write_string(3, val_s) - # rb_s = dev.read_string(3) - # self.assertEqual( - # val_s, rb_s, f"String readback value changed, read {rb_s} instead of {val_s}" - # ) - # print("VAR: Done testing VAR module basic functionality!") + val_s = np.random.choice(["f", "o", "o", "b", "a", "r"], 6) + val_s = "".join(val_s) + dev.write_string(3, val_s) + rb_s = dev.read_string(3) + self.assertEqual( + val_s, rb_s, f"String readback value changed, read {rb_s} instead of {val_s}" + ) + print("VAR: Done testing VAR module basic functionality!") - # def test_04_global_bindings(self): - # """Test the direct global variable bindings together with the task and - # standard global variable interfaces. - # """ - # # Throws if IOC is not available or PVs are incorrect - # var = aa1GlobalVariables(AA1_IOC_NAME + ":VAR:", name="gvar") - # var.wait_for_connection() - # dev = aa1GlobalVariableBindings(AA1_IOC_NAME + ":VAR:", name="var") - # dev.wait_for_connection() - # tsk = aa1Tasks(AA1_IOC_NAME + ":TASK:", name="tsk") - # tsk.wait_for_connection() + def test_04_global_bindings(self): + """Test the direct global variable bindings together with the task and + standard global variable interfaces. + """ + # Throws if IOC is not available or PVs are incorrect + var = aa1GlobalVariables(AA1_IOC_NAME + ":VAR:", name="gvar") + var.wait_for_connection() + dev = aa1GlobalVariableBindings(AA1_IOC_NAME + ":VAR:", name="var") + dev.wait_for_connection() + tsk = aa1Tasks(AA1_IOC_NAME + ":TASK:", name="tsk") + tsk.wait_for_connection() - # print("\nVAR: Checking the direct global variable bindings with random numbers") - # # Check reading/writing float variables - # val_f = 100 * np.random.random() - # dev.float16.set(val_f, settle_time=0.5).wait() - # self.assertEqual( - # val_f, - # dev.float16.value, - # f"Unexpected floating readback, read {dev.float16.value}, expected {val_f}", - # ) + print("\nVAR: Checking the direct global variable bindings with random numbers") + # Check reading/writing float variables + val_f = 100 * np.random.random() + dev.float16.set(val_f, settle_time=0.5).wait() + self.assertEqual( + val_f, + dev.float16.value, + f"Unexpected floating readback, read {dev.float16.value}, expected {val_f}", + ) - # val_f = 100 * np.random.random() - # var.write_float(3, val_f, settle_time=1.5) - # self.assertEqual( - # val_f, - # dev.float3.value, - # f"Unexpected floating readback, read {dev.float3.value}, expected {val_f}", - # ) + val_f = 100 * np.random.random() + var.write_float(3, val_f, settle_time=1.5) + self.assertEqual( + val_f, + dev.float3.value, + f"Unexpected floating readback, read {dev.float3.value}, expected {val_f}", + ) - # # Check reading/writing integer variables - # val_i = int(100 * np.random.random()) - 50 - # var.write_int(3, val_i, settle_time=1.5) - # self.assertEqual( - # val_i, - # dev.int3.value, - # f"Unexpected integer readback, read {dev.int3.value}, expected {val_i}", - # ) + # Check reading/writing integer variables + val_i = int(100 * np.random.random()) - 50 + var.write_int(3, val_i, settle_time=1.5) + self.assertEqual( + val_i, + dev.int3.value, + f"Unexpected integer readback, read {dev.int3.value}, expected {val_i}", + ) - # val_i = 1 + int(100 * np.random.random()) - # dev.int8.set(val_i, settle_time=0.5).wait() - # self.assertEqual( - # val_i, - # dev.int8.value, - # f"Unexpected integer readback, read {dev.int8.value}, expected {val_i}", - # ) + val_i = 1 + int(100 * np.random.random()) + dev.int8.set(val_i, settle_time=0.5).wait() + self.assertEqual( + val_i, + dev.int8.value, + f"Unexpected integer readback, read {dev.int8.value}, expected {val_i}", + ) - # # Check reading/writing string variables - # val_s = np.random.choice(["f", "o", "o", "b", "a", "r"], 6) - # val_s = "".join(val_s) - # dev.str4.set(val_s, settle_time=0.5).wait() - # rb_s = dev.str4.value - # self.assertEqual( - # val_s, rb_s, f"String readback value changed, read {rb_s} instead of {val_s}" - # ) - # print("VAR: Done testing VAR module direct bindings!") + # Check reading/writing string variables + val_s = np.random.choice(["f", "o", "o", "b", "a", "r"], 6) + val_s = "".join(val_s) + dev.str4.set(val_s, settle_time=0.5).wait() + rb_s = dev.str4.value + self.assertEqual( + val_s, rb_s, f"String readback value changed, read {rb_s} instead of {val_s}" + ) + print("VAR: Done testing VAR module direct bindings!") - # # Attempt to run a short test script in globals (integer) - # text = """ - # $iglobal[1]=420 - # $rglobal[1]=1.4142 - # """ - # tsk.configure({'script_text': text, 'script_task': 4}) - # tsk.arm() - # tsk.launch() - # self.assertEqual( - # 420, - # dev.int1.value, - # f"Unexpected integer readback , read {dev.int1.value}, expected 420", - # ) - # self.assertAlmostEqual( - # 1.4142, - # dev.float1.value, - # f"Unexpected floating readback , read {dev.float1.value}, expected 1.4142", - # ) + # Attempt to run a short test script in globals (integer) + val_i = int(1000*np.random.rand()) + val_f = 1000*(np.random.rand()-0.5) + text = f""" + $iglobal[1]={val_i} + $rglobal[1]={val_f} + """ + tsk.configure({'script_text': text, 'script_task': 4}) + tsk.arm() + tsk.launch() + tsk.complete().wait() + self.assertEqual( + val_i, + dev.int1.value, + f"Unexpected integer readback , read {dev.int1.value}, expected {val_i}", + ) + self.assertAlmostEqual( + val_f, + dev.float1.value, + f"Unexpected floating readback , read {dev.float1.value}, expected {val_f}", + ) - # def test_05_axis_motorrecord(self): - # """Tests the basic move functionality of the MotorRecord""" - # mot = AerotechMotor(AA1_IOC_NAME + ":" + AA1_AXIS_NAME, name="x") - # mot.wait_for_connection() - # mot.motor_enable.set(1, settle_time=0.1).wait() - # mot.velocity.set(200).wait() - # print("\nMR: Checking the standard EPICS motor record") + def test_05_axis_motorrecord(self): + """Tests the basic move functionality of the MotorRecord""" + mot = AerotechMotor(AA1_IOC_NAME + ":" + AA1_AXIS_NAME, name="x") + mot.wait_for_connection() + mot.motor_enable.set(1, settle_time=0.1).wait() + mot.velocity.set(200).wait() + print("\nMR: Checking the standard EPICS motor record") - # # Four test with absolute moves - # def RelMove(dist): - # target = mot.position + dist - # print(f"Absolute move to {target}, distance: {dist}") - # mot.move(target, wait=True) - # self.assertTrue( - # np.abs(target - mot.position) < 0.1, - # f"Final position {mot.position} is too far from target {target}", - # ) + # Four test with absolute moves + def move_rel(dist): + target = mot.position + dist + print(f"Absolute move to {target}, distance: {dist}") + mot.move(target, wait=True) + self.assertTrue( + np.abs(target - mot.position) < 0.1, + f"Final position {mot.position} is too far from target {target}", + ) - # RelMove(30 + 50.0 * np.random.random()) - # RelMove(20 + 50.0 * np.random.random()) - # RelMove(-10 + 50.0 * np.random.random()) - # RelMove(-40 - 50.0 * np.random.random()) - # RelMove(10 + 50.0 * np.random.random()) - # print("MR: Done testing MotorRecord!") + move_rel(30 + 50.0 * np.random.random()) + move_rel(20 + 50.0 * np.random.random()) + move_rel(-10 + 50.0 * np.random.random()) + move_rel(-40 - 50.0 * np.random.random()) + move_rel(10 + 50.0 * np.random.random()) + print("MR: Done testing MotorRecord!") - # def test_06_axis_io(self): - # """Test axis IO bindings""" - # # Throws if IOC is not available or PVs are incorrect - # dev = aa1AxisIo(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":IO:", name="io") - # dev.wait_for_connection() - # print("\nAX:IO: Checking axis IO") + def test_06_axis_io(self): + """Test axis IO bindings""" + # Throws if IOC is not available or PVs are incorrect + dev = aa1AxisIo(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":IO:", name="io") + dev.wait_for_connection() - # # Writing random values to analog/digital outputs - # for _ in range(5): - # val_f = np.random.random() - # dev.set_analog(0, val_f) - # self.assertAlmostEqual(dev.ao.get(), val_f, 2, "Unexpected readback on analog output") - # self.assertAlmostEqual(dev.ao0.get(), val_f, 2, "Unexpected readback on analog output") + # Writing random values to analog/digital outputs + print("\nAX:IO: Checking axis analog IO") + for _ in range(5): + val_f = np.random.random() + dev.set_analog(0, val_f) + self.assertAlmostEqual(dev.ao.get(), val_f, 2, "Unexpected setpoint on analog output") + self.assertAlmostEqual(dev.ao0.get(), val_f, 2, "Unexpected readback on analog output") - # print("Digital") - # for _ in range(5): - # val_b = round(np.random.random()) - # dev.set_digital(0, val_b) - # self.assertEqual(dev.do.get(), val_b, "Unexpected readback on digital output") - # self.assertEqual(dev.do0.get(), val_b, "Unexpected readback on digital output") - # print("IO: Done testing IO module!") + print("\nAX:IO: Checking axis digital IO") + for _ in range(5): + val_b = round(np.random.random()) + dev.set_digital(0, val_b) + self.assertEqual(dev.do.get(), val_b, "Unexpected setpoint on digital output") + self.assertEqual(dev.do0.get(), val_b, "Unexpected readback on digital output") + print("IO: Done testing IO module!") - # def test_07_axis_ddc(self): - # """Drive data collection + def test_07_axis_ddc(self): + """Drive data collection - # This is the preliminary test case dor drive data collection using - # internal PSO based triggers. It tests basic functionality, like - # configuration, state monitoring and readback. - # """ - # # Throws if IOC is not available or PVs are incorrect - # dev = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") - # event = EpicsSignal( - # AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:EVENT:SINGLE", put_complete=True - # ) - # dev.wait_for_connection() - # event.wait_for_connection() - # print("\nAX:DDC: Checking axis DDC") + This is the preliminary test case dor drive data collection using + internal PSO based triggers. It tests basic functionality, like + configuration, state monitoring and readback. + """ + # Throws if IOC is not available or PVs are incorrect + dev = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") + event = EpicsSignal( + AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:EVENT:SINGLE", put_complete=True + ) + dev.wait_for_connection() + event.wait_for_connection() + print("\nAX:DDC: Checking axis DDC") - # # Stop any running acquisition - # dev.unstage() + # Stop any running acquisition + dev.unstage() - # print("AX:DDC: Running a partial acquisition (1/3)") - # # Configure the DDC for 42 points and start waiting for triggers - # cfg = {"num_points_total": 42, "ddc_trigger": DriveDataCaptureTrigger.PsoEvent} - # dev.configure(cfg) - # dev.stage() - # # Fire 12 triggers and stop the acquisition early - # for _ in range(12): - # event.set(1, settle_time=0.1).wait() - # # Stop the data acquisition early - # dev.unstage() + print("AX:DDC: Running a partial acquisition (1/3)") + # Configure the DDC for 42 points and start waiting for triggers + cfg = {"num_points_total": 42, "ddc_trigger": DriveDataCaptureTrigger.PsoEvent} + dev.configure(cfg) + dev.arm() + # Fire 12 triggers and stop the acquisition early + for _ in range(12): + event.set(1, settle_time=0.1).wait() + # Stop the data acquisition early + dev.disarm() - # # Wait for polling to catch up - # time.sleep(20) + # Wait for polling to catch up + time.sleep(1) - # # Do the readback and check final state - # rbv = dev.collect() - # self.assertEqual( - # dev.nsamples_rbv.value, - # 12, - # f"Expected to collect 12 points by now, got {dev.nsamples_rbv.value}", - # ) - # self.assertEqual( - # len(rbv["data"]["buffer0"]), - # 12, - # f"Expected to read back 12 points by now, got {len(rbv['data']['buffer0'])}", - # ) - # self.assertEqual( - # len(rbv["data"]["buffer1"]), - # 12, - # f"Expected to read back 12 points by now, got {len(rbv['data']['buffer1'])}", - # ) + # Do the readback and check final state + rbv = dev.collect() + self.assertEqual( + dev.nsamples_rbv.value, + 12, + f"Expected to collect 12 points by now, got {dev.nsamples_rbv.value}", + ) + self.assertEqual( + len(rbv["data"]["buffer0"]), + 12, + f"Expected to read back 12 points by now, got {len(rbv['data']['buffer0'])}", + ) + self.assertEqual( + len(rbv["data"]["buffer1"]), + 12, + f"Expected to read back 12 points by now, got {len(rbv['data']['buffer1'])}", + ) - # print("AX:DDC: Running an overruning acquisition (2/3)") - # # Configure the DDC for 16 points and start waiting for triggers - # cfg = {"num_points_total": 16, "ddc_trigger": DriveDataCaptureTrigger.PsoEvent} - # dev.configure(cfg) - # dev.stage() - # num = dev.nsamples_rbv.get() - # self.assertEqual(num, 0, f"Stage should reset the number of DDC points but got: {num}") + print("AX:DDC: Running an overruning acquisition (2/3)") + # Configure the DDC for 16 points and start waiting for triggers + cfg = {"num_points_total": 16, "ddc_trigger": DriveDataCaptureTrigger.PsoEvent} + dev.configure(cfg) + dev.arm() + num = dev.nsamples_rbv.get() + self.assertEqual(num, 0, f"Stage should reset the number of DDC points but got: {num}") - # # Fire 20 triggers and stop the acquisition early - # for _ in range(20): - # event.set(1, settle_time=0.05).wait() - # dev.unstage() + # Fire 20 triggers and stop the acquisition early + for _ in range(20): + event.set(1, settle_time=0.05).wait() + dev.disarm() - # # Wait for polling to catch up - # time.sleep(1) + # Wait for polling to catch up + time.sleep(1) - # # Do the readback and check final state - # rbv = dev.collect() - # self.assertEqual( - # dev.nsamples_rbv.value, - # 16, - # f"Expected to collect 16 points by now, got {dev.nsamples_rbv.value}", - # ) - # self.assertEqual( - # len(rbv["data"]["buffer0"]), - # 16, - # f"Expected to read back 16 points by now, got {len(rbv['data']['buffer0'])}", - # ) - # self.assertEqual( - # len(rbv["data"]["buffer1"]), - # 16, - # f"Expected to read back 16 points by now, got {len(rbv['data']['buffer1'])}", - # ) + # Do the readback and check final state + rbv = dev.collect() + self.assertEqual( + dev.nsamples_rbv.value, + 16, + f"Expected to collect 16 points by now, got {dev.nsamples_rbv.value}", + ) + self.assertEqual( + len(rbv["data"]["buffer0"]), + 16, + f"Expected to read back 16 points by now, got {len(rbv['data']['buffer0'])}", + ) + self.assertEqual( + len(rbv["data"]["buffer1"]), + 16, + f"Expected to read back 16 points by now, got {len(rbv['data']['buffer1'])}", + ) - # print("AX:DDC: Trying to record some noise on the analog input (3/3)") - # cfg = { - # "num_points_total": 36, - # "ddc_trigger": DriveDataCaptureTrigger.PsoEvent, - # "ddc_source1": DriveDataCaptureInput.AnalogInput0, - # } - # dev.configure(cfg) - # dev.stage() - # num = dev.nsamples_rbv.get() - # self.assertEqual(num, 0, f"Stage should reset the number of DDC points but got: {num}") - # # Fire 36 triggers - # for ii in range(36): - # event.set(1, settle_time=0.05).wait() - # dev.unstage() + print("AX:DDC: Trying to record some noise on the analog input (3/3)") + cfg = { + "num_points_total": 36, + "ddc_trigger": DriveDataCaptureTrigger.PsoEvent, + "ddc_source1": DriveDataCaptureInput.AnalogInput0, + } + dev.configure(cfg) + dev.arm() + num = dev.nsamples_rbv.get() + self.assertEqual(num, 0, f"Stage should reset the number of DDC points but got: {num}") + # Fire 36 triggers + for _ in range(36): + event.set(1, settle_time=0.05).wait() + dev.disarm() - # # Wait for polling to catch up - # time.sleep(1) + # Wait for polling to catch up + time.sleep(1) - # # Do the readback and check final state - # rbv = dev.collect() - # self.assertEqual( - # dev.nsamples_rbv.value, - # 36, - # f"Expected to collect 12 points by now, got {dev.nsamples_rbv.value}", - # ) - # self.assertEqual( - # len(rbv["data"]["buffer0"]), - # 36, - # f"Expected to read back 36 points by now, got {len(rbv['data']['buffer0'])}", - # ) - # self.assertEqual( - # len(rbv["data"]["buffer1"]), - # 36, - # f"Expected to read back 36 points by now, got {len(rbv['data']['buffer1'])}", - # ) - # self.assertTrue( - # np.std(rbv["data"]["buffer1"]) > 0.000001, - # f"Expected some noise on analog input, but std is: {np.std(rbv['data']['buffer1'])}", - # ) - # print("AX:DDCal(l: Done testing DDC module!") + # Do the readback and check final state + rbv = dev.collect() + self.assertEqual( + dev.nsamples_rbv.value, + 36, + f"Expected to collect 12 points by now, got {dev.nsamples_rbv.value}", + ) + self.assertEqual( + len(rbv["data"]["buffer0"]), + 36, + f"Expected to read back 36 points by now, got {len(rbv['data']['buffer0'])}", + ) + self.assertEqual( + len(rbv["data"]["buffer1"]), + 36, + f"Expected to read back 36 points by now, got {len(rbv['data']['buffer1'])}", + ) + self.assertTrue( + np.std(rbv["data"]["buffer1"]) > 0.000001, + f"Expected some noise on analog input, but std is: {np.std(rbv['data']['buffer1'])}", + ) + print("AX:DDCal(l: Done testing DDC module!") - # def test_08_axis_pso_distance1(self): - # """Test basic PSO distance mode functionality + def test_08_axis_pso_distance1(self): + """Test basic PSO distance mode functionality - # This module tests basic PSO distance functionality, like events and - # waveform generation. The goal is to ensure the basic operation of - # the PSO module and the polled feedback. - # """ - # # Throws if IOC is not available or PVs are incorrect - # pso = aa1AxisPsoDistance(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:", name="pso1") - # ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") - # mot = AerotechMotor(AA1_IOC_NAME + ":" + AA1_AXIS_NAME, name="x") - # mot.wait_for_connection() - # pso.wait_for_connection() - # ddc.wait_for_connection() - # pso.unstage() - # ddc.unstage() - # mot.unstage() - # print("\nAX:PSO: Checking axis PSO in distance mode (basic)") + This module tests basic PSO distance functionality, like events and + waveform generation. The goal is to ensure the basic operation of + the PSO module and the polled feedback. + """ + # Throws if IOC is not available or PVs are incorrect + pso = aa1AxisPsoDistance(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:", name="pso1") + ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") + mot = AerotechMotor(AA1_IOC_NAME + ":" + AA1_AXIS_NAME, name=AA1_AXIS_NAME.lower()) + mot.wait_for_connection() + pso.wait_for_connection() + ddc.wait_for_connection() + pso.disarm() + ddc.disarm() + mot.unstage() + print("\nAX:PSO: Checking axis PSO in distance mode (basic)") - # print("AX:PSO: Directly checking basic 'toggled' event generation (1/3)") - # # Configure steps and move to middle of range - # pso.configure({"pso_distance": 5, "pso_wavemode": "toggle"}) - # pso.kickoff() - # mot.velocity.set(200).wait() - # mot.move(mot.position + 2.5) - # # Start moving in steps, while checking output - # for ii in range(10): - # last_pso = pso.output.value - # mot.move(mot.position + 5) - # time.sleep(1) - # self.assertTrue( - # pso.output.value != last_pso, - # f"Expected to toggle the PSO output at step {ii}, got {pso.output.value}", - # ) - # pso.unstage() - # ddc.unstage() + print("AX:PSO: Directly checking basic 'toggled' event generation (1/3)") + # Configure steps and move to middle of range + pso.configure({"pso_distance": 5, "pso_wavemode": "toggle"}) + pso.arm() + mot.velocity.set(200).wait() + mot.move(mot.position + 2.5, wait=True) - # print("AX:PSO: Checking basic 'toggled' event generation with DDC (2/3)") - # # Configure steps and move to middle of range - # pso.configure({"pso_distance": 2, "pso_wavemode": "toggle"}) - # pso.kickoff() + ddc.arm() + pso.arm() + # Start moving in steps, while checking output + for ii in range(10): + last_pso = pso.output.value + mot.move(mot.position + 5, wait=True) + time.sleep(1) + self.assertTrue( + pso.output.value != last_pso, + f"Expected to toggle the PSO output at step {ii}, got {pso.output.value}", + ) + pso.disarm() + ddc.disarm() - # # Configure DDC - # npoints = 720 / 2.0 - # cfg = {"num_points_total": npoints + 100, "ddc_trigger": DriveDataCaptureTrigger.PsoOutput} - # ddc.configure(cfg) - # ddc.stage() - # npoints_rbv = ddc.nsamples_rbv.value - # self.assertEqual( - # npoints_rbv, - # 0, - # f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}", - # ) - # self.assertTrue(pso.output.value == 0, "Expected to start from LOW state") + print("AX:PSO: Checking basic 'toggled' event generation with DDC (2/3)") + # Configure steps and move to middle of range + pso.configure({"pso_distance": 2, "pso_wavemode": "toggle"}) + pso.arm() - # # Start moving and wait for polling - # mot.move(mot.position + 720) - # time.sleep(0.5) + # Configure DDC + npoints = 720 / 2.0 + cfg = {"num_points_total": npoints + 100, "ddc_trigger": DriveDataCaptureTrigger.PsoOutput} + ddc.configure(cfg) + ddc.arm() + npoints_rbv = ddc.nsamples_rbv.value + self.assertEqual( + npoints_rbv, + 0, + f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}", + ) + self.assertTrue(pso.output.value == 0, "Expected to start from LOW state") - # # Evaluate results - # npoints_rbv = ddc.nsamples_rbv.value - # self.assertTrue( - # np.abs(npoints_rbv - 720 / 4) <= 1, - # f"Expected to record {720/4} points but got {npoints_rbv}", - # ) - # pso.unstage() - # ddc.unstage() + # Start moving and wait for polling + mot.move(mot.position + 720, wait=True) + time.sleep(1) - # print("AX:PSO: Checking basic 'pulsed' event generation with DDC (3/5)") - # # Configure steps and move to middle of range - # pso.configure({"pso_distance": 1.8, "pso_wavemode": "pulsed", "pso_n_pulse": 3}) - # pso.kickoff() + # Evaluate results + npoints_rbv = ddc.nsamples_rbv.value + self.assertTrue( + np.abs(npoints_rbv - 720 / 4) <= 1, + f"Expected to record {720/4} points but got {npoints_rbv}", + ) + pso.disarm() + ddc.disarm() - # # Configure DDC - # npoints = 3 * 720 / 1.8 - # cfg = {"num_points_total": npoints + 100, "ddc_trigger": DriveDataCaptureTrigger.PsoOutput} - # ddc.configure(cfg) - # ddc.stage() - # npoints_rbv = ddc.nsamples_rbv.value - # self.assertEqual( - # npoints_rbv, - # 0, - # f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}", - # ) - # self.assertTrue(pso.output.value == 0, "Expected to start from LOW state") + print("AX:PSO: Checking basic 'pulsed' event generation with DDC (3/5)") + # Configure steps and move to middle of range + pso.configure({"pso_distance": 1.8, "pso_wavemode": "pulsed", "pso_n_pulse": 3}) + pso.arm() - # # Start moving and wait for polling - # mot.move(mot.position + 720) - # time.sleep(0.5) + # Configure DDC + npoints = 3 * 720 / 1.8 + cfg = {"num_points_total": (npoints) + 100, "ddc_trigger": DriveDataCaptureTrigger.PsoOutput} + ddc.configure(cfg) + ddc.arm() + npoints_rbv = ddc.nsamples_rbv.value + self.assertEqual( + npoints_rbv, + 0, + f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}", + ) + self.assertTrue(pso.output.value == 0, "Expected to start from LOW state") - # # Evaluate results - # npoints_rbv = ddc.nsamples_rbv.value - # self.assertTrue( - # np.abs(npoints_rbv - 3 * 720 / 1.8) <= 1, - # f"Expected to record {3*720/1.8} points but got {npoints_rbv}", - # ) - # print("AX:PSO: Done testing basic PSO distance functionality!") - # pso.unstage() - # ddc.unstage() + # Start moving and wait for polling + mot.move(mot.position + 720 + 0.9, wait=True) + time.sleep(1) - # def xtest_09_axis_pso_integration01(self): - # """The first integration test aims to verify PSO functionality under - # real life scenarios. It uses the DDC for collecting encoder signals - # and compares them to the expected values. - # """ - # # Throws if IOC is not available or PVs are incorrect - # pso = aa1AxisPsoDistance(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:", name="pso1") - # ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") - # mot = AerotechMotor(AA1_IOC_NAME + ":" + AA1_AXIS_NAME, name="x") - # mot.wait_for_connection() - # pso.wait_for_connection() - # ddc.wait_for_connection() - # pso.unstage() - # ddc.unstage() - # mot.unstage() - # print( - # "\ntest_09_PsoIntegration01: Integration test for vector PSO triggering (advanced)" - # ) + # Evaluate results + npoints_rbv = ddc.nsamples_rbv.value + self.assertTrue( + np.abs(npoints_rbv - 3 * 720 / 1.8) <= 1, + f"Expected to record {3*720/1.8} points but got {npoints_rbv}", + ) + print("AX:PSO: Done testing basic PSO distance functionality!") + pso.disarm() + ddc.disarm() - # print("AX:PSO: Testing standard 'enable' style PSO triggering with DDC (1/3)") - # # This is one line of the Tomcat sequence scan! - # acc_dist = 0.5 * mot.velocity.value * mot.acceleration.value - # vec_dist = [acc_dist, 18, 0.01, 18, 0.01, 18, 0.01, 18, 0.01, 18, 0.01, 18] - # pso.configure({"pso_distance": vec_dist, "pso_wavemode": "toggle"}) - # pso.kickoff() + def test_09_axis_pso_integration_1(self): + """The first integration test aims to verify PSO functionality under + real life scenarios. It uses the DDC for collecting encoder signals + and compares them to the expected values. + """ + # Throws if IOC is not available or PVs are incorrect + pso = aa1AxisPsoDistance(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:", name="pso1") + ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") + mot = AerotechMotor(AA1_IOC_NAME + ":" + AA1_AXIS_NAME, name="x") + mot.wait_for_connection() + pso.wait_for_connection() + ddc.wait_for_connection() + pso.unstage() + ddc.unstage() + mot.unstage() + print( + "\ntest_09_PsoIntegration01: Integration test for vector PSO triggering (advanced)" + ) - # # Configure DDC - # npoints_exp = len(vec_dist) / 2 - # cfg = {"num_points_total": npoints_exp + 100, "ddc_trigger": DriveDataCaptureTrigger.PsoOutput} - # ddc.configure(cfg) - # ddc.stage() - # npoints_rbv = ddc.nsamples_rbv.value - # self.assertEqual( - # npoints_rbv, - # 0, - # f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}", - # ) - # self.assertTrue(pso.output.value == 0, "Expected to start from LOW state") + print("AX:PSO: Testing standard 'enable' style PSO triggering with DDC (1/3)") + # This is one line of the Tomcat sequence scan! + acc_dist = 0.5 * mot.velocity.value * mot.acceleration.value + vec_dist = [acc_dist, 18, 0.01, 18, 0.01, 18, 0.01, 18, 0.01, 18, 0.01, 18] + pso.configure({"pso_distance": vec_dist, "pso_wavemode": "toggle"}) + pso.arm() - # # Start moving and wait for polling - # mot.move(mot.position + np.sum(vec_dist) + 10) - # time.sleep(1) + # Configure DDC + npoints_exp = len(vec_dist) / 2 + cfg = {"num_points_total": npoints_exp + 100, "ddc_trigger": DriveDataCaptureTrigger.PsoOutput} + ddc.configure(cfg) + ddc.arm() + npoints_rbv = ddc.nsamples_rbv.value + self.assertEqual( + npoints_rbv, + 0, + f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}", + ) + self.assertTrue(pso.output.value == 0, "Expected to start from LOW state") - # # Evaluate results - # npoints_rbv = ddc.nsamples_rbv.value - # self.assertTrue( - # np.abs(npoints_rbv - npoints_exp) < 1, - # f"Expected to record {npoints_exp} points but got {npoints_rbv}", - # ) - # self.assertTrue(pso.output.value == 0, "Expected to finish in LOW state") - # self.assertTrue( - # pso.dstArrayDepleted.value, "Expected to cover all points in the distance array" - # ) - # pso.unstage() - # ddc.unstage() + # Start moving and wait for polling + mot.move(mot.position + np.sum(vec_dist) + 10) + time.sleep(1) - # print("AX:PSO: Checking 'individual' style trigger signal with DDC (2/5)") - # # This is practically golden ratio tomography! - # acc_dist = 0.5 * mot.velocity.value * mot.acceleration.value - # vec_dist = [acc_dist] - # vec_dist.extend([1.618] * 100) - # pso.configure({"pso_distance": vec_dist, "pso_wavemode": "pulsed"}) - # pso.kickoff() + # Evaluate results + npoints_rbv = ddc.nsamples_rbv.value + self.assertTrue( + np.abs(npoints_rbv - npoints_exp) < 1, + f"Expected to record {npoints_exp} points but got {npoints_rbv}", + ) + self.assertTrue(pso.output.value == 0, "Expected to finish in LOW state") + self.assertTrue( + pso.dstArrayDepleted.value, "Expected to cover all points in the distance array" + ) + pso.unstage() + ddc.unstage() - # # Configure DDC - # npoints_exp = len(vec_dist) - # cfg = {"num_points_total": npoints_exp + 100, "ddc_trigger": DriveDataCaptureTrigger.PsoOutput} - # ddc.configure(cfg) - # ddc.stage() - # npoints_rbv = ddc.nsamples_rbv.value - # self.assertEqual( - # npoints_rbv, - # 0, - # f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}", - # ) - # self.assertTrue(pso.output.value == 0, "Expected to start from LOW state") + print("AX:PSO: Checking 'individual' style trigger signal with DDC (2/5)") + # This is practically golden ratio tomography! + acc_dist = 0.5 * mot.velocity.value * mot.acceleration.value + vec_dist = [acc_dist] + vec_dist.extend([1.618] * 100) + pso.configure({"pso_distance": vec_dist, "pso_wavemode": "pulsed"}) + pso.arm() - # # Start moving and wait for polling (move negative direction) - # mot.move(mot.position - np.sum(vec_dist) - acc_dist) - # time.sleep(1) + # Configure DDC + npoints_exp = len(vec_dist) + cfg = {"num_points_total": npoints_exp + 100, "ddc_trigger": DriveDataCaptureTrigger.PsoOutput} + ddc.configure(cfg) + ddc.arm() + npoints_rbv = ddc.nsamples_rbv.value + self.assertEqual( + npoints_rbv, + 0, + f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}", + ) + self.assertTrue(pso.output.value == 0, "Expected to start from LOW state") - # # Evaluate results - # npoints_rbv = ddc.nsamples_rbv.value - # self.assertTrue( - # np.abs(npoints_rbv - npoints_exp) < 1, - # f"Expected to record {npoints_exp} points but got {npoints_rbv}", - # ) - # self.assertTrue(pso.output.value == 0, "Expected to finish in LOW state") - # self.assertTrue( - # pso.dstArrayDepleted.value, "Expected to cover all pints in the distance array" - # ) - # print("AX:PSO: Done testing PSO module in distance mode!") - # pso.unstage() - # ddc.unstage() + # Start moving and wait for polling (move negative direction) + mot.move(mot.position - np.sum(vec_dist) - acc_dist) + time.sleep(1) + + # Evaluate results + npoints_rbv = ddc.nsamples_rbv.value + self.assertTrue( + np.abs(npoints_rbv - npoints_exp) < 1, + f"Expected to record {npoints_exp} points but got {npoints_rbv}", + ) + self.assertTrue(pso.output.value == 0, "Expected to finish in LOW state") + self.assertTrue( + pso.dstArrayDepleted.value, "Expected to cover all pints in the distance array" + ) + print("AX:PSO: Done testing PSO module in distance mode!") + pso.unstage() + ddc.unstage() # def test_10_AxisPsoWindow1(self): # """ Test basic PSO window mode functionality @@ -667,7 +673,6 @@ class AerotechAutomation1Test(unittest.TestCase): # print("AX:PSO: Directly checking basic window output (1/3)") # # Configure steps and move to middle of range # pso.configure({'bounds': (5, 13), 'pso_wavemode': "output"}) - # pso.kickoff() # time.sleep(1) # # Move half a step # mot.move(mot.position + 0.1) @@ -701,13 +706,13 @@ class AerotechAutomation1Test(unittest.TestCase): # # Configure the data capture (must be performed in start position) # cfg = {'ntotal': 5*10, 'trigger': DriveDataCaptureTrigger.PsoOutput} # ddc.configure(cfg) - # ddc.stage() + # ddc.arm() # npoints_rbv = ddc.nsamples_rbv.value # self.assertEqual(npoints_rbv, 0, # f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}") # # Repeated line scan # for ii in range(10): - # dev.stage() + # dev.arm() # mot.move(start_position + 30 + acc_dist) # dev.unstage() # mot.move(start_position - acc_dist) @@ -725,11 +730,11 @@ class AerotechAutomation1Test(unittest.TestCase): # # Configure PSO in window mode # cfg = {'bounds': (acc_dist, 30 + acc_dist + 1), 'pso_wavemode': "output"} # dev.configure(cfg) - # dev.stage() + # dev.arm() # # Configure the data capture # cfg = {'ntotal': 10, 'trigger': DriveDataCaptureTrigger.PsoOutput} # ddc.configure(cfg) - # ddc.stage() + # ddc.arm() # npoints_rbv = ddc.nsamples_rbv.value # self.assertEqual(npoints_rbv, 0, # f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}") @@ -744,247 +749,246 @@ class AerotechAutomation1Test(unittest.TestCase): # self.assertTrue(np.abs(npoints_rbv - 10) < 1, # f"Expected to record 10 points but got {npoints_rbv}") - # def xtest_12_StepScan(self): - # """Test a simple blusky style step scan (via BEC) + def test_12_StepScan(self): + """Test a simple blusky style step scan (via BEC) - # This module tests manual PSO trigger functionality. The goal is to - # ensure the basic operation of the PSO module and the polled feedback. - # """ - # # Throws if IOC is not available or PVs are incorrect - # pso = aa1AxisPsoDistance(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:", name="pso1") - # ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") - # mot = AerotechMotor(AA1_IOC_NAME + ":" + AA1_AXIS_NAME, name="x") - # mot.wait_for_connection() - # pso.wait_for_connection() - # ddc.wait_for_connection() - # pso.unstage() - # ddc.unstage() - # mot.unstage() - # t_start = time.time() - # print("\ntest_12_StepScan: Simple step scan (via Ophyd)") + This module tests manual PSO trigger functionality. The goal is to + ensure the basic operation of the PSO module and the polled feedback. + """ + # Throws if IOC is not available or PVs are incorrect + pso = aa1AxisPsoDistance(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:", name="pso1") + ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") + mot = AerotechMotor(AA1_IOC_NAME + ":" + AA1_AXIS_NAME, name="x") + mot.wait_for_connection() + pso.wait_for_connection() + ddc.wait_for_connection() + pso.disarm() + ddc.disarm() + mot.unstage() + t_start = time.time() + print("\ntest_12_StepScan: Simple step scan (via Ophyd)") - # pos_start = 112.0 - # scan_range = 180 - # scan_steps = 18 + pos_start = 112.0 + scan_range = 180 + scan_steps = 18 - # step_size = scan_range / scan_steps - # positions = [] - # for ii in range(scan_steps + 1): - # positions.append(pos_start + ii * step_size) + step_size = scan_range / scan_steps + positions = [] + for ii in range(scan_steps + 1): + positions.append(pos_start + ii * step_size) - # # Motor setup - # mot.move(pos_start) - # self.assertTrue( - # np.abs(mot.position - pos_start) < 0.1, - # f"Expected to move to scan start position {pos_start}, now at {mot.position}", - # ) - # # PSO setup - # pso.configure(d={"n_pulse": 2}) - # # DDC setup (trigger at each point plus some extra) - # cfg = {"num_points_total": 10 * (scan_steps + 1), "ddc_trigger": DriveDataCaptureTrigger.PsoOutput} - # ddc.configure(cfg) + # Motor setup + mot.move(pos_start) + self.assertTrue( + np.abs(mot.position - pos_start) < 1.0, + f"Expected to move to scan start position {pos_start}, now at {mot.position}", + ) + # PSO setup (very large pulse distance) + pso.configure(d={"pso_n_pulse": 2, "pso_distance": 99999}) + # DDC setup (trigger at each point plus some extra) + cfg = {"num_points_total": 1000 * (scan_steps + 1), "ddc_trigger": DriveDataCaptureTrigger.PsoOutput} + ddc.configure(cfg) - # print("\tStage") - # mot.stage() - # pso.stage() - # ddc.stage() + print("\tStage") + mot.stage() + pso.arm() + ddc.arm() - # for ii in range(len(positions)): - # mot.move(positions[ii]) - # self.assertTrue( - # np.abs(mot.position - positions[ii]) < 1.0, - # f"Expected to reach {positions[ii]}, now is at {mot.position} on step {ii}", - # ) - # pso.trigger() + for ii in range(len(positions)): + mot.move(positions[ii]) + self.assertTrue( + np.abs(mot.position - positions[ii]) < 1.0, + f"Expected to reach {positions[ii]}, now is at {mot.position} on step {ii}", + ) + pso.trigger() + time.sleep(1) - # print("\tUnstage") - # mot.unstage() - # ddc.unstage() - # pso.unstage() + print("\tUnstage") + mot.unstage() + ddc.disarm() + pso.disarm() - # t_end = time.time() - # t_elapsed = t_end - t_start - # print(f"Elapsed scan time: {t_elapsed}") + t_end = time.time() + t_elapsed = t_end - t_start + print(f"Elapsed scan time: {t_elapsed}") - # print("Evaluate final state") - # time.sleep(0.5) - # npoints_rbv = ddc.nsamples_rbv.value - # self.assertEqual( - # npoints_rbv, - # 2 * len(positions), - # f"Expected to record {len(positions)} DDC points, got: {npoints_rbv}", - # ) + print("Evaluate final state") + time.sleep(0.5) + npoints_rbv = ddc.nsamples_rbv.value + self.assertEqual( + npoints_rbv, + 2 * len(positions), + f"Expected to record {len(positions)} DDC points, got: {npoints_rbv}", + ) - # def xtest_13_TomcatSequenceScan(self): - # """Test the zig-zag sequence scan from Tomcat (via BEC) + def test_13_TomcatSequenceScan(self): + """Test the zig-zag sequence scan from Tomcat (via BEC) - # This module tests basic PSO distance functionality, like events and - # waveform generation. The goal is to ensure the basic operation of - # the PSO module and the polled feedback. - # """ - # # Throws if IOC is not available or PVs are incorrect - # pso = aa1AxisPsoDistance(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:", name="pso1") - # ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") - # mot = AerotechMotor(AA1_IOC_NAME + ":" + AA1_AXIS_NAME, name="x") - # mot.wait_for_connection() - # pso.wait_for_connection() - # ddc.wait_for_connection() - # pso.unstage() - # ddc.unstage() - # mot.unstage() - # t_start = time.time() - # print("\ntest_13_TomcatSequenceScan: TOMCAT Sequeence scan (via Ophyd)") + This module tests basic PSO distance functionality, like events and + waveform generation. The goal is to ensure the basic operation of + the PSO module and the polled feedback. + """ + # Throws if IOC is not available or PVs are incorrect + pso = aa1AxisPsoDistance(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:", name="pso1") + ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") + mot = AerotechMotor(AA1_IOC_NAME + ":" + AA1_AXIS_NAME, name="x") + mot.wait_for_connection() + pso.wait_for_connection() + ddc.wait_for_connection() + pso.disarm() + ddc.disarm() + mot.unstage() + t_start = time.time() + print("\ntest_13_TomcatSequenceScan: TOMCAT Sequeence scan (via Ophyd)") - # scan_start = 42.0 - # scan_range = 180 - # num_repeat = 10 - # scan_type = "PosNeg" + scan_start = 42.0 + scan_range = 180 + num_repeat = 10 + scan_type = "PosNeg" - # # Dynamic properties - # scan_vel = 90 - # scan_acc = 500 - # dist_safe = 10.0 + # Dynamic properties + scan_vel = 90 + scan_acc = 500 + dist_safe = 10.0 - # ###################################################################### - # print("\tConfigure") - # # Scan range - # dist_acc = 0.5 * scan_vel * scan_vel / scan_acc + dist_safe - # if scan_type in ["PosNeg", "Pos"]: - # pos_start = scan_start - dist_acc - # pos_end = scan_start + scan_range + dist_acc - # elif scan_type in ["NegPos", "Neg"]: - # pos_start = scan_start + dist_acc - # pos_end = scan_start - scan_range - dist_acc - # else: - # raise RuntimeError(f"Unexpected ScanType: {scan_type}") + ###################################################################### + print("\tConfigure") + # Scan range + dist_acc = 0.5 * scan_vel * scan_vel / scan_acc + dist_safe + if scan_type in ["PosNeg", "Pos"]: + pos_start = scan_start - dist_acc + pos_end = scan_start + scan_range + dist_acc + elif scan_type in ["NegPos", "Neg"]: + pos_start = scan_start + dist_acc + pos_end = scan_start - scan_range - dist_acc + else: + raise RuntimeError(f"Unexpected ScanType: {scan_type}") - # # Motor setup - # mot.velocity.set(scan_vel).wait() - # mot.acceleration.set(scan_vel / scan_acc).wait() - # mot.move(pos_start) - # self.assertTrue( - # np.abs(mot.position - pos_start) < 0.1, - # f"Expected to move to scan start position {pos_start}, now at {mot.position}", - # ) + # Motor setup + mot.velocity.set(scan_vel).wait() + mot.acceleration.set(scan_vel / scan_acc).wait() + mot.move(pos_start) + self.assertTrue( + np.abs(mot.position - pos_start) < 1.0, + f"Expected to move to scan start position {pos_start}, now at {mot.position}", + ) - # # PSO setup - # print(f"Configuring PSO to: {[dist_acc, scan_range]}") - # cfg = {"pso_distance": [dist_acc, scan_range], "pso_wavemode": "toggle"} - # pso.configure(cfg) + # PSO setup + print(f"Configuring PSO to: {[dist_acc, scan_range]}") + cfg = {"pso_distance": [dist_acc, scan_range], "pso_wavemode": "toggle"} + pso.configure(cfg) - # # DDC setup (Tomcat runs in ENABLE mode, so only one posedge) - # cfg = {"num_points_total": num_repeat, "ddc_trigger": DriveDataCaptureTrigger.PsoOutput} - # ddc.configure(cfg) + # DDC setup (Tomcat runs in ENABLE mode, so only one posedge) + cfg = {"num_points_total": num_repeat, "ddc_trigger": DriveDataCaptureTrigger.PsoOutput} + ddc.configure(cfg) - # print("\tStage") - # mot.stage() - # pso.stage() - # ddc.stage() - # print("\tKickoff") - # pso.kickoff() + print("\tStage") + mot.stage() + pso.arm() + ddc.arm() - # for ii in range(num_repeat): - # # No option to reset the index counter... - # pso.dstArrayRearm.set(1).wait() + for ii in range(num_repeat): + # No option to reset the index counter... + pso.dstArrayRearm.set(1).wait() - # if scan_type in ["Pos", "Neg"]: - # mot.move(pos_end) - # self.assertTrue( - # np.abs(mot.position - pos_end) < 1.0, - # f"Expected to reach {pos_end}, now is at {mot.position} on iteration {ii}", - # ) - # mot.move(pos_start) - # self.assertTrue( - # np.abs(mot.position - pos_start) < 1.0, - # f"Expected to rewind to {pos_start}, now at {mot.position} on iteration {ii}", - # ) + if scan_type in ["Pos", "Neg"]: + mot.move(pos_end) + self.assertTrue( + np.abs(mot.position - pos_end) < 1.0, + f"Expected to reach {pos_end}, now is at {mot.position} on iteration {ii}", + ) + mot.move(pos_start) + self.assertTrue( + np.abs(mot.position - pos_start) < 1.0, + f"Expected to rewind to {pos_start}, now at {mot.position} on iteration {ii}", + ) - # if scan_type in ["PosNeg", "NegPos"]: - # if ii % 2 == 0: - # mot.move(pos_end) - # self.assertTrue( - # np.abs(mot.position - pos_end) < 1.0, - # f"Expected to reach {pos_end}, now at {mot.position} on iteration {ii}", - # ) - # else: - # mot.move(pos_start) - # self.assertAlmostEqual( - # mot.position, - # pos_start, - # 1, - # f"Expected to reach {pos_start}, now is at {mot.position} on iteration {ii}", - # ) + if scan_type in ["PosNeg", "NegPos"]: + if ii % 2 == 0: + mot.move(pos_end, wait=True) + self.assertTrue( + np.abs(mot.position - pos_end) < 1.0, + f"Expected to reach {pos_end}, now at {mot.position} on iteration {ii}", + ) + else: + mot.move(pos_start, wait=True) + self.assertTrue( + np.abs(mot.position - pos_start) < 1.0, + f"Expected to reach {pos_start}, now at {mot.position} on iteration {ii}", + ) - # time.sleep(0.2) - # npoints_rbv = ddc.nsamples_rbv.value - # self.assertEqual( - # npoints_rbv, - # ii + 1, - # f"Expected to record {ii+1} DDC points, got: {npoints_rbv} on iteration {ii}", - # ) + time.sleep(0.2) + npoints_rbv = ddc.nsamples_rbv.value + self.assertEqual( + npoints_rbv, + ii + 1, + f"Expected to record {ii+1} DDC points, got: {npoints_rbv} on iteration {ii}", + ) - # print("\tUnstage") - # mot.unstage() - # ddc.unstage() - # pso.unstage() + print("\tUnstage") + mot.unstage() + ddc.disarm() + pso.disarm() - # t_end = time.time() - # t_elapsed = t_end - t_start - # print(f"Elapsed scan time: {t_elapsed}") + t_end = time.time() + t_elapsed = t_end - t_start + print(f"Elapsed scan time: {t_elapsed}") - # print("Evaluate final state") - # time.sleep(3) - # npoints_rbv = ddc.nsamples_rbv.value - # self.assertEqual( - # npoints_rbv, - # num_repeat, - # f"Expected to record {num_repeat} DDC points, but got: {npoints_rbv}", - # ) + print("Evaluate final state") + time.sleep(3) + npoints_rbv = ddc.nsamples_rbv.value + self.assertEqual( + npoints_rbv, + num_repeat, + f"Expected to record {num_repeat} DDC points, but got: {npoints_rbv}", + ) - # def xtest_14_JinjaTomcatSequenceScan(self): - # """ Run a test sequence scan with template substitution""" - # # Load the test file - # path = os.path.dirname(os.path.realpath(__file__)) - # filename = f"{path}/testSequenceScanTemplate.ascript" - # with open(filename, encoding="utf-8") as f: - # templatetext = f.read() + def test_14_JinjaTomcatSequenceScan(self): + """ Run a test sequence scan with template substitution""" + # Load the test file + path = os.path.dirname(os.path.realpath(__file__)) + filename = f"{path}/testSequenceScanTemplate.ascript" + with open(filename, encoding="utf-8") as f: + templatetext = f.read() - # scanconfig = { - # "startpos": 42, - # "scanrange": 180, - # "nrepeat": 13, - # "scandir": "NegPos", - # "scanvel": 150, - # "scanacc": 500, - # } - # tm = jinja2.Template(templatetext) - # text = tm.render(scan=scanconfig) - # # with open("test/tcatSequenceScan.ascript", "w") as text_file: - # # text_file.write(text) + scanconfig = { + "startpos": 42, + "scanrange": 180, + "nrepeat": 13, + "scandir": "NegPos", + "scanvel": 150, + "scanacc": 500, + } + tm = jinja2.Template(templatetext) + text = tm.render(scan=scanconfig) + # with open("test/tcatSequenceScan.ascript", "w") as text_file: + # text_file.write(text) - # # Throws if IOC is not available or PVs are incorrect - # dev = aa1Tasks(AA1_IOC_NAME + ":TASK:", name="tsk") - # dev.wait_for_connection() - # print("\nTOMCAT Sequeence scan (via Jinjad AeroScript)") + # Throws if IOC is not available or PVs are incorrect + tsk = aa1Tasks(AA1_IOC_NAME + ":TASK:", name="tsk") + ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") + tsk.wait_for_connection() + ddc.wait_for_connection() + print("\nTOMCAT Sequeence scan (via Jinjad AeroScript)") - # # Copy file to controller and run it - # t_start = time.time() - # dev.configure({'script_text': text, 'script_task': 4}) - # dev.stage() - # dev.launch() - # time.sleep(0.5) - # dev.complete().wait() - # t_end = time.time() - # t_elapsed = t_end - t_start - # print(f"Elapsed scan time: {t_elapsed}") + # Copy file to controller and run it + t_start = time.time() + tsk.configure({'script_text': text, 'script_task': 4, 'script_file': "sequence.ascript"}) + ddc.configure({"num_points_total": 13*100}) + tsk.arm() + ddc.arm() + tsk.launch() + time.sleep(1.0) + tsk.complete().wait() + t_end = time.time() + t_elapsed = t_end - t_start + print(f"Elapsed scan time: {t_elapsed}") - # time.sleep(0.5) - # ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") - # ddc.wait_for_connection() - # npoints_rbv = ddc.nsamples_rbv.value - # self.assertEqual( - # npoints_rbv, 13, f"Expected to record 13 DDC points, got: {npoints_rbv} after script" - # ) + time.sleep(1.0) + npoints_rbv = ddc.nsamples_rbv.value + self.assertEqual( + npoints_rbv, 13, f"Expected to record 13 DDC points, got: {npoints_rbv} after script" + ) if __name__ == "__main__":