diff --git a/tomcat_bec/devices/aerotech/AerotechAutomation1.py b/tomcat_bec/devices/aerotech/AerotechAutomation1.py index 8d549a1..aa303cf 100644 --- a/tomcat_bec/devices/aerotech/AerotechAutomation1.py +++ b/tomcat_bec/devices/aerotech/AerotechAutomation1.py @@ -35,21 +35,22 @@ class aa1Controller(Device): """Ophyd proxy class for the Aerotech Automation 1's core controller functionality""" # ToDo: Add error subscription - controllername = Component(EpicsSignalRO, "NAME", kind=Kind.config) - serialnumber = Component(EpicsSignalRO, "SN", kind=Kind.config) - apiversion = Component(EpicsSignalRO, "API_VERSION", kind=Kind.config) + controllername = Component(EpicsSignalRO, "NAME", string=True, kind=Kind.config) + serialnumber = Component(EpicsSignalRO, "SN", string=True, kind=Kind.config) + apiversion = Component(EpicsSignalRO, "API_VERSION", string=True, kind=Kind.config) axiscount = Component(EpicsSignalRO, "AXISCOUNT", kind=Kind.config) taskcount = Component(EpicsSignalRO, "TASKCOUNT", kind=Kind.config) fastpoll = Component(EpicsSignalRO, "POLLTIME", auto_monitor=True, kind=Kind.normal) slowpoll = Component(EpicsSignalRO, "DRVPOLLTIME", auto_monitor=True, kind=Kind.normal) errno = Component(EpicsSignalRO, "ERRNO", auto_monitor=True, kind=Kind.normal) - errnmsg = Component(EpicsSignalRO, "ERRMSG", auto_monitor=True, kind=Kind.normal) + errnmsg = Component(EpicsSignalRO, "ERRMSG", string=True, auto_monitor=True, kind=Kind.normal) _set_ismc = Component(EpicsSignal, "SET", put_complete=True, kind=Kind.omitted) USER_ACCESS = ["reset"] def reset(self): - """ Resets the Automation1 iSMC reloading the default configuration""" + """Resets the Automation1 iSMC reloading the default configuration. Note that this will + erase all settings that were set during startup and not saved to the MCD file.""" self._set_ismc.set(3).wait() diff --git a/tomcat_bec/devices/aerotech/AerotechDriveDataCollection.py b/tomcat_bec/devices/aerotech/AerotechDriveDataCollection.py index 3bfcc5b..38c6b73 100644 --- a/tomcat_bec/devices/aerotech/AerotechDriveDataCollection.py +++ b/tomcat_bec/devices/aerotech/AerotechDriveDataCollection.py @@ -152,9 +152,8 @@ class aa1AxisDriveDataCollection(PSIDeviceBase, Device): self.configure(d=d) # Stage the data collection if not in internally launced mode - # NOTE: Scripted scans start acquiring from the scrits - if "scan_type" in scan_args and scan_args["scan_type"] in ("scripted", "script"): - self.arm() + # NOTE: Scripted scans start acquiring from the scrits at kickoff + self.arm() # Reset readback self.reset() diff --git a/tomcat_bec/devices/aerotech/AerotechPso.py b/tomcat_bec/devices/aerotech/AerotechPso.py index 0349890..a43f7b0 100644 --- a/tomcat_bec/devices/aerotech/AerotechPso.py +++ b/tomcat_bec/devices/aerotech/AerotechPso.py @@ -39,6 +39,7 @@ class AerotechPsoBase(PSIDeviceBase, Device): output = Component(EpicsSignalRO, "OUTPUT-RBV", auto_monitor=True, kind=Kind.normal) address = Component(EpicsSignalRO, "ARRAY-ADDR", kind=Kind.config) _eventSingle = Component(EpicsSignal, "EVENT:SINGLE", put_complete=True, kind=Kind.omitted) + switch = Component(EpicsSignal, "SWITCH", put_complete=True, kind=Kind.omitted) # ######################################################################## # PSO Distance event module @@ -119,7 +120,6 @@ class AerotechPsoBase(PSIDeviceBase, Device): def fire(self, settle_time=0.1) -> None | DeviceStatus: """Fire a single PSO event (i.e. manual software trigger)""" # Only trigger if distance was set to invalid - logger.warning(f"[{self.name}] Triggerin...") self._eventSingle.set(1, settle_time=settle_time).wait() status = DeviceStatus(self) status.set_finished() @@ -139,6 +139,8 @@ class AerotechPsoBase(PSIDeviceBase, Device): w_pulse = d.get("pso_w_pulse", 200) n_pulse = d.get("pso_n_pulse", 1) + self.switch.set("Manual").wait() + # Configure the pulsed/toggled waveform if wmode in ["toggle", "toggled"]: # Switching to simple toggle mode @@ -161,12 +163,6 @@ class AerotechPsoBase(PSIDeviceBase, Device): else: raise RuntimeError(f"Unsupported window mode: {wmode}") - # Set PSO output data source - # FIXME : This is essentially staging... - if wmode in ["toggle", "toggled", "pulse", "pulsed"]: - self.outSource.set("Waveform").wait() - elif wmode in ["output", "flag"]: - self.outSource.set("Window").wait() class aa1AxisPsoDistance(AerotechPsoBase): @@ -227,10 +223,6 @@ class aa1AxisPsoDistance(AerotechPsoBase): raise RuntimeError(f"Unsupported distace triggering mode: {pso_wavemode}") old = self.read_configuration() - # Disable everything - self.winEvents.set("Off").wait() - self.dstCounterEna.set("Off").wait() - self.dstEventsEna.set("Off").wait() # Configure distance generator (also resets counter to 0) self._distance_value = pso_distance @@ -288,33 +280,25 @@ class aa1AxisPsoDistance(AerotechPsoBase): """Fire a single PSO event (i.e. manual software trigger)""" # Only trigger if distance was set to invalid # if self.dstDistanceVal.get() == 0: - logger.warning(f"[{self.name}] Triggerin...") + # logger.warning(f"[{self.name}] Triggerin...") return self.fire(settle_time) def arm(self) -> None: - """Bluesky style stage""" + """Bluesky style stage + + It re-arms the distance array and re-sets the distance counter at current position + """ # Stage the PSO distance module and zero counter if isinstance(self._distance_value, (np.ndarray, list, tuple)): self.dstArrayRearm.set(1).wait() - # Wait for polling - sleep(0.5) - # Start monitoring the counters if distance is valid - if self.dstDistanceVal.get() > 0: - self.dstEventsEna.set("On").wait() - self.dstCounterEna.set("On").wait() + # Set distance and wait for polling + self.switch.set("Distance", settle_time=0.2).wait() def disarm(self): """Standard bluesky unstage""" - # Ensure output is set to low - # if self.output.value: - # self.toggle() - # Turn off window mode - self.winOutput.set("Off").wait() - self.winEvents.set("Off").wait() - # Turn off distance mode - self.dstEventsEna.set("Off").wait() - self.dstCounterEna.set("Off").wait() - # Disable output - self.outSource.set("None").wait() - # Sleep for one poll period - sleep(0.2) + self.switch.set("Manual", settle_time=0.2).wait() + + def launch(self): + """Re-set the counters""" + if isinstance(self._distance_value, (np.ndarray, list, tuple)): + self.dstArrayRearm.set(1).wait() \ No newline at end of file diff --git a/tomcat_bec/devices/aerotech/AerotechSimpleSequenceTemplate.ascript b/tomcat_bec/devices/aerotech/AerotechSimpleSequenceTemplate.ascript index b7b0c21..5159e72 100644 --- a/tomcat_bec/devices/aerotech/AerotechSimpleSequenceTemplate.ascript +++ b/tomcat_bec/devices/aerotech/AerotechSimpleSequenceTemplate.ascript @@ -33,7 +33,7 @@ program ////////////////////////////////////////////////////////////////////////// // Internal parameters - dont use - var $axis as axis = ROTY + var $axis as axis = {{ scan.rotaxis or 'ROTY' }} var $ii as integer var $axisFaults as integer = 0 var $iDdcSafeSpace as integer = 4096 diff --git a/tomcat_bec/devices/aerotech/AerotechSnapAndStepTemplate.ascript b/tomcat_bec/devices/aerotech/AerotechSnapAndStepTemplate.ascript index 80657d0..b308436 100644 --- a/tomcat_bec/devices/aerotech/AerotechSnapAndStepTemplate.ascript +++ b/tomcat_bec/devices/aerotech/AerotechSnapAndStepTemplate.ascript @@ -20,7 +20,7 @@ program // Internal - var $axis as axis = ROTY + var $axis as axis = {{ scan.rotaxis or 'ROTY' }} var $ii as integer var $axisFaults as integer = 0 diff --git a/tomcat_bec/devices/aerotech/AerotechTasks.py b/tomcat_bec/devices/aerotech/AerotechTasks.py index a29c4e0..93ad94d 100644 --- a/tomcat_bec/devices/aerotech/AerotechTasks.py +++ b/tomcat_bec/devices/aerotech/AerotechTasks.py @@ -5,6 +5,7 @@ interface. @author: mohacsi_i """ +import time from ophyd import Device, Component, EpicsSignal, EpicsSignalRO, Kind from ophyd.status import SubscriptionStatus @@ -106,12 +107,8 @@ class aa1Tasks(PSIDeviceBase, Device): self.fileName.set(d["script_file"]).wait() if "script_text" in d: # Compile text for syntax checking - # NOTE: This will load to 'script_file' + # NOTE: This will overwrite 'script_file' self._fileWrite.set(d["script_text"], settle_time=0.2).wait() - self.switch.set("Load").wait() - # Check the result of load - if self._failure.value: - raise RuntimeError("Failed to load script, perhaps a syntax error?") new = self.read_configuration() return (old, new) @@ -169,8 +166,22 @@ class aa1Tasks(PSIDeviceBase, Device): """Bluesky style stage, prepare, but does not execute""" if self.taskIndex.get() in (0, 1, 2): logger.error(f"[{self.name}] Loading AeroScript on system task. Daring today are we?") + if len(self.fileName.get())==0: + self.fileName.set("bec.ascript", settle_time=0.1).wait() + + self._failure.read() + ts_old = float(self._failure.timestamp) + def wait_until_new(*_, old_value, value, timestamp, **__): + nonlocal ts_old + result = bool(ts_old != timestamp) and (value!=-1) + # print(f"{old_value}\t{value}\t{ts_old}\t{timestamp}\t{result}") + return result + + # Subscribe and wait for update + status = SubscriptionStatus(self._failure, wait_until_new, settle_time=0.1) + # Load and check success - status = self.switch.set("Load", settle_time=0.2) + self.switch.set("Load", settle_time=0.0).wait() status.wait() if self._failure.value: raise RuntimeError("Failed to load task, please check the Aerotech IOC") @@ -191,11 +202,14 @@ class aa1Tasks(PSIDeviceBase, Device): def complete(self) -> SubscriptionStatus: """Wait for a RUNNING task""" + # Sleep for foll period + time.sleep(1) + timestamp_ = 0 task_idx = int(self.taskIndex.get()) - def not_running(*, value, timestamp, **_): nonlocal timestamp_ + # print(f"State {value[task_idx]} in {value}") result = value[task_idx] not in ["Running", 4] timestamp_ = timestamp return result diff --git a/tomcat_bec/devices/aerotech/__init__.py b/tomcat_bec/devices/aerotech/__init__.py index 5fe62b6..1b56826 100644 --- a/tomcat_bec/devices/aerotech/__init__.py +++ b/tomcat_bec/devices/aerotech/__init__.py @@ -7,3 +7,7 @@ from .AerotechAutomation1 import ( aa1GlobalVariableBindings, aa1AxisIo, ) +from .AerotechAutomation1Enums import ( + DriveDataCaptureInput, + DriveDataCaptureTrigger, +) \ No newline at end of file diff --git a/tomcat_bec/devices/aerotech/test/autoHilTest.py b/tomcat_bec/devices/aerotech/test/autoHilTest.py index d3b681e..8293277 100644 --- a/tomcat_bec/devices/aerotech/test/autoHilTest.py +++ b/tomcat_bec/devices/aerotech/test/autoHilTest.py @@ -11,31 +11,45 @@ Created on Wed Sep 04 13:21:33 2024 @author: mohacsi_i """ - -import os -import numpy as np import unittest -from time import sleep, time -from ophyd import EpicsMotor, EpicsSignal - -os.environ["EPICS_CA_ADDR_LIST"] = "129.129.106.255" +import time +import os +import jinja2 +import numpy as np +from ophyd import Component, EpicsMotor, EpicsSignal, EpicsSignalRO -from tomcat_bec.devices.aerotech.AerotechAutomation1 import ( - aa1Controller, aa1Tasks, aa1GlobalVariables, aa1AxisIo, aa1AxisDriveDataCollection, - aa1GlobalVariableBindings, aa1AxisPsoDistance, aa1AxisPsoWindow - ) - -from tomcat_bec.devices.aerotech.AerotechAutomation1Enums import ( - DriveDataCaptureInput, DriveDataCaptureTrigger - ) +from tomcat_bec.devices.aerotech import ( + aa1Controller, + aa1GlobalVariables, + aa1GlobalVariableBindings, + aa1AxisIo, + aa1Tasks, + aa1AxisDriveDataCollection, + aa1AxisPsoDistance, + DriveDataCaptureInput, + DriveDataCaptureTrigger, +) +os.environ["EPICS_CA_ADDR_LIST"] = "129.129.106.255 129.129.99.255" AA1_IOC_NAME = "X02DA-ES1-SMP1" -AA1_AXIS_NAME = "ROTY" +AA1_IOC_AXIS = "ROTY" +AA1_DRV_AXIS = "RY" +AXIS_VELOCITY = 42 +AXIS_TRAVEL = 80 +AXIS_TOL = 1.0 + +class AerotechMotor(EpicsMotor): + """ EpicsMotor extension with additional PVs""" + motor_enable_rbv = Component(EpicsSignalRO, "-EnaAct", kind="omitted", auto_monitor=True) + motor_enable = Component(EpicsSignal, ".CNEN", kind="omitted", auto_monitor=True) +# pylint: disable=protected-access class AerotechAutomation1Test(unittest.TestCase): + """Main test class""" + _isConnected = False @classmethod @@ -43,9 +57,13 @@ class AerotechAutomation1Test(unittest.TestCase): # HIL testing only if there's hardware dev = aa1Controller(AA1_IOC_NAME + ":CTRL:", name="aa1") dev.wait_for_connection() + mot = AerotechMotor(AA1_IOC_NAME + ":" + AA1_IOC_AXIS, name="mot") + mot.wait_for_connection() + mot.motor_enable.set(1, settle_time=0.1).wait() cls._isConnected = True def test_01_ControllerProxy(self): + """Test the basic iSMC client""" # Throws if IOC is not available or PVs are incorrect dev = aa1Controller(AA1_IOC_NAME + ":CTRL:", name="aa1") dev.wait_for_connection() @@ -58,117 +76,136 @@ class AerotechAutomation1Test(unittest.TestCase): self.assertTrue(dev.taskcount.get() > 0, "Maximum controller task count seems invalid") print("CTRL: Done testing CTRL module!") - def test_02_TaskProxy(self): - """ Testing the TASK interface - """ + def test_02_task_proxy(self): + """Testing the TASK interface""" # Throws if IOC is not available or PVs are incorrect dev = aa1Tasks(AA1_IOC_NAME + ":TASK:", name="tsk") dev.wait_for_connection() - print("\nTASK: Testing the 'execute' command (direct string execution)") + print("TASK: Testing the 'run' command (run text with intermediary file)") + # Send another empty program + filename = "test_program.ascript" + text = "program\nvar $positions[4] as real = [2, 4, 8, 16]\nend" + dev.configure({'script_file': filename, 'script_task': 3, 'script_text': text}) + dev.arm() + dev.kickoff() + dev.complete().wait() + dev.unstage() - # Attempt to run a short test script using the Execute interface (real) - text = "$rreturn[0]=3.141592" - rbv = dev.execute(text, mode="Double", taskindex=3) - - self.assertTrue(dev._executeMode.get() in ["Double", 3], - f"Expecting double execution return type, got: {dev._executeMode.get()}") - self.assertTrue(dev.taskIndex.get() == 3, "Execution must happen on task 3") - self.assertTrue(rbv == "Result (double): 3.141592", - f"Unexpected reply from execution: {rbv}") - - # Attempt to run a short test script using the Execute interface (integer) - text = "$ireturn[0]=42" - rbv = dev.execute(text, mode="Int", taskindex=4) - self.assertTrue(dev.taskIndex.get() == 4, "Execution must happen on task 1") - self.assertTrue(rbv == "Result (int): 42", f"Unexpected reply from execution: {rbv}") - - # Attempt a non-existent function on the execute interface - print("TASK: Testing a simple motion command execution") - text = """ - var $fDist as real = 90 - var $fVelo as real = 20 - var $axis as axis = ROTY - MoveRelative($axis, $fDist, $fVelo) - """ + print("TASK: Testing exception on syntax error") + # Send another empty program with 'war' instead of 'var' + text = "program\nvar $positions[2] as roll\neeeend" + dev.configure({'script_task': 3, 'script_text': text}) + # Should raise when Load with self.assertRaises(RuntimeError): - rbv = dev.execute(text, taskindex=4) + dev.arm() + dev.unstage() - # Attempt to run a short motion command using the Execute interface + # # Run a sleep function on the iSMC + print("TASK: Testing a blocking command execution") text = """ + $rglobal[4]=1.234 + var $tSleepSec as real = 5 + Dwell($tSleepSec) + $rglobal[4]=8.765 + """ + dev.configure({'script_task': 3, 'script_text': text}) + dev.arm() + t_start = time.time() + dev.launch() + dev.complete().wait() + t_end = time.time() + t_elapsed = t_end - t_start + self.assertTrue( 4 < t_elapsed < 9, f"A 5 seconds sleep finished in: {t_elapsed}") + dev.unstage() + + # # Attempt to run a short motion command using the Execute interface + print("TASK: Testing a short motion command") + text = f""" var $fDist as real = 90 - var $fVelo as real = 20 - var $axis as axis = ROTY + var $fVelo as real = 60 + var $axis as axis = {AA1_DRV_AXIS} Enable($axis) MoveLinear($axis, $fDist, $fVelo) """ - t_start = time() - rbv = dev.execute(text, taskindex=4) - t_end = time() - t_elapsed = t_end-t_start - self.assertTrue(t_elapsed > 4, f"Expected 4+ seconds for motion, took {t_elapsed}") - - print("TASK: Testing the 'run' command (run text with intermediary file)") - # Send another empty program - filename = "unittesting2.ascript" - text = "program\nvar $positions[2] as real\nend" - dev.launch_script(filename, taskindex=3, filetext=text) + dev.configure({'script_task': 3, 'script_text': text, 'script_file': "test_shortmove.ascript"}) + dev.arm() + t_start = time.time() + dev.kickoff() dev.complete().wait() + t_end = time.time() + t_elapsed = t_end - t_start + self.assertTrue( 1 < t_elapsed < 4, f"Motion finished outside of window in: {t_elapsed}") + dev.unstage() + print("TASK: Done testing TASK module!") - def test_03_GlobalVariables1(self): - """Test the basic read/write global variable interface - """ + def test_03_global_variables(self): + """Test the basic read/write global variable interface""" # Throws if IOC is not available or PVs are incorrect dev = aa1GlobalVariables(AA1_IOC_NAME + ":VAR:", name="gvar") dev.wait_for_connection() print("\nVAR: Checking available memory") - self.assertTrue(dev.num_int.get() >= 32, - "At least 32 integer variables are needed for this test") - self.assertTrue(dev.num_real.get() >= 32, - "At least 32 real variables are needed for this test") - self.assertTrue(dev.num_string.get() >= 32, - "At least 32 string[256] variables are needed for this test") + self.assertTrue( + dev.num_int.get() >= 32, "At least 32 integer variables are needed for this test" + ) + self.assertTrue( + dev.num_real.get() >= 32, "At least 32 real variables are needed for this test" + ) + self.assertTrue( + dev.num_string.get() >= 8, "At least 8 string[256] variables are needed for this test" + ) print("VAR: Setting and checking a few variables") # Use random variables for subsequent runs val_f = 100 * np.random.random() dev.write_float(11, val_f) rb_f = dev.read_float(11) - self.assertEqual(val_f, rb_f, - f"Floating point readback value changed, read {rb_f} instead of {val_f}") + self.assertEqual( + val_f, rb_f, f"Floating point readback value changed, read {rb_f} instead of {val_f}" + ) rb_f = dev.read_float(10, 4) - self.assertEqual(val_f, rb_f[1], - f"FP64 array readback value changed, read {rb_f[1]} instead of {val_f}") + self.assertEqual( + val_f, rb_f[1], f"FP64 array readback value changed, read {rb_f[1]} instead of {val_f}" + ) - val_i = 1+int(100*np.random.random()) + val_i = 1 + int(100 * np.random.random()) dev.write_int(13, val_i) rb_i = dev.read_int(13) - self.assertEqual(val_i, rb_i, - f"Integer readback value changed, read {rb_i} instead of {val_i}") + self.assertEqual( + val_i, rb_i, f"Integer readback value changed, read {rb_i} instead of {val_i}" + ) rb_i = dev.read_int(10, 4) - self.assertEqual(val_i, rb_i[3], - f"Integer array readback value changed, read {rb_i[3]} instead of {val_i}") + self.assertEqual( + val_i, + rb_i[3], + f"Integer array readback value changed, read {rb_i[3]} instead of {val_i}", + ) - val_i = 1 + int(100*np.random.random()) + val_i = 1 + int(100 * np.random.random()) dev.write_int(13, val_i) rb_i = dev.read_int(13) - self.assertEqual(val_i, rb_i, - f"Integer readback value changed, read {rb_i} instead of {val_i}") + self.assertEqual( + val_i, rb_i, f"Integer readback value changed, read {rb_i} instead of {val_i}" + ) rb_i = dev.read_int(10, 4) - self.assertEqual(val_i, rb_i[3], - f"Integer array readback value changed, read {rb_i[3]} instead of {val_i}") + self.assertEqual( + val_i, + rb_i[3], + f"Integer array readback value changed, read {rb_i[3]} instead of {val_i}", + ) - val_s = np.random.choice(['f', 'o', 'o', 'b', 'a', 'r'], 6) - val_s = ''.join(val_s) - dev.write_string(7, val_s) - rb_s = dev.read_string(7) - self.assertEqual(val_s, rb_s, - f"String readback value changed, read {rb_s} instead of {val_s}") + val_s = np.random.choice(["f", "o", "o", "b", "a", "r"], 6) + val_s = "".join(val_s) + dev.write_string(3, val_s) + rb_s = dev.read_string(3) + self.assertEqual( + val_s, rb_s, f"String readback value changed, read {rb_s} instead of {val_s}" + ) print("VAR: Done testing VAR module basic functionality!") - def test_04_GlobalVariables2(self): + def test_04_global_bindings(self): """Test the direct global variable bindings together with the task and standard global variable interfaces. """ @@ -182,664 +219,699 @@ class AerotechAutomation1Test(unittest.TestCase): print("\nVAR: Checking the direct global variable bindings with random numbers") # Check reading/writing float variables - val_f = 100*np.random.random() + val_f = 100 * np.random.random() dev.float16.set(val_f, settle_time=0.5).wait() - self.assertEqual(val_f, dev.float16.value, - f"Unexpected floating readback, read {dev.float16.value}, expected {val_f}") + self.assertEqual( + val_f, + dev.float16.value, + f"Unexpected floating readback, read {dev.float16.value}, expected {val_f}", + ) - val_f = 100*np.random.random() - var.write_float(3, val_f, settle_time=0.5) - self.assertEqual(val_f, dev.float3.value, - f"Unexpected floating readback, read {dev.float3.value}, expected {val_f}") + val_f = 100 * np.random.random() + var.write_float(3, val_f, settle_time=1.5) + self.assertEqual( + val_f, + dev.float3.value, + f"Unexpected floating readback, read {dev.float3.value}, expected {val_f}", + ) # Check reading/writing integer variables - val_i = int(100*np.random.random())-50 - var.write_int(3, val_i, settle_time=0.5) - self.assertEqual(val_i, dev.int3.value, - f"Unexpected integer readback, read {dev.int3.value}, expected {val_i}") + val_i = int(100 * np.random.random()) - 50 + var.write_int(3, val_i, settle_time=1.5) + self.assertEqual( + val_i, + dev.int3.value, + f"Unexpected integer readback, read {dev.int3.value}, expected {val_i}", + ) - val_i = 1+int(100*np.random.random()) + val_i = 1 + int(100 * np.random.random()) dev.int8.set(val_i, settle_time=0.5).wait() - self.assertEqual(val_i, dev.int8.value, - f"Unexpected integer readback, read {dev.int8.value}, expected {val_i}") + self.assertEqual( + val_i, + dev.int8.value, + f"Unexpected integer readback, read {dev.int8.value}, expected {val_i}", + ) # Check reading/writing string variables - val_s = np.random.choice(['f', 'o', 'o', 'b', 'a', 'r'], 6) - val_s = ''.join(val_s) + val_s = np.random.choice(["f", "o", "o", "b", "a", "r"], 6) + val_s = "".join(val_s) dev.str4.set(val_s, settle_time=0.5).wait() rb_s = dev.str4.value - self.assertEqual(val_s, rb_s, - f"String readback value changed, read {rb_s} instead of {val_s}") + self.assertEqual( + val_s, rb_s, f"String readback value changed, read {rb_s} instead of {val_s}" + ) print("VAR: Done testing VAR module direct bindings!") - # Attempt to run a short test script using the Execute interface (integer) - text = """ - $iglobal[1]=420 - $rglobal[1]=1.4142 + # Attempt to run a short test script in globals (integer) + val_i = int(1000*np.random.rand()) + val_f = 1000*(np.random.rand()-0.5) + text = f""" + $iglobal[1]={val_i} + $rglobal[1]={val_f} """ - tsk.execute(text, taskindex=4) - self.assertEqual(420, dev.int1.value, - f"Unexpected integer readback , read {dev.int1.value}, expected 420") - self.assertAlmostEqual(1.4142, dev.float1.value, - f"Unexpected floating readback , read {dev.float1.value}, expected 1.4142") + tsk.configure({'script_text': text, 'script_task': 4, 'script_file': "test_global.ascript"}) + tsk.arm() + tsk.launch() + tsk.complete().wait() + self.assertEqual( + val_i, + dev.int1.value, + f"Unexpected integer readback , read {dev.int1.value}, expected {val_i}", + ) + self.assertAlmostEqual( + val_f, + dev.float1.value, + f"Unexpected floating readback , read {dev.float1.value}, expected {val_f}", + ) - def test_05_MotorRecord(self): - """ Tests the basic move functionality of the MotorRecord - """ - mot = EpicsMotor(AA1_IOC_NAME + ":" + AA1_AXIS_NAME, name="x") + def test_05_axis_motorrecord(self): + """Tests the basic move functionality of the MotorRecord""" + mot = AerotechMotor(AA1_IOC_NAME + ":" + AA1_IOC_AXIS, name="x") mot.wait_for_connection() + mot.motor_enable.set(1, settle_time=0.1).wait() + mot.velocity.set(AXIS_VELOCITY).wait() print("\nMR: Checking the standard EPICS motor record") + # Test enabling/disabling the axis + mot.motor_enable.set(1, settle_time=0.1).wait() + time.sleep(1) + self.assertTrue(mot.motor_enable_rbv.value, "Motor should be enabled") + mot.motor_enable.set(0, settle_time=0.1).wait() + time.sleep(1) + self.assertFalse(mot.motor_enable_rbv.value, "Motor should be disabled") + mot.motor_enable.set(1, settle_time=0.1).wait() + time.sleep(1) + self.assertTrue(mot.motor_enable_rbv.value, "Motor should be enabled") + # Four test with absolute moves - def RelMove(dist): + def move_rel(dist): target = mot.position + dist print(f"Absolute move to {target}, distance: {dist}") mot.move(target, wait=True) - self.assertTrue(np.abs(target-mot.position) < 0.1, - f"Final position {mot.position} is too far from target {target}") + self.assertTrue( + np.abs(target - mot.position) < AXIS_TOL, + f"Final position {mot.position} is too far from target {target}", + ) - RelMove(30 + 50.0 * np.random.random()) - RelMove(20 + 50.0 * np.random.random()) - RelMove(-10 + 50.0 * np.random.random()) - RelMove(-40 - 50.0 * np.random.random()) - RelMove(10 + 50.0 * np.random.random()) + move_rel(30 + 50.0 * np.random.random()) + move_rel(20 + 50.0 * np.random.random()) + move_rel(-10 + 50.0 * np.random.random()) + move_rel(-40 - 50.0 * np.random.random()) + move_rel(10 + 50.0 * np.random.random()) print("MR: Done testing MotorRecord!") - def test_06_AxisIo(self): + def test_06_axis_io(self): + """Test axis IO bindings""" # Throws if IOC is not available or PVs are incorrect - dev = aa1AxisIo(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":IO:", name="io") + dev = aa1AxisIo(AA1_IOC_NAME + ":" + AA1_IOC_AXIS + ":IO:", name="io") dev.wait_for_connection() - print("\nAX:IO: Checking axis IO") # Writing random values to analog/digital outputs - for ii in range(5): + print("\nAX:IO: Checking axis analog IO") + for _ in range(5): val_f = np.random.random() dev.set_analog(0, val_f) - self.assertAlmostEqual(dev.ao.get(), val_f, 2, "Unexpected readback on analog output") + self.assertAlmostEqual(dev.ao.get(), val_f, 2, "Unexpected setpoint on analog output") self.assertAlmostEqual(dev.ao0.get(), val_f, 2, "Unexpected readback on analog output") - print("Digital") - for ii in range(5): + print("\nAX:IO: Checking axis digital IO") + for _ in range(5): val_b = round(np.random.random()) dev.set_digital(0, val_b) - self.assertEqual(dev.do.get(), val_b, "Unexpected readback on digital output") + self.assertEqual(dev.do.get(), val_b, "Unexpected setpoint on digital output") self.assertEqual(dev.do0.get(), val_b, "Unexpected readback on digital output") print("IO: Done testing IO module!") - def test_07_AxisDdc(self): - """ Drive data collection + def test_07_axis_ddc(self): + """Drive data collection This is the preliminary test case dor drive data collection using internal PSO based triggers. It tests basic functionality, like configuration, state monitoring and readback. """ # Throws if IOC is not available or PVs are incorrect - dev = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") - event = EpicsSignal(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:EVENT:SINGLE", put_complete=True) + dev = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_IOC_AXIS + ":DDC:", name="ddc") + event = EpicsSignal( + AA1_IOC_NAME + ":" + AA1_IOC_AXIS + ":PSO:EVENT:SINGLE", put_complete=True + ) dev.wait_for_connection() event.wait_for_connection() print("\nAX:DDC: Checking axis DDC") # Stop any running acquisition - dev.unstage(settle_time=0.1) + dev.unstage() print("AX:DDC: Running a partial acquisition (1/3)") # Configure the DDC for 42 points and start waiting for triggers - cfg = {'ntotal': 42, 'trigger': DriveDataCaptureTrigger.PsoEvent} + cfg = {"num_points_total": 42, "ddc_trigger": DriveDataCaptureTrigger.PsoEvent} dev.configure(cfg) - dev.stage() + dev.arm() # Fire 12 triggers and stop the acquisition early - for ii in range(12): - event.set(1, settle_time=0.05).wait() + for _ in range(12): + event.set(1, settle_time=0.1).wait() # Stop the data acquisition early - dev.unstage() + dev.disarm() # Wait for polling to catch up - sleep(1) + time.sleep(1) # Do the readback and check final state rbv = dev.collect() - self.assertEqual(dev.nsamples_rbv.value, 12, - f"Expected to collect 12 points by now, got {dev.nsamples_rbv.value}") - self.assertEqual(len(rbv['data']["buffer0"]), 12, - f"Expected to read back 12 points by now, got {len(rbv['data']['buffer0'])}") - self.assertEqual(len(rbv['data']["buffer1"]), 12, - f"Expected to read back 12 points by now, got {len(rbv['data']['buffer1'])}") + self.assertEqual( + dev.nsamples_rbv.value, + 12, + f"Expected to collect 12 points by now, got {dev.nsamples_rbv.value}", + ) + self.assertEqual( + len(rbv["data"]["buffer0"]), + 12, + f"Expected to read back 12 points by now, got {len(rbv['data']['buffer0'])}", + ) + self.assertEqual( + len(rbv["data"]["buffer1"]), + 12, + f"Expected to read back 12 points by now, got {len(rbv['data']['buffer1'])}", + ) print("AX:DDC: Running an overruning acquisition (2/3)") # Configure the DDC for 16 points and start waiting for triggers - cfg = {'ntotal': 16, 'trigger': DriveDataCaptureTrigger.PsoEvent} + cfg = {"num_points_total": 16, "ddc_trigger": DriveDataCaptureTrigger.PsoEvent} dev.configure(cfg) - dev.stage() + dev.arm() num = dev.nsamples_rbv.get() - self.assertEqual(num, 0, - f"Stage should reset the number of DDC points but got: {num}") + self.assertEqual(num, 0, f"Stage should reset the number of DDC points but got: {num}") # Fire 20 triggers and stop the acquisition early - for ii in range(20): - event.set(1, settle_time=0.05).wait() - dev.unstage() + for _ in range(20): + event.set(1, settle_time=0.05).wait() + dev.disarm() # Wait for polling to catch up - sleep(1) + time.sleep(1) # Do the readback and check final state rbv = dev.collect() - self.assertEqual(dev.nsamples_rbv.value, 16, - f"Expected to collect 16 points by now, got {dev.nsamples_rbv.value}") - self.assertEqual(len(rbv['data']["buffer0"]), 16, - f"Expected to read back 16 points by now, got {len(rbv['data']['buffer0'])}") - self.assertEqual(len(rbv['data']["buffer1"]), 16, - f"Expected to read back 16 points by now, got {len(rbv['data']['buffer1'])}") + self.assertEqual( + dev.nsamples_rbv.value, + 16, + f"Expected to collect 16 points by now, got {dev.nsamples_rbv.value}", + ) + self.assertEqual( + len(rbv["data"]["buffer0"]), + 16, + f"Expected to read back 16 points by now, got {len(rbv['data']['buffer0'])}", + ) + self.assertEqual( + len(rbv["data"]["buffer1"]), + 16, + f"Expected to read back 16 points by now, got {len(rbv['data']['buffer1'])}", + ) print("AX:DDC: Trying to record some noise on the analog input (3/3)") - cfg = {'ntotal': 36, 'trigger': DriveDataCaptureTrigger.PsoEvent, 'source1': DriveDataCaptureInput.AnalogInput0} + cfg = { + "num_points_total": 36, + "ddc_trigger": DriveDataCaptureTrigger.PsoEvent, + "ddc_source1": DriveDataCaptureInput.AnalogInput0, + } dev.configure(cfg) - dev.stage() + dev.arm() num = dev.nsamples_rbv.get() - self.assertEqual(num, 0, - f"Stage should reset the number of DDC points but got: {num}") + self.assertEqual(num, 0, f"Stage should reset the number of DDC points but got: {num}") # Fire 36 triggers - for ii in range(36): - event.set(1, settle_time=0.05).wait() - dev.unstage() + for _ in range(36): + event.set(1, settle_time=0.05).wait() + dev.disarm() # Wait for polling to catch up - sleep(1) + time.sleep(1) # Do the readback and check final state rbv = dev.collect() - self.assertEqual(dev.nsamples_rbv.value, 36, - f"Expected to collect 12 points by now, got {dev.nsamples_rbv.value}") - self.assertEqual(len(rbv['data']["buffer0"]), 36, - f"Expected to read back 36 points by now, got {len(rbv['data']['buffer0'])}") - self.assertEqual(len(rbv['data']["buffer1"]), 36, - f"Expected to read back 36 points by now, got {len(rbv['data']['buffer1'])}") - self.assertTrue(np.std(rbv['data']["buffer1"]) > 0.000001, - f"Expected some noise on analog input, but std is: {np.std(rbv['data']['buffer1'])}") + self.assertEqual( + dev.nsamples_rbv.value, + 36, + f"Expected to collect 12 points by now, got {dev.nsamples_rbv.value}", + ) + self.assertEqual( + len(rbv["data"]["buffer0"]), + 36, + f"Expected to read back 36 points by now, got {len(rbv['data']['buffer0'])}", + ) + self.assertEqual( + len(rbv["data"]["buffer1"]), + 36, + f"Expected to read back 36 points by now, got {len(rbv['data']['buffer1'])}", + ) + self.assertTrue( + np.std(rbv["data"]["buffer1"]) > 0.000001, + f"Expected some noise on analog input, but std is: {np.std(rbv['data']['buffer1'])}", + ) print("AX:DDCal(l: Done testing DDC module!") - def test_08_AxisPsoDistance1(self): - """ Test basic PSO distance mode functionality + def test_08_axis_pso_distance1(self): + """Test basic PSO distance mode functionality - This module tests basic PSO distance functionality, like events and - waveform generation. The goal is to ensure the basic operation of - the PSO module and the polled feedback. + This module tests basic PSO distance functionality, like events and + waveform generation. The goal is to ensure the basic operation of + the PSO module and the polled feedback. """ # Throws if IOC is not available or PVs are incorrect - pso = aa1AxisPsoDistance(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:", name="pso1") - ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") - mot = EpicsMotor(AA1_IOC_NAME + ":" + AA1_AXIS_NAME, name="x") + pso = aa1AxisPsoDistance(AA1_IOC_NAME + ":" + AA1_IOC_AXIS + ":PSO:", name="pso1") + ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_IOC_AXIS + ":DDC:", name="ddc") + mot = AerotechMotor(AA1_IOC_NAME + ":" + AA1_IOC_AXIS, name=AA1_IOC_AXIS.lower()) mot.wait_for_connection() pso.wait_for_connection() ddc.wait_for_connection() - pso.unstage() - ddc.unstage() + pso.disarm() + ddc.disarm() mot.unstage() print("\nAX:PSO: Checking axis PSO in distance mode (basic)") print("AX:PSO: Directly checking basic 'toggled' event generation (1/3)") # Configure steps and move to middle of range - pso.configure({'distance': 5, 'wmode': "toggle"}) - pso.kickoff() - mot.move(mot.position + 2.5) + pso.configure({"pso_distance": 5, "pso_wavemode": "toggle"}) + pso.arm() + mot.velocity.set(AXIS_VELOCITY).wait() + mot.move(mot.position + 2.5, wait=True) + + ddc.arm() + pso.arm() # Start moving in steps, while checking output for ii in range(10): last_pso = pso.output.value - mot.move(mot.position + 5) - sleep(1) - self.assertTrue(pso.output.value != last_pso, - f"Expected to toggle the PSO output at step {ii}, got {pso.output.value}") - pso.unstage() - ddc.unstage() + mot.move(mot.position + 5, wait=True) + time.sleep(1) + self.assertTrue( + pso.output.value != last_pso, + f"Expected to toggle the PSO output at step {ii}, got {pso.output.value}", + ) + pso.disarm() + ddc.disarm() print("AX:PSO: Checking basic 'toggled' event generation with DDC (2/3)") # Configure steps and move to middle of range - pso.configure({'distance': 2, 'wmode': "toggle"}) - pso.kickoff() + pso.configure({"pso_distance": 2, "pso_wavemode": "toggle"}) + pso.arm() # Configure DDC npoints = 720 / 2.0 - cfg = {'ntotal': npoints+100, 'trigger': DriveDataCaptureTrigger.PsoOutput} + cfg = {"num_points_total": npoints + 100, "ddc_trigger": DriveDataCaptureTrigger.PsoOutput} ddc.configure(cfg) - ddc.stage() + ddc.arm() npoints_rbv = ddc.nsamples_rbv.value - self.assertEqual(npoints_rbv, 0, - f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}") + self.assertEqual( + npoints_rbv, + 0, + f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}", + ) self.assertTrue(pso.output.value == 0, "Expected to start from LOW state") # Start moving and wait for polling - mot.move(mot.position + 720) - sleep(0.5) + mot.move(mot.position + 720, wait=True) + time.sleep(1) # Evaluate results npoints_rbv = ddc.nsamples_rbv.value - self.assertTrue(np.abs(npoints_rbv-720/4) <= 1, - f"Expected to record {720/4} points but got {npoints_rbv}") - pso.unstage() - ddc.unstage() + self.assertTrue( + np.abs(npoints_rbv - 720 / 4) <= 1, + f"Expected to record {720/4} points but got {npoints_rbv}", + ) + pso.disarm() + ddc.disarm() - print("AX:PSO: Checking basic 'pulsed' event generation with DDC (3/5)") + print("AX:PSO: Checking basic 'pulsed' event generation with DDC (3/3)") # Configure steps and move to middle of range - pso.configure({'distance': 1.8, 'wmode': "pulsed", 'n_pulse': 3}) - pso.kickoff() + pso.configure({"pso_distance": 1.8, "pso_wavemode": "pulsed", "pso_n_pulse": 3}) + pso.arm() # Configure DDC npoints = 3 * 720 / 1.8 - cfg = {'ntotal': npoints+100, 'trigger': DriveDataCaptureTrigger.PsoOutput} + cfg = {"num_points_total": (npoints) + 100, "ddc_trigger": DriveDataCaptureTrigger.PsoOutput} ddc.configure(cfg) - ddc.stage() + ddc.arm() npoints_rbv = ddc.nsamples_rbv.value - self.assertEqual(npoints_rbv, 0, - f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}") + self.assertEqual( + npoints_rbv, + 0, + f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}", + ) self.assertTrue(pso.output.value == 0, "Expected to start from LOW state") # Start moving and wait for polling - mot.move(mot.position + 720) - sleep(0.5) + mot.move(mot.position + 720 + 0.9, wait=True) + time.sleep(1) # Evaluate results npoints_rbv = ddc.nsamples_rbv.value - self.assertTrue(np.abs(npoints_rbv-3*720/1.8) <= 1, - f"Expected to record {3*720/1.8} points but got {npoints_rbv}") + self.assertTrue( + np.abs(npoints_rbv - 3 * 720 / 1.8) <= 1, + f"Expected to record {3*720/1.8} points but got {npoints_rbv}", + ) print("AX:PSO: Done testing basic PSO distance functionality!") - pso.unstage() - ddc.unstage() + pso.disarm() + ddc.disarm() - def test_09_PsoIntegration01(self): - """ The first integration test aims to verify PSO functionality under - real life scenarios. It uses the DDC for collecting encoder signals - and compares them to the expected values. + def test_09_axis_pso_integration_1(self): + """The first integration test aims to verify PSO functionality under + real life scenarios. It uses the DDC for collecting encoder signals + and compares them to the expected values. """ # Throws if IOC is not available or PVs are incorrect - pso = aa1AxisPsoDistance(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:", name="pso1") - ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") - mot = EpicsMotor(AA1_IOC_NAME + ":" + AA1_AXIS_NAME, name="x") + pso = aa1AxisPsoDistance(AA1_IOC_NAME + ":" + AA1_IOC_AXIS + ":PSO:", name="pso1") + ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_IOC_AXIS + ":DDC:", name="ddc") + mot = AerotechMotor(AA1_IOC_NAME + ":" + AA1_IOC_AXIS, name="x") mot.wait_for_connection() pso.wait_for_connection() ddc.wait_for_connection() pso.unstage() ddc.unstage() mot.unstage() - print("\ntest_09_PsoIntegration01: Integration test for vector style PSO triggering (advanced)") + print( + "\ntest_09_PsoIntegration01: Integration test for vector PSO triggering (advanced)" + ) - print("AX:PSO: Testing standard 'enable' style PSO triggering with DDC (1/3)") + print("AX:PSO: Testing standard 'enable' style PSO triggering with DDC (1/2)") # This is one line of the Tomcat sequence scan! acc_dist = 0.5 * mot.velocity.value * mot.acceleration.value vec_dist = [acc_dist, 18, 0.01, 18, 0.01, 18, 0.01, 18, 0.01, 18, 0.01, 18] - pso.configure({'distance': vec_dist, 'wmode': "toggle"}) - pso.kickoff() + pso.configure({"pso_distance": vec_dist, "pso_wavemode": "toggle"}) + pso.arm() # Configure DDC npoints_exp = len(vec_dist) / 2 - cfg = {'ntotal': npoints_exp + 100, 'trigger': DriveDataCaptureTrigger.PsoOutput} + cfg = {"num_points_total": npoints_exp + 100, "ddc_trigger": DriveDataCaptureTrigger.PsoOutput} ddc.configure(cfg) - ddc.stage() + ddc.arm() npoints_rbv = ddc.nsamples_rbv.value - self.assertEqual(npoints_rbv, 0, - f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}") + self.assertEqual( + npoints_rbv, + 0, + f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}", + ) self.assertTrue(pso.output.value == 0, "Expected to start from LOW state") # Start moving and wait for polling mot.move(mot.position + np.sum(vec_dist) + 10) - sleep(0.9) + time.sleep(1) # Evaluate results npoints_rbv = ddc.nsamples_rbv.value - self.assertTrue(np.abs(npoints_rbv-npoints_exp) < 1, - f"Expected to record {npoints_exp} points but got {npoints_rbv}") - self.assertTrue(pso.output.value == 0, - "Expected to finish in LOW state") - self.assertTrue(pso.dstArrayDepleted.value, - "Expected to cover all points in the distance array") + self.assertTrue( + np.abs(npoints_rbv - npoints_exp) < 1, + f"Expected to record {npoints_exp} points but got {npoints_rbv}", + ) + self.assertTrue(pso.output.value == 0, "Expected to finish in LOW state") + self.assertTrue( + pso.dstArrayDepleted.value, "Expected to cover all points in the distance array" + ) pso.unstage() ddc.unstage() - print("AX:PSO: Checking 'individual' style trigger signal with DDC (2/5)") + print("AX:PSO: Checking 'individual' style trigger signal with DDC (2/2)") # This is practically golden ratio tomography! acc_dist = 0.5 * mot.velocity.value * mot.acceleration.value vec_dist = [acc_dist] - vec_dist.extend([1.618]*100) - pso.configure({'distance': vec_dist, 'wmode': "pulsed"}) - pso.kickoff() + vec_dist.extend([1.618] * 100) + pso.configure({"pso_distance": vec_dist, "pso_wavemode": "pulsed"}) + pso.arm() # Configure DDC npoints_exp = len(vec_dist) - cfg = {'ntotal': npoints_exp + 100, 'trigger': DriveDataCaptureTrigger.PsoOutput} + cfg = {"num_points_total": npoints_exp + 100, "ddc_trigger": DriveDataCaptureTrigger.PsoOutput} ddc.configure(cfg) - ddc.stage() + ddc.arm() npoints_rbv = ddc.nsamples_rbv.value - self.assertEqual(npoints_rbv, 0, - f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}") + self.assertEqual( + npoints_rbv, + 0, + f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}", + ) self.assertTrue(pso.output.value == 0, "Expected to start from LOW state") # Start moving and wait for polling (move negative direction) mot.move(mot.position - np.sum(vec_dist) - acc_dist) - sleep(0.9) + time.sleep(1) # Evaluate results npoints_rbv = ddc.nsamples_rbv.value - self.assertTrue(np.abs(npoints_rbv-npoints_exp) < 1, - f"Expected to record {npoints_exp} points but got {npoints_rbv}") + self.assertTrue( + np.abs(npoints_rbv - npoints_exp) < 1, + f"Expected to record {npoints_exp} points but got {npoints_rbv}", + ) self.assertTrue(pso.output.value == 0, "Expected to finish in LOW state") - self.assertTrue(pso.dstArrayDepleted.value, - "Expected to cover all pints in the distance array") + self.assertTrue( + pso.dstArrayDepleted.value, "Expected to cover all pints in the distance array" + ) print("AX:PSO: Done testing PSO module in distance mode!") pso.unstage() ddc.unstage() - def test_10_AxisPsoWindow1(self): - """ Test basic PSO window mode functionality - - This module tests basic PSO distance functionality, like events and - waveform generation. The goal is to ensure the basic operation of - the PSO module and the polled feedback. - """ - # Throws if IOC is not available or PVs are incorrect - pso = aa1AxisPsoWindow(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:", name="pso1") - ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") - mot = EpicsMotor(AA1_IOC_NAME + ":" + AA1_AXIS_NAME, name="x") - mot.wait_for_connection() - pso.wait_for_connection() - ddc.wait_for_connection() - pso.unstage() - ddc.unstage() - mot.unstage() - print("\nAX:PSO: Checking axis PSO in window mode (basic)") - - print("AX:PSO: Directly checking basic window output (1/3)") - # Configure steps and move to middle of range - pso.configure({'bounds': (5, 13), 'wmode': "output"}) - pso.kickoff() - sleep(1) - # Move half a step - mot.move(mot.position + 0.1) - dist = 0.1 - - # Start moving in steps, while checking output - for ii in range(42): - mot.move(mot.position + 0.2) - dist += 0.2 - sleep(1) - if 5 < dist < 13: - self.assertTrue(pso.output.value == 1, - f"Expected HIGH PSO output inside the window at distance {dist}") - else: - self.assertTrue(pso.output.value == 0, - f"Expected LOW PSO output outside the window at distance {dist}") - - print("AX:PSO: The next tests fail due to encoder jitter!") - pso.unstage() - return - - print("AX:PSO: Checking basic window event generation with DDC (2/3)") - # Move to acceleration position - acc_dist = 0.5 * mot.velocity.value * mot.acceleration.value - print(f"Acceleration distance: {acc_dist}") - start_position = mot.position - mot.move(start_position - acc_dist) - # Configure PSO in window mode - cfg = {'bounds': (acc_dist, 30 + acc_dist), 'wmode': "pulsed", "emode": 'Both'} - dev.configure(cfg) - # Configure the data capture (must be performed in start position) - cfg = {'ntotal': 5*10, 'trigger': DriveDataCaptureTrigger.PsoOutput} - ddc.configure(cfg) - ddc.stage() - npoints_rbv = ddc.nsamples_rbv.value - self.assertEqual(npoints_rbv, 0, - f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}") - # Repeated line scan - for ii in range(10): - dev.stage() - mot.move(start_position + 30 + acc_dist) - dev.unstage() - mot.move(start_position - acc_dist) - sleep(0.5) - npoints_rbv = ddc.nsamples_rbv.value - self.assertTrue(np.abs(npoints_rbv - 50) < 1, - f"Expected to record 10 points but got {npoints_rbv}") - - print("AX:PSO: Checking basic 'toggled' event generation with DDC (3/3)") - print("Test removed as there's some heavy jitter on the PSO output flag") - # Move to acceleration position - acc_dist = 0.5 * mot.velocity.value * mot.acceleration.value - print(f"Acceleration distance: {acc_dist}") - mot.move(mot.position - acc_dist) - # Configure PSO in window mode - cfg = {'bounds': (acc_dist, 30 + acc_dist + 1), 'wmode': "output"} - dev.configure(cfg) - dev.stage() - # Configure the data capture - cfg = {'ntotal': 10, 'trigger': DriveDataCaptureTrigger.PsoOutput} - ddc.configure(cfg) - ddc.stage() - npoints_rbv = ddc.nsamples_rbv.value - self.assertEqual(npoints_rbv, 0, - f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}") - # Snake scan - for ii in range(10): - if ii % 2 == 0: - mot.move(mot.position + 30 + 2 * acc_dist + 0.1) - else: - mot.move(mot.position - 30 - 2 * acc_dist - 0.1) - sleep(0.5) - npoints_rbv = ddc.nsamples_rbv.value - self.assertTrue(np.abs(npoints_rbv - 10) < 1, - f"Expected to record 10 points but got {npoints_rbv}") - - def test_12_StepScan(self): - """ Test a simple blusky style step scan (via BEC) + def test_10_StepScan(self): + """Test a simple blusky style step scan (via BEC) This module tests manual PSO trigger functionality. The goal is to ensure the basic operation of the PSO module and the polled feedback. """ # Throws if IOC is not available or PVs are incorrect - pso = aa1AxisPsoDistance(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:", name="pso1") - ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") - mot = EpicsMotor(AA1_IOC_NAME + ":" + AA1_AXIS_NAME, name="x") + pso = aa1AxisPsoDistance(AA1_IOC_NAME + ":" + AA1_IOC_AXIS + ":PSO:", name="pso1") + ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_IOC_AXIS + ":DDC:", name="ddc") + mot = AerotechMotor(AA1_IOC_NAME + ":" + AA1_IOC_AXIS, name="x") mot.wait_for_connection() pso.wait_for_connection() ddc.wait_for_connection() - pso.unstage() - ddc.unstage() + pso.disarm() + ddc.disarm() mot.unstage() - t_start = time() - print("\ntest_12_StepScan: Simple step scan (via Ophyd)") + t_start = time.time() + print("\ntest_10_StepScan: Simple step scan (via Ophyd)") - PosStart = 112.0 - ScanRange = 180 - ScanSteps = 18 + pos_start = 112.0 + scan_range = 180 + scan_steps = 18 - step_size = ScanRange / ScanSteps + step_size = scan_range / scan_steps positions = [] - for ii in range(ScanSteps+1): - positions.append(PosStart + ii * step_size) + for ii in range(scan_steps + 1): + positions.append(pos_start + ii * step_size) # Motor setup - mot.move(PosStart) - self.assertTrue(np.abs(mot.position-PosStart) < 0.1, - f"Expected to move to scan start position {PosStart}, now at {mot.position}") - # PSO setup - pso.configure(d={'n_pulse': 2}) + mot.move(pos_start) + self.assertTrue( + np.abs(mot.position - pos_start) < 1.0, + f"Expected to move to scan start position {pos_start}, now at {mot.position}", + ) + # PSO setup (very large pulse distance) + pso.configure(d={"pso_n_pulse": 2, "pso_distance": 9999}) # DDC setup (trigger at each point plus some extra) - cfg = {'ntotal': 10 * (ScanSteps + 1), 'trigger': DriveDataCaptureTrigger.PsoOutput} + cfg = {"num_points_total": 1000 * (scan_steps + 1), "ddc_trigger": DriveDataCaptureTrigger.PsoOutput} ddc.configure(cfg) print("\tStage") mot.stage() - pso.stage() - ddc.stage() + pso.arm() + ddc.arm() for ii in range(len(positions)): mot.move(positions[ii]) - self.assertTrue(np.abs(mot.position-positions[ii]) < 1.0, - f"Expected to reach {positions[ii]}, now is at {mot.position} on step {ii}") + self.assertTrue( + np.abs(mot.position - positions[ii]) < AXIS_TOL, + f"Expected to reach {positions[ii]}, now is at {mot.position} on step {ii}", + ) pso.trigger() + time.sleep(1) print("\tUnstage") mot.unstage() - ddc.unstage() - pso.unstage() + ddc.disarm() + pso.disarm() - t_end = time() + t_end = time.time() t_elapsed = t_end - t_start print(f"Elapsed scan time: {t_elapsed}") print("Evaluate final state") - sleep(0.5) + time.sleep(0.5) npoints_rbv = ddc.nsamples_rbv.value - self.assertEqual(npoints_rbv, 2 * len(positions), - f"Expected to record {len(positions)} DDC points, got: {npoints_rbv}") + self.assertEqual( + npoints_rbv, + 2 * len(positions), + f"Expected to record {len(positions)} DDC points, got: {npoints_rbv}", + ) - def test_13_TomcatSequenceScan(self): - """ Test the zig-zag sequence scan from Tomcat (via BEC) + def test_11_TomcatSequenceScan(self): + """Test the zig-zag sequence scan from Tomcat (via BEC) - This module tests basic PSO distance functionality, like events and - waveform generation. The goal is to ensure the basic operation of - the PSO module and the polled feedback. + This module tests basic PSO distance functionality, like events and + waveform generation. The goal is to ensure the basic operation of + the PSO module and the polled feedback. """ # Throws if IOC is not available or PVs are incorrect - pso = aa1AxisPsoDistance(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:", name="pso1") - ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") - mot = EpicsMotor(AA1_IOC_NAME + ":" + AA1_AXIS_NAME, name="x") + pso = aa1AxisPsoDistance(AA1_IOC_NAME + ":" + AA1_IOC_AXIS + ":PSO:", name="pso1") + ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_IOC_AXIS + ":DDC:", name="ddc") + mot = AerotechMotor(AA1_IOC_NAME + ":" + AA1_IOC_AXIS, name="x") mot.wait_for_connection() pso.wait_for_connection() ddc.wait_for_connection() - pso.unstage() - ddc.unstage() + pso.disarm() + ddc.disarm() mot.unstage() - t_start = time() - print("\ntest_13_TomcatSequenceScan: TOMCAT Sequeence scan (via Ophyd)") + t_start = time.time() + print("\ntest_11_TomcatSequenceScan: TOMCAT Sequeence scan (via Ophyd)") - ScanStart = 42.0 - ScanRange = 180 - NumRepeat = 10 - ScanType = "PosNeg" + scan_start = 42.0 + scan_range = 180 + num_repeat = 10 + scan_type = "PosNeg" # Dynamic properties - VelScan = 90 - AccScan = 500 - SafeDist = 10.0 + scan_vel = AXIS_VELOCITY + scan_acc = 500 + dist_safe = 10.0 ###################################################################### print("\tConfigure") # Scan range - AccDist = 0.5 * VelScan * VelScan / AccScan + SafeDist - if ScanType in ["PosNeg", "Pos"]: - PosStart = ScanStart - AccDist - PosEnd = ScanStart + ScanRange + AccDist - elif ScanType in ["NegPos", "Neg"]: - PosStart = ScanStart + AccDist - PosEnd = ScanStart - ScanRange - AccDist + dist_acc = 0.5 * AXIS_VELOCITY * AXIS_VELOCITY / scan_acc + dist_safe + if scan_type in ["PosNeg", "Pos"]: + pos_start = scan_start - dist_acc + pos_end = scan_start + scan_range + dist_acc + elif scan_type in ["NegPos", "Neg"]: + pos_start = scan_start + dist_acc + pos_end = scan_start - scan_range - dist_acc else: - raise RuntimeError(f"Unexpected ScanType: {ScanType}") + raise RuntimeError(f"Unexpected ScanType: {scan_type}") # Motor setup - mot.velocity.set(VelScan).wait() - mot.acceleration.set(VelScan/AccScan).wait() - mot.move(PosStart) - self.assertTrue(np.abs(mot.position-PosStart) < 0.1, - f"Expected to move to scan start position {PosStart}, now at {mot.position}") + mot.velocity.set(AXIS_VELOCITY).wait() + mot.acceleration.set(AXIS_VELOCITY / scan_acc).wait() + mot.move(pos_start) + self.assertTrue( + np.abs(mot.position - pos_start) < 1.0, + f"Expected to move to scan start position {pos_start}, now at {mot.position}", + ) # PSO setup - print(f"Configuring PSO to: {[AccDist, ScanRange]}") - cfg = {'distance': [AccDist, ScanRange], 'wmode': "toggle"} + print(f"Configuring PSO to: {[dist_acc, scan_range]}") + cfg = {"pso_distance": [dist_acc, scan_range], "pso_wavemode": "toggle"} pso.configure(cfg) # DDC setup (Tomcat runs in ENABLE mode, so only one posedge) - cfg = {'ntotal': NumRepeat, 'trigger': DriveDataCaptureTrigger.PsoOutput} + cfg = {"num_points_total": num_repeat, "ddc_trigger": DriveDataCaptureTrigger.PsoOutput} ddc.configure(cfg) print("\tStage") mot.stage() - pso.stage() - ddc.stage() - print("\tKickoff") - pso.kickoff() + pso.arm() + ddc.arm() - for ii in range(NumRepeat): + for ii in range(num_repeat): # No option to reset the index counter... pso.dstArrayRearm.set(1).wait() - if ScanType in ["Pos", "Neg"]: - mot.move(PosEnd) - self.assertTrue(np.abs(mot.position-PosEnd) < 1.0, - f"Expected to reach {PosEnd}, now is at {mot.position} on iteration {ii}") - mot.move(PosStart) - self.assertTrue(np.abs(mot.position-PosStart) < 1.0, - f"Expected to rewind to {PosStart}, now at {mot.position} on iteration {ii}") + if scan_type in ["Pos", "Neg"]: + mot.move(pos_end) + self.assertTrue( + np.abs(mot.position - pos_end) < AXIS_TOL, + f"Expected to reach {pos_end}, now is at {mot.position} on iteration {ii}", + ) + mot.move(pos_start) + self.assertTrue( + np.abs(mot.position - pos_start) < AXIS_TOL, + f"Expected to rewind to {pos_start}, now at {mot.position} on iteration {ii}", + ) - if ScanType in ["PosNeg", "NegPos"]: + if scan_type in ["PosNeg", "NegPos"]: if ii % 2 == 0: - mot.move(PosEnd) - self.assertTrue(np.abs(mot.position-PosEnd) < 1.0, - f"Expected to reach {PosEnd}, now at {mot.position} on iteration {ii}") + mot.move(pos_end, wait=True) + self.assertTrue( + np.abs(mot.position - pos_end) < AXIS_TOL, + f"Expected to reach {pos_end}, now at {mot.position} on iteration {ii}", + ) else: - mot.move(PosStart) - self.assertAlmostEqual(mot.position, PosStart, 1, - f"Expected to reach {PosStart}, now is at {mot.position} on iteration {ii}") + mot.move(pos_start, wait=True) + self.assertTrue( + np.abs(mot.position - pos_start) < AXIS_TOL, + f"Expected to reach {pos_start}, now at {mot.position} on iteration {ii}", + ) - sleep(0.2) + time.sleep(0.2) npoints_rbv = ddc.nsamples_rbv.value - self.assertEqual(npoints_rbv, ii+1, - f"Expected to record {ii+1} DDC points, got: {npoints_rbv} on iteration {ii}") + self.assertEqual( + npoints_rbv, + ii + 1, + f"Expected to record {ii+1} DDC points, got: {npoints_rbv} on iteration {ii}", + ) print("\tUnstage") mot.unstage() - ddc.unstage() - pso.unstage() + ddc.disarm() + pso.disarm() - t_end = time() + t_end = time.time() t_elapsed = t_end - t_start print(f"Elapsed scan time: {t_elapsed}") print("Evaluate final state") - sleep(3) + time.sleep(3) npoints_rbv = ddc.nsamples_rbv.value - self.assertEqual(npoints_rbv, NumRepeat, - f"Expected to record {NumRepeat} DDC points, but got: {npoints_rbv}") + self.assertEqual( + npoints_rbv, + num_repeat, + f"Expected to record {num_repeat} DDC points, but got: {npoints_rbv}", + ) - - def test_14_JinjaTomcatSequenceScan(self): + def test_12_JinjaTomcatSequenceScan(self): + """ Run a test sequence scan with template substitution""" # Load the test file - try: - filename = "test/testSequenceScanTemplate.ascript" - with open(filename) as f: - templatetext = f.read() - except FileNotFoundError: - filename = "testSequenceScanTemplate.ascript" - with open(filename) as f: - templatetext = f.read() + path = os.path.dirname(os.path.realpath(__file__)) + filename = f"{path}/testSequenceScanTemplate.ascript" + with open(filename, encoding="utf-8") as f: + templatetext = f.read() - import jinja2 - scanconfig = {'startpos': 42, 'scanrange': 180, 'nrepeat': 13, 'scandir': 'NegPos', 'scanvel': 90, 'scanacc': 500} + scanconfig = { + "rotaxis": "RY", + "startpos": 42, + "scanrange": 180, + "nrepeat": 13, + "scandir": "NEGPOS", + "npoints": 1300, + "scanvel": AXIS_VELOCITY, + "scanacc": 500, + "accdist": 0.5*AXIS_VELOCITY*AXIS_VELOCITY/500, + "psotrigger": "PsoEvent", + "psoBoundsPos": [0.5*AXIS_VELOCITY*AXIS_VELOCITY/500, 180], + "psoBoundsNeg": [0.5*AXIS_VELOCITY*AXIS_VELOCITY/500, 180] + } tm = jinja2.Template(templatetext) text = tm.render(scan=scanconfig) - # with open("test/tcatSequenceScan.ascript", "w") as text_file: - # text_file.write(text) + with open("./tcatSequenceScanRBV.ascript", "w") as text_file: + text_file.write(text) # Throws if IOC is not available or PVs are incorrect - dev = aa1Tasks(AA1_IOC_NAME+":TASK:", name="tsk") - dev.wait_for_connection() - print("\nTOMCAT Sequeence scan (via Jinjad AeroScript)") + tsk = aa1Tasks(AA1_IOC_NAME + ":TASK:", name="tsk") + ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_IOC_AXIS + ":DDC:", name="ddc") + tsk.wait_for_connection() + ddc.wait_for_connection() + print("\ntest_12_JinjaTomcatSequenceScan: TOMCAT Sequeence scan (via Jinjad AeroScript)") # Copy file to controller and run it - t_start = time() - dev.launch_script("tcatSequenceScan.ascript", taskindex=4, filetext=text) - sleep(0.5) - dev.complete().wait() - t_end = time() + t_start = time.time() + tsk.configure({'script_text': text, 'script_task': 4, 'script_file': "sequence.ascript"}) + ddc.configure({"num_points_total": 13*100}) + tsk.arm() + ddc.arm() + tsk.launch() + time.sleep(1.0) + tsk.complete().wait() + t_end = time.time() t_elapsed = t_end - t_start print(f"Elapsed scan time: {t_elapsed}") - sleep(0.5) - ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") - ddc.wait_for_connection() + time.sleep(1.0) npoints_rbv = ddc.nsamples_rbv.value - self.assertEqual(npoints_rbv, 13, - f"Expected to record 13 DDC points, got: {npoints_rbv} after script") - - + self.assertEqual( + npoints_rbv, 26, f"Expected to record 13 DDC points, got: {npoints_rbv} after script" + ) if __name__ == "__main__": diff --git a/tomcat_bec/devices/aerotech/test/testSequenceScanTemplate.ascript b/tomcat_bec/devices/aerotech/test/testSequenceScanTemplate.ascript index c34b9ee..a7aaf9b 100644 --- a/tomcat_bec/devices/aerotech/test/testSequenceScanTemplate.ascript +++ b/tomcat_bec/devices/aerotech/test/testSequenceScanTemplate.ascript @@ -1,13 +1,18 @@ -// Test program for zig-zag line scanning with PSO window output "enable" -// signal and DDC synchronized to external trigger input. +// Test program for simple zig-zag line scanning with PSO window output +// "enable" signal and DDC synchronized to external trigger input. // The file expects external parameter validation // The PSO locations arrays are set externally from EPICS PV +// +#define DDC_ADDR 0x800000 +#define PSO_ADDR 0x0 + + enum ScanType - Pos = 0 - Neg = 1 - PosNeg = 2 - NegPos = 3 + POS = 0 + NEG = 1 + POSNEG = 2 + NEGPOS = 3 end @@ -15,74 +20,98 @@ program ////////////////////////////////////////////////////////////////////////// // External parameters - USE THEESE var $fStartPosition as real = {{ scan.startpos }} - var $fScanRange as real = $fStartPosition + {{ scan.scanrange }} + var $fScanRange as real = {{ scan.scanrange }} var $iNumRepeat as integer = {{ scan.nrepeat }} - var $eScanType as ScanType = ScanType.{{ scan.scandir or 'Pos' }} + var $eScanType as ScanType = ScanType.{{ scan.scandir or 'POS' }} + var $iNumDdcRead as integer = {{ scan.npoints }} var $fVelJog as real = {{ scan.jogvel or 200 }} var $fVelScan as real = {{ scan.scanvel }} - var $fAcceleration = {{ scan.scanacc or 500 }} - var $fSafeDist = 10.0 - + var $fAcceleration = {{ scan.scanacc }} + var $fAccDistance as real = {{ scan.accdist }} + var $eDdcTrigger as DriveDataCaptureTrigger = DriveDataCaptureTrigger.{{ scan.psotrigger }} + ////////////////////////////////////////////////////////////////////////// // Internal parameters - dont use - var $axis as axis = ROTY - var $ii as integer + var $axis as axis = {{ scan.rotaxis or 'ROTY' }} + var $ii as integer + var $axisFaults as integer = 0 var $iDdcSafeSpace as integer = 4096 // Set acceleration SetupAxisRampType($axis, RampType.Linear) - SetupAxisRampValue($axis,0,$fAcceleration) - var $fAccDistance as real = 0.5 * $fVelScan * $fVelScan / $fAcceleration + $fSafeDist + SetupAxisRampValue($axis, 0, $fAcceleration) // set the actual scan range var $fPosStart as real var $fPosEnd as real - if $eScanType == ScanType.Pos + if $eScanType == ScanType.POS $fPosStart = $fStartPosition - $fAccDistance $fPosEnd = $fStartPosition + $fScanRange + $fAccDistance - elseif $eScanType == ScanType.Neg + elseif $eScanType == ScanType.NEG $fPosStart = $fStartPosition + $fAccDistance $fPosEnd = $fStartPosition - $fScanRange - $fAccDistance - elseif $eScanType == ScanType.PosNeg + elseif $eScanType == ScanType.POSNEG $fPosStart = $fStartPosition - $fAccDistance $fPosEnd = $fStartPosition + $fScanRange + $fAccDistance - elseif $eScanType == ScanType.NegPos + elseif $eScanType == ScanType.NEGPOS $fPosStart = $fStartPosition + $fAccDistance $fPosEnd = $fStartPosition - $fScanRange - $fAccDistance end // Move to start position before the scan + // NOTE: Also wait for GigaFrost to start, otherwise early triggers might be missed MoveAbsolute($axis, $fPosStart, $fVelJog) WaitForInPosition($axis) - + Dwell(2) + + // Set globals for feedback + $rglobal[2] = $fPosStart + $rglobal[3] = $fPosEnd + // Configure PSO + // FIXME : When the controller is restarted + PsoDistanceConfigureInputs($axis, [PsoDistanceInput.XC4PrimaryFeedback]) PsoDistanceCounterOff($axis) PsoDistanceEventsOff($axis) PsoWindowConfigureEvents($axis, PsoWindowEventMode.None) PsoWaveformOff($axis) - PsoOutputConfigureSource($axis, PsoOutputSource.Waveform) - var $iPsoArrayAddr as integer = 0 - var $iPsoArray[] as real = [UnitsToCounts($axis, $fAccDistance), UnitsToCounts($axis, $fScanRange)] - DriveArrayWrite($axis, $iPsoArray, $iPsoArrayAddr, length($iPsoArray), DriveArrayType.PsoDistanceEventDistances) - PsoDistanceConfigureArrayDistances($axis, $iPsoArrayAddr, 2, 0) + // Simple PSO trigger pattern + var $iPsoArrayPosAddr as integer = PSO_ADDR + var $iPsoArrayPos[] as real = [{% for psoDist in scan.psoBoundsPos[:-1] %} UnitsToCounts($axis, {{ psoDist }}), {% endfor %} UnitsToCounts($axis, {{ scan.psoBoundsPos[-1] }}) ] + DriveArrayWrite($axis, $iPsoArrayPos, $iPsoArrayPosAddr, length($iPsoArrayPos), DriveArrayType.PsoDistanceEventDistances) + + {% if scan.psoBoundsNeg|length > 0 %} + var $iPsoArrayNegAddr as integer = ($iPsoArrayPosAddr + length($iPsoArrayPos)) * 4 + var $iPsoArrayNeg[] as real = [{% for psoDist in scan.psoBoundsNeg[:-1] %} UnitsToCounts($axis, {{ psoDist }}), {% endfor %} UnitsToCounts($axis, {{ scan.psoBoundsNeg[-1] }}) ] + DriveArrayWrite($axis, $iPsoArrayNeg, $iPsoArrayNegAddr, length($iPsoArrayNeg), DriveArrayType.PsoDistanceEventDistances) + {% else %} + var $iPsoArrayNegAddr as integer = ($iPsoArrayPosAddr + length($iPsoArrayPos)) * 4 + var $iPsoArrayNegSize as integer = 0 + var $iPsoArrayNeg[] as real = [] + // DriveArrayWrite($axis, $iPsoArrayNeg, $iPsoArrayNegAddr, length($iPsoArrayNeg), DriveArrayType.PsoDistanceEventDistances) + {% endif %} + + + PsoDistanceConfigureArrayDistances($axis, $iPsoArrayPosAddr, length($iPsoArrayPos), 0) + PsoDistanceCounterOn($axis) PsoDistanceEventsOn($axis) PsoWaveformConfigureMode($axis, PsoWaveformMode.Toggle) - PsoWaveformOn($axis) + PsoWaveformOn($axis) + PsoOutputConfigureSource($axis, PsoOutputSource.Waveform) // Configure Drive Data Collection - var $iDdcArrayAddr as integer = 8388608 - var $iDdcArraySize as integer = 5000 + var $iDdcArraySize as integer = $iNumDdcRead DriveDataCaptureConfigureInput($axis, 0, DriveDataCaptureInput.PrimaryFeedback); DriveDataCaptureConfigureInput($axis, 1, DriveDataCaptureInput.AnalogInput0 ); - DriveDataCaptureConfigureTrigger($axis, 0, DriveDataCaptureTrigger.PsoOutput ); - DriveDataCaptureConfigureTrigger($axis, 1, DriveDataCaptureTrigger.PsoOutput ); + DriveDataCaptureConfigureTrigger($axis, 0, $eDdcTrigger ); + DriveDataCaptureConfigureTrigger($axis, 1, $eDdcTrigger ); - DriveDataCaptureConfigureArray($axis, 0, $iDdcArrayAddr, $iDdcArraySize); - DriveDataCaptureConfigureArray($axis, 1, $iDdcArrayAddr + $iDdcSafeSpace + 8 * $iDdcArraySize, $iDdcArraySize); + DriveDataCaptureConfigureArray($axis, 0, DDC_ADDR, $iDdcArraySize); + DriveDataCaptureConfigureArray($axis, 1, DDC_ADDR + $iDdcSafeSpace + 8 * $iDdcArraySize, $iDdcArraySize); // Directly before scan PsoDistanceCounterOn($axis) @@ -92,41 +121,44 @@ program /////////////////////////////////////////////////////////// // Start the actual scanning /////////////////////////////////////////////////////////// - for $ii = 0 to ($iNumRepeat-1) - PsoDistanceConfigureArrayDistances($axis, $iPsoArrayAddr, 2, 0) - if $eScanType == ScanType.Pos - MoveAbsolute($axis, $fPosEnd, $fVelScan) - WaitForInPosition($axis) - MoveAbsolute($axis, $fPosStart, $fVelScan) - WaitForInPosition($axis) - elseif $eScanType == ScanType.Neg - MoveAbsolute($axis, $fPosEnd, $fVelScan) - WaitForInPosition($axis) - MoveAbsolute($axis, $fPosStart, $fVelScan) - WaitForInPosition($axis) - elseif $eScanType == ScanType.PosNeg - if ($ii % 2) == 0 - MoveAbsolute($axis, $fPosEnd, $fVelScan) - WaitForInPosition($axis) - elseif ($ii % 2) == 1 - MoveAbsolute($axis, $fPosStart, $fVelScan) - WaitForInPosition($axis) - end - elseif $eScanType == ScanType.NegPos - if ($ii % 2) == 0 - MoveAbsolute($axis, $fPosEnd, $fVelScan) - WaitForInPosition($axis) - elseif ($ii % 2) == 1 - MoveAbsolute($axis, $fPosStart, $fVelScan) - WaitForInPosition($axis) - end + if $eScanType == ScanType.POS || $eScanType == ScanType.NEG + PsoDistanceConfigureArrayDistances($axis, $iPsoArrayPosAddr, length($iPsoArrayPos), 0) + MoveAbsolute($axis, $fPosEnd, $fVelScan) + WaitForMotionDone($axis) + + $axisFaults = StatusGetAxisItem($axis, AxisDataSignal.AxisFault) + if $axisFaults + TaskSetError(TaskGetIndex(), "AxisFault on axis ROTY") + end + elseif $eScanType == ScanType.POSNEG || $eScanType == ScanType.NEGPOS + for $ii = 0 to ($iNumRepeat-1) + // Feedback on progress + $rglobal[4] = $ii + if ($ii % 2) == 0 + PsoDistanceConfigureArrayDistances($axis, $iPsoArrayPosAddr, length($iPsoArrayPos), 0) + MoveAbsolute($axis, $fPosEnd, $fVelScan) + elseif ($ii % 2) == 1 + PsoDistanceConfigureArrayDistances($axis, $iPsoArrayNegAddr, length($iPsoArrayNeg), 0) + MoveAbsolute($axis, $fPosStart, $fVelScan) + end + WaitForMotionDone($axis) + + $axisFaults = StatusGetAxisItem($axis, AxisDataSignal.AxisFault) + if $axisFaults + TaskSetError(TaskGetIndex(), "AxisFault on axis ROTY") + end + Dwell(0.2) end - Dwell(0.2) end // Directly after scan PsoDistanceCounterOff($axis) DriveDataCaptureOff($axis, 0) DriveDataCaptureOff($axis, 1) + // move back to start position + MoveAbsolute($axis, $fPosStart, $fVelJog) + WaitForInPosition($axis) + Dwell(2) end + diff --git a/tomcat_bec/tcatSequenceScanRBV.ascript b/tomcat_bec/tcatSequenceScanRBV.ascript new file mode 100644 index 0000000..363b416 --- /dev/null +++ b/tomcat_bec/tcatSequenceScanRBV.ascript @@ -0,0 +1,158 @@ +// Test program for simple zig-zag line scanning with PSO window output +// "enable" signal and DDC synchronized to external trigger input. +// The file expects external parameter validation +// The PSO locations arrays are set externally from EPICS PV +// +#define DDC_ADDR 0x800000 +#define PSO_ADDR 0x0 + + + +enum ScanType + POS = 0 + NEG = 1 + POSNEG = 2 + NEGPOS = 3 +end + + +program + ////////////////////////////////////////////////////////////////////////// + // External parameters - USE THEESE + var $fStartPosition as real = 42 + var $fScanRange as real = 180 + var $iNumRepeat as integer = 13 + var $eScanType as ScanType = ScanType.NEGPOS + var $iNumDdcRead as integer = 1300 + + var $fVelJog as real = 200 + var $fVelScan as real = 150 + var $fAcceleration = 500 + var $fAccDistance as real = 22.5 + var $eDdcTrigger as DriveDataCaptureTrigger = DriveDataCaptureTrigger.PsoEvent + + ////////////////////////////////////////////////////////////////////////// + // Internal parameters - dont use + var $axis as axis = ROTY + var $ii as integer + var $axisFaults as integer = 0 + var $iDdcSafeSpace as integer = 4096 + + // Set acceleration + SetupAxisRampType($axis, RampType.Linear) + SetupAxisRampValue($axis, 0, $fAcceleration) + + // set the actual scan range + var $fPosStart as real + var $fPosEnd as real + if $eScanType == ScanType.POS + $fPosStart = $fStartPosition - $fAccDistance + $fPosEnd = $fStartPosition + $fScanRange + $fAccDistance + elseif $eScanType == ScanType.NEG + $fPosStart = $fStartPosition + $fAccDistance + $fPosEnd = $fStartPosition - $fScanRange - $fAccDistance + elseif $eScanType == ScanType.POSNEG + $fPosStart = $fStartPosition - $fAccDistance + $fPosEnd = $fStartPosition + $fScanRange + $fAccDistance + elseif $eScanType == ScanType.NEGPOS + $fPosStart = $fStartPosition + $fAccDistance + $fPosEnd = $fStartPosition - $fScanRange - $fAccDistance + end + + // Move to start position before the scan + // NOTE: Also wait for GigaFrost to start, otherwise early triggers might be missed + MoveAbsolute($axis, $fPosStart, $fVelJog) + WaitForInPosition($axis) + Dwell(2) + + // Set globals for feedback + $rglobal[2] = $fPosStart + $rglobal[3] = $fPosEnd + + // Configure PSO + // FIXME : When the controller is restarted + PsoDistanceConfigureInputs($axis, [PsoDistanceInput.XC4PrimaryFeedback]) + PsoDistanceCounterOff($axis) + PsoDistanceEventsOff($axis) + PsoWindowConfigureEvents($axis, PsoWindowEventMode.None) + PsoWaveformOff($axis) + // Simple PSO trigger pattern + var $iPsoArrayPosAddr as integer = PSO_ADDR + var $iPsoArrayPos[] as real = [ UnitsToCounts($axis, 22.5), UnitsToCounts($axis, 180) ] + DriveArrayWrite($axis, $iPsoArrayPos, $iPsoArrayPosAddr, length($iPsoArrayPos), DriveArrayType.PsoDistanceEventDistances) + + + var $iPsoArrayNegAddr as integer = ($iPsoArrayPosAddr + length($iPsoArrayPos)) * 4 + var $iPsoArrayNeg[] as real = [ UnitsToCounts($axis, 22.5), UnitsToCounts($axis, 180) ] + DriveArrayWrite($axis, $iPsoArrayNeg, $iPsoArrayNegAddr, length($iPsoArrayNeg), DriveArrayType.PsoDistanceEventDistances) + + + + PsoDistanceConfigureArrayDistances($axis, $iPsoArrayPosAddr, length($iPsoArrayPos), 0) + PsoDistanceCounterOn($axis) + PsoDistanceEventsOn($axis) + + PsoWaveformConfigureMode($axis, PsoWaveformMode.Toggle) + PsoWaveformOn($axis) + PsoOutputConfigureSource($axis, PsoOutputSource.Waveform) + + // Configure Drive Data Collection + var $iDdcArraySize as integer = $iNumDdcRead + + DriveDataCaptureConfigureInput($axis, 0, DriveDataCaptureInput.PrimaryFeedback); + DriveDataCaptureConfigureInput($axis, 1, DriveDataCaptureInput.AnalogInput0 ); + + DriveDataCaptureConfigureTrigger($axis, 0, $eDdcTrigger ); + DriveDataCaptureConfigureTrigger($axis, 1, $eDdcTrigger ); + + DriveDataCaptureConfigureArray($axis, 0, DDC_ADDR, $iDdcArraySize); + DriveDataCaptureConfigureArray($axis, 1, DDC_ADDR + $iDdcSafeSpace + 8 * $iDdcArraySize, $iDdcArraySize); + + // Directly before scan + PsoDistanceCounterOn($axis) + DriveDataCaptureOn($axis, 0) + DriveDataCaptureOn($axis, 1) + + /////////////////////////////////////////////////////////// + // Start the actual scanning + /////////////////////////////////////////////////////////// + + if $eScanType == ScanType.POS || $eScanType == ScanType.NEG + PsoDistanceConfigureArrayDistances($axis, $iPsoArrayPosAddr, length($iPsoArrayPos), 0) + MoveAbsolute($axis, $fPosEnd, $fVelScan) + WaitForMotionDone($axis) + + $axisFaults = StatusGetAxisItem($axis, AxisDataSignal.AxisFault) + if $axisFaults + TaskSetError(TaskGetIndex(), "AxisFault on axis ROTY") + end + elseif $eScanType == ScanType.POSNEG || $eScanType == ScanType.NEGPOS + for $ii = 0 to ($iNumRepeat-1) + // Feedback on progress + $rglobal[4] = $ii + if ($ii % 2) == 0 + PsoDistanceConfigureArrayDistances($axis, $iPsoArrayPosAddr, length($iPsoArrayPos), 0) + MoveAbsolute($axis, $fPosEnd, $fVelScan) + elseif ($ii % 2) == 1 + PsoDistanceConfigureArrayDistances($axis, $iPsoArrayNegAddr, length($iPsoArrayNeg), 0) + MoveAbsolute($axis, $fPosStart, $fVelScan) + end + WaitForMotionDone($axis) + + $axisFaults = StatusGetAxisItem($axis, AxisDataSignal.AxisFault) + if $axisFaults + TaskSetError(TaskGetIndex(), "AxisFault on axis ROTY") + end + Dwell(0.2) + end + end + + // Directly after scan + PsoDistanceCounterOff($axis) + DriveDataCaptureOff($axis, 0) + DriveDataCaptureOff($axis, 1) + // move back to start position + MoveAbsolute($axis, $fPosStart, $fVelJog) + WaitForInPosition($axis) + Dwell(2) +end