diff --git a/tomcat_bec/devices/aerotech/AerotechAutomation1.py b/tomcat_bec/devices/aerotech/AerotechAutomation1.py index fb236a9..286f1d6 100644 --- a/tomcat_bec/devices/aerotech/AerotechAutomation1.py +++ b/tomcat_bec/devices/aerotech/AerotechAutomation1.py @@ -108,7 +108,7 @@ class aa1Tasks(Device): EpicsSignal, "FILEWRITE", string=True, kind=Kind.omitted, put_complete=True ) - def launchScript(self, filename: str, task_index: int == 3, filetext=None, settle_time=0.5) -> None: + def launch_script(self, filename: str, task_index: int=4, filetext=None, settle_time=0.5) -> None: """ Launch a script file that either exists, or is newly created and compiled""" self.configure({"text": filetext, "filename": filename, "task_index": task_index}) @@ -232,7 +232,6 @@ class aa1Tasks(Device): return status - class aa1TaskState(Device): """Task state monitoring API @@ -267,7 +266,6 @@ class aa1TaskState(Device): return status - class aa1GlobalVariables(Device): """Global variables @@ -637,6 +635,11 @@ class aa1AxisPsoDistance(aa1AxisPsoBase): raise RuntimeError(f"Unsupported distace triggering mode: {wmode}") old = self.read_configuration() + # Disable everything + self.winEvents.set("Off").wait() + self.dstCounterEna.set("Off").wait() + self.dstEventsEna.set("Off").wait() + # Configure distance generator (also resets counter to 0) self._distance_value = distance if isinstance(distance, (float, int)): @@ -644,10 +647,6 @@ class aa1AxisPsoDistance(aa1AxisPsoBase): elif isinstance(distance, (np.ndarray, list, tuple)): self.dstDistanceArr.set(distance).wait() - self.winEvents.set("Off").wait() - self.dstCounterEna.set("Off").wait() - self.dstEventsEna.set("Off").wait() - # Configure the pulsed/toggled waveform if wmode in ["toggle", "toggled"]: # Switching to simple toggle mode @@ -687,6 +686,9 @@ class aa1AxisPsoDistance(aa1AxisPsoBase): # Bluesky step scan interface def stage(self): """ Stage the device""" + # Rearm the configured array + if isinstance(self._distance_value, (np.ndarray, list, tuple)): + self.dstArrayRearm.set(1).wait() # Turn off counter monitoring self.dstEventsEna.set("On").wait() self.dstCounterEna.set("On").wait() @@ -738,7 +740,6 @@ class aa1AxisPsoDistance(aa1AxisPsoBase): return status - class aa1AxisPsoWindow(aa1AxisPsoBase): """Position Sensitive Output - Window mode @@ -767,12 +768,14 @@ class aa1AxisPsoWindow(aa1AxisPsoBase): """Simplified configuration interface to access the most common functionality for distance mode PSO. - :param distance: The trigger distance or the array of distances between subsequent points. + :param bounds: The trigger window or the array of windows. :param wmode: Waveform mode configuration, usually output/pulsed/toggled. + :param emode: Event mode configuration, usually Off/Enter/Exit/Both. + """ - bounds = d["distance"] + bounds = d["bounds"] wmode = str(d["wmode"]) - emode = str(d["emode"]) + emode = d.get("emode", "Enter") t_pulse = d.get("t_pulse", 100) w_pulse = d.get("w_pulse", 200) n_pulse = d.get("n_pulse", 1) @@ -780,7 +783,7 @@ class aa1AxisPsoWindow(aa1AxisPsoBase): # Validate input parameters if wmode not in ["pulse", "pulsed", "toggle", "toggled", "output", "flag"]: raise RuntimeError(f"Unsupported window triggering mode: {wmode}") - if len(bounds % 2) == 1: + if len(bounds) % 2 == 1: raise RuntimeError(f"Window mode requires an even number of bounds, got: {len(bounds)}") self._mode = wmode diff --git a/tomcat_bec/devices/aerotech/test/autoHilTest.py b/tomcat_bec/devices/aerotech/test/autoHilTest.py index cf68bd8..d7bea35 100644 --- a/tomcat_bec/devices/aerotech/test/autoHilTest.py +++ b/tomcat_bec/devices/aerotech/test/autoHilTest.py @@ -57,6 +57,7 @@ class AerotechAutomation1Test(unittest.TestCase): self.assertTrue(dev.taskcount.get()>0, "Maximum controller task count seems invalid") print("CTRL: Done testing CTRL module!") + def test_02_TaskProxy(self): """ Testing the TASK interface """ @@ -111,12 +112,12 @@ class AerotechAutomation1Test(unittest.TestCase): # Send another empty program filename = "unittesting2.ascript" text = "program\nvar $positions[2] as real\nend" - dev.launchScript(filename, task_index=3, filetext=text) + dev.launch_script(filename, task_index=3, filetext=text) dev.complete().wait() print("TASK: Done testing TASK module!") - def test_04_GlobalVariables1(self): + def test_03_GlobalVariables1(self): """Test the basic read/write global variable interface """ # Throws if IOC is not available or PVs are incorrect @@ -159,7 +160,7 @@ class AerotechAutomation1Test(unittest.TestCase): print("VAR: Done testing VAR module basic functionality!") - def test_05_GlobalVariables2(self): + def test_04_GlobalVariables2(self): """Test the direct global variable bindings together with the task and standard global variable interfaces. """ @@ -208,8 +209,7 @@ class AerotechAutomation1Test(unittest.TestCase): self.assertAlmostEqual(1.4142, dev.float1.value, f"Unexpected floating readback , read {dev.float1.value}, expected 1.4142") - - def test_06_MotorRecord(self): + def test_05_MotorRecord(self): """ Tests the basic move functionality of the MotorRecord """ mot = EpicsMotor(AA1_IOC_NAME+ ":" + AA1_AXIS_NAME, name="x") @@ -231,7 +231,7 @@ class AerotechAutomation1Test(unittest.TestCase): print("MR: Done testing MotorRecord!") - def test_07_AxisIo(self): + def test_06_AxisIo(self): # Throws if IOC is not available or PVs are incorrect dev = aa1AxisIo(AA1_IOC_NAME+ ":" + AA1_AXIS_NAME + ":IO:", name="io") dev.wait_for_connection() @@ -253,8 +253,7 @@ class AerotechAutomation1Test(unittest.TestCase): print("IO: Done testing IO module!") - - def test_08_AxisDdc(self): + def test_07_AxisDdc(self): """ Drive data collection This is the preliminary test case dor drive data collection using @@ -338,8 +337,7 @@ class AerotechAutomation1Test(unittest.TestCase): print("AX:DDCal(l: Done testing DDC module!") - - def test_09_AxisPsoDistance1(self): + def test_08_AxisPsoDistance1(self): """ Test basic PSO distance mode functionality This module tests basic PSO distance functionality, like events and @@ -357,7 +355,7 @@ class AerotechAutomation1Test(unittest.TestCase): if dev.output.value: dev.toggle() sleep(1) - + print("AX:PSO: Directly checking basic 'toggled' event generation (1/3)") # Configure steps and move to middle of range dev.configure({'distance': 5, 'wmode': "toggle"}) @@ -422,7 +420,8 @@ class AerotechAutomation1Test(unittest.TestCase): dev.unstage() ddc.unstage() - def test_10_PsoIntegration01(self): + + def test_09_PsoIntegration01(self): """ The first integration test aims to verify PSO functionality under real life scenarios. It uses the DDC for collecting encoder signals and compares them to the expected values. @@ -497,7 +496,232 @@ class AerotechAutomation1Test(unittest.TestCase): ddc.unstage() + def test_10_AxisPsoWindow1(self): + """ Test basic PSO window mode functionality + + This module tests basic PSO distance functionality, like events and + waveform generation. The goal is to ensure the basic operation of + the PSO module and the polled feedback. + """ + # Throws if IOC is not available or PVs are incorrect + dev = aa1AxisPsoWindow(AA1_IOC_NAME+ ":" + AA1_AXIS_NAME + ":PSO:", name="pso1") + ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME+ ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") + mot = EpicsMotor(AA1_IOC_NAME+ ":" + AA1_AXIS_NAME, name="x") + mot.wait_for_connection() + dev.wait_for_connection() + ddc.wait_for_connection() + print("\nAX:PSO: Checking axis PSO in window mode (basic)") + if dev.output.value: + dev.toggle() + + print("AX:PSO: Directly checking basic window output (1/3)") + # Configure steps and move to middle of range + dev.configure({'bounds': (5, 13), 'wmode':"output"}) + dev.stage() + sleep(1) + # Move half a step + mot.move(mot.position + 0.1) + dist = 0.1 + + # Start moving in steps, while checking output + for ii in range(42): + mot.move(mot.position + 0.2) + dist += 0.2 + sleep(1) + if 5 < dist < 13: + self.assertTrue(dev.output.value==1, f"Expected HIGH PSO output inside the window at distance {dist}") + else: + self.assertTrue(dev.output.value==0, f"Expected LOW PSO output outside the window at distance {dist}") + + print("AX:PSO: The following tests whould fail as multiple triggers/events are being emmitted when entering/exiting the window!") + dev.unstage() + return + + print("AX:PSO: Checking basic window event generation with DDC (2/3)") + print(" WARN: The encoder output is very noisy, one transition generates several events") + # Move to acceleration position + acc_dist = 0.5 * mot.velocity.value * mot.acceleration.value + print(f"Acceleration distance: {acc_dist}") + start_position = mot.position + mot.move(start_position - acc_dist) + # Configure PSO in window mode + cfg = {'bounds': (acc_dist, 30 + acc_dist), 'wmode':"pulsed", "emode": 'Both'} + dev.configure(cfg) + # Configure the data capture (must be performed in start position) + cfg = {'npoints': 5*10, 'trigger': DriveDataCaptureTrigger.PsoOutput} + ddc.configure(cfg) + ddc.stage() + npoints_rbv = ddc.nsamples_rbv.value + self.assertEqual(npoints_rbv, 0, f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}") + # Repeated line scan + for ii in range(10): + dev.stage() + mot.move(start_position + 30 + acc_dist) + dev.unstage() + mot.move(start_position - acc_dist) + sleep(0.5) + npoints_rbv = ddc.nsamples_rbv.value + self.assertTrue(np.abs(npoints_rbv-50)<1, f"Expected to record 10 points but got {npoints_rbv}") + + + print("AX:PSO: Checking basic 'toggled' event generation with DDC (3/3)") + print("Test removed as there's some heavy jitter on the PSO output flag") + # Move to acceleration position + acc_dist = 0.5 * mot.velocity.value * mot.acceleration.value + print(f"Acceleration distance: {acc_dist}") + mot.move(mot.position - acc_dist) + # Configure PSO in window mode + cfg = {'bounds': (acc_dist, 30 + acc_dist + 1), 'wmode':"output"} + dev.configure(cfg) + dev.stage() + # Configure the data capture + cfg = {'npoints': 10, 'trigger': DriveDataCaptureTrigger.PsoOutput} + ddc.configure(cfg) + ddc.stage() + npoints_rbv = ddc.nsamples_rbv.value + self.assertEqual(npoints_rbv, 0, f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}") + # Snake scan + for ii in range(10): + if ii % 2 == 0: + mot.move(mot.position + 30+ 2*acc_dist + 0.1) + else: + mot.move(mot.position - 30 - 2*acc_dist - 0.1) + sleep(0.5) + npoints_rbv = ddc.nsamples_rbv.value + self.assertTrue(np.abs(npoints_rbv-10)<1, f"Expected to record 10 points but got {npoints_rbv}") + + + def test_12_TomcatSequenceScan(self): + """ Test the zig-zag sequence scan from Tomcat (via BEC) + + This module tests basic PSO distance functionality, like events and + waveform generation. The goal is to ensure the basic operation of + the PSO module and the polled feedback. + """ + # Throws if IOC is not available or PVs are incorrect + pso = aa1AxisPsoDistance(AA1_IOC_NAME+ ":" + AA1_AXIS_NAME + ":PSO:", name="pso1") + ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME+ ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") + mot = EpicsMotor(AA1_IOC_NAME+ ":" + AA1_AXIS_NAME, name="x") + mot.wait_for_connection() + pso.wait_for_connection() + ddc.wait_for_connection() + t_start = time() + print("\nTOMCAT Sequeence scan (via Ophyd)") + + ScanStart = 42.0 + ScanRange = 180 + NumRepeat = 10 + ScanType = "PosNeg" + + # Dynamic properties + VelScan = 90 + AccScan = 500 + SafeDist = 10.0 + + ###################################################################### + print("\tConfigure") + # Scan range + AccDist = 0.5 * VelScan * VelScan / AccScan + SafeDist + if ScanType in ["PosNeg", "Pos"]: + PosStart = ScanStart - AccDist + PosEnd = ScanStart + ScanRange + AccDist + elif ScanType in ["NegPos", "Neg"]: + PosStart = ScanStart + AccDist + PosEnd = ScanStart - ScanRange - AccDist + else: + raise RuntimeError(f"Unexpected ScanType: {ScanType}") + + # Motor setup + mot.velocity.set(VelScan).wait() + mot.acceleration.set(VelScan/AccScan).wait() + mot.move(PosStart) + self.assertTrue(np.abs(mot.position-PosStart)<0.1, f"Expected to move to scan start position {PosStart}, found motor at {mot.position}") + + # PSO setup + print(f"Configuring PSO to: {[AccDist, ScanRange]}") + cfg = {'distance':[AccDist, ScanRange], 'wmode': "toggle"} + pso.configure(cfg) + + # DDC setup (Tomcat runs in ENABLE mode, so only one posedge) + cfg = {'npoints': NumRepeat, 'trigger': DriveDataCaptureTrigger.PsoOutput} + ddc.configure(cfg) + + print("\tStage") + mot.stage() + pso.stage() + ddc.stage() + + for ii in range(NumRepeat): + # No option to reset the index counter... + pso.dstArrayRearm.set(1).wait() + + if ScanType in ["Pos", "Neg"]: + mot.move(PosEnd) + self.assertTrue(np.abs(mot.position-PosEnd)<1.0, f"Expected to reach {PosEnd}, motor is at {mot.position} on iteration {ii}") + mot.move(PosStart) + self.assertTrue(np.abs(mot.position-PosStart)<1.0, f"Expected to rewind to {PosStart}, motor is at {mot.position} on iteration {ii}") + + if ScanType in ["PosNeg", "NegPos"]: + if ii % 2 == 0: + mot.move(PosEnd) + self.assertTrue(np.abs(mot.position-PosEnd)<1.0, f"Expected to reach {PosEnd}, motor is at {mot.position} on iteration {ii}") + else: + mot.move(PosStart) + self.assertAlmostEqual(mot.position, PosStart, 1, f"Expected to reach {PosStart}, motor is at {mot.position} on iteration {ii}") + + sleep(0.2) + npoints_rbv = ddc.nsamples_rbv.value + self.assertEqual(npoints_rbv, ii+1, f"Expected to record {ii+1} DDC points, but got: {npoints_rbv} on iteration {ii}") + + print("\tUnstage") + mot.unstage() + ddc.unstage() + pso.unstage() + + t_end = time() + t_elapsed = t_end - t_start + print(f"Elapsed scan time: {t_elapsed}") + + print("Evaluate final state") + sleep(3) + npoints_rbv = ddc.nsamples_rbv.value + self.assertEqual(npoints_rbv, NumRepeat, f"Expected to record {NumRepeat} DDC points, but got: {npoints_rbv}") + + + def test_14_JinjaTomcatSequenceScan(self): + # Load the test file + try: + filename = "test/testSequenceScanTemplate.ascript" + with open(filename) as f: + templatetext = f.read() + except FileNotFoundError: + filename = "testSequenceScanTemplate.ascript" + with open(filename) as f: + templatetext = f.read() + + import jinja2 + scanconfig = {'startpos': 42, 'scanrange': 180, 'nrepeat': 10, 'scandir': 'NegPos', 'scanvel': 90, 'scanacc': 500} + tm = jinja2.Template(templatetext) + text = tm.render(scan=scanconfig) + # with open("test/tcatSequenceScan.ascript", "w") as text_file: + # text_file.write(text) + + # Throws if IOC is not available or PVs are incorrect + dev = aa1Tasks(AA1_IOC_NAME+":TASK:", name="tsk") + dev.wait_for_connection() + print("\nTOMCAT Sequeence scan (via Jinjad AeroScript)") + + # Copy file to controller and run it + t_start = time() + dev.launch_script("tcatSequenceScan.ascript", task_index=4, filetext=text) + sleep(0.5) + dev.complete().wait() + t_end = time() + t_elapsed = t_end - t_start + print(f"Elapsed scan time: {t_elapsed}") + + + + if __name__ == "__main__": unittest.main() - - diff --git a/tomcat_bec/devices/aerotech/test/testSequenceScanTemplate.ascript b/tomcat_bec/devices/aerotech/test/testSequenceScanTemplate.ascript new file mode 100644 index 0000000..c34b9ee --- /dev/null +++ b/tomcat_bec/devices/aerotech/test/testSequenceScanTemplate.ascript @@ -0,0 +1,132 @@ +// Test program for zig-zag line scanning with PSO window output "enable" +// signal and DDC synchronized to external trigger input. +// The file expects external parameter validation +// The PSO locations arrays are set externally from EPICS PV + +enum ScanType + Pos = 0 + Neg = 1 + PosNeg = 2 + NegPos = 3 +end + + +program + ////////////////////////////////////////////////////////////////////////// + // External parameters - USE THEESE + var $fStartPosition as real = {{ scan.startpos }} + var $fScanRange as real = $fStartPosition + {{ scan.scanrange }} + var $iNumRepeat as integer = {{ scan.nrepeat }} + var $eScanType as ScanType = ScanType.{{ scan.scandir or 'Pos' }} + + var $fVelJog as real = {{ scan.jogvel or 200 }} + var $fVelScan as real = {{ scan.scanvel }} + var $fAcceleration = {{ scan.scanacc or 500 }} + var $fSafeDist = 10.0 + + ////////////////////////////////////////////////////////////////////////// + // Internal parameters - dont use + var $axis as axis = ROTY + var $ii as integer + var $iDdcSafeSpace as integer = 4096 + + // Set acceleration + SetupAxisRampType($axis, RampType.Linear) + SetupAxisRampValue($axis,0,$fAcceleration) + var $fAccDistance as real = 0.5 * $fVelScan * $fVelScan / $fAcceleration + $fSafeDist + + // set the actual scan range + var $fPosStart as real + var $fPosEnd as real + if $eScanType == ScanType.Pos + $fPosStart = $fStartPosition - $fAccDistance + $fPosEnd = $fStartPosition + $fScanRange + $fAccDistance + elseif $eScanType == ScanType.Neg + $fPosStart = $fStartPosition + $fAccDistance + $fPosEnd = $fStartPosition - $fScanRange - $fAccDistance + elseif $eScanType == ScanType.PosNeg + $fPosStart = $fStartPosition - $fAccDistance + $fPosEnd = $fStartPosition + $fScanRange + $fAccDistance + elseif $eScanType == ScanType.NegPos + $fPosStart = $fStartPosition + $fAccDistance + $fPosEnd = $fStartPosition - $fScanRange - $fAccDistance + end + + // Move to start position before the scan + MoveAbsolute($axis, $fPosStart, $fVelJog) + WaitForInPosition($axis) + + // Configure PSO + PsoDistanceCounterOff($axis) + PsoDistanceEventsOff($axis) + PsoWindowConfigureEvents($axis, PsoWindowEventMode.None) + PsoWaveformOff($axis) + PsoOutputConfigureSource($axis, PsoOutputSource.Waveform) + var $iPsoArrayAddr as integer = 0 + var $iPsoArray[] as real = [UnitsToCounts($axis, $fAccDistance), UnitsToCounts($axis, $fScanRange)] + DriveArrayWrite($axis, $iPsoArray, $iPsoArrayAddr, length($iPsoArray), DriveArrayType.PsoDistanceEventDistances) + PsoDistanceConfigureArrayDistances($axis, $iPsoArrayAddr, 2, 0) + PsoDistanceEventsOn($axis) + + PsoWaveformConfigureMode($axis, PsoWaveformMode.Toggle) + PsoWaveformOn($axis) + + // Configure Drive Data Collection + var $iDdcArrayAddr as integer = 8388608 + var $iDdcArraySize as integer = 5000 + + DriveDataCaptureConfigureInput($axis, 0, DriveDataCaptureInput.PrimaryFeedback); + DriveDataCaptureConfigureInput($axis, 1, DriveDataCaptureInput.AnalogInput0 ); + + DriveDataCaptureConfigureTrigger($axis, 0, DriveDataCaptureTrigger.PsoOutput ); + DriveDataCaptureConfigureTrigger($axis, 1, DriveDataCaptureTrigger.PsoOutput ); + + DriveDataCaptureConfigureArray($axis, 0, $iDdcArrayAddr, $iDdcArraySize); + DriveDataCaptureConfigureArray($axis, 1, $iDdcArrayAddr + $iDdcSafeSpace + 8 * $iDdcArraySize, $iDdcArraySize); + + // Directly before scan + PsoDistanceCounterOn($axis) + DriveDataCaptureOn($axis, 0) + DriveDataCaptureOn($axis, 1) + + /////////////////////////////////////////////////////////// + // Start the actual scanning + /////////////////////////////////////////////////////////// + for $ii = 0 to ($iNumRepeat-1) + PsoDistanceConfigureArrayDistances($axis, $iPsoArrayAddr, 2, 0) + + if $eScanType == ScanType.Pos + MoveAbsolute($axis, $fPosEnd, $fVelScan) + WaitForInPosition($axis) + MoveAbsolute($axis, $fPosStart, $fVelScan) + WaitForInPosition($axis) + elseif $eScanType == ScanType.Neg + MoveAbsolute($axis, $fPosEnd, $fVelScan) + WaitForInPosition($axis) + MoveAbsolute($axis, $fPosStart, $fVelScan) + WaitForInPosition($axis) + elseif $eScanType == ScanType.PosNeg + if ($ii % 2) == 0 + MoveAbsolute($axis, $fPosEnd, $fVelScan) + WaitForInPosition($axis) + elseif ($ii % 2) == 1 + MoveAbsolute($axis, $fPosStart, $fVelScan) + WaitForInPosition($axis) + end + elseif $eScanType == ScanType.NegPos + if ($ii % 2) == 0 + MoveAbsolute($axis, $fPosEnd, $fVelScan) + WaitForInPosition($axis) + elseif ($ii % 2) == 1 + MoveAbsolute($axis, $fPosStart, $fVelScan) + WaitForInPosition($axis) + end + end + Dwell(0.2) + end + + // Directly after scan + PsoDistanceCounterOff($axis) + DriveDataCaptureOff($axis, 0) + DriveDataCaptureOff($axis, 1) +end