diff --git a/tomcat_bec/devices/aerotech/AerotechAutomation1.py b/tomcat_bec/devices/aerotech/AerotechAutomation1.py index d06727d..1588beb 100644 --- a/tomcat_bec/devices/aerotech/AerotechAutomation1.py +++ b/tomcat_bec/devices/aerotech/AerotechAutomation1.py @@ -250,7 +250,7 @@ class aa1Tasks(Device): # FIXME: BEC will swallow this exception # error = bool(value[task_idx] in ["Error", 8]) timestamp_ = timestamp - print(result) + # print(result) return result # Subscribe and wait for update @@ -258,41 +258,6 @@ class aa1Tasks(Device): return status -class aa1TaskState(Device): - """Task state monitoring API - - This is the task state monitoring interface for Automation1 tasks. It - does not launch execution, but can wait for the execution to complete. - """ - - index = Component(EpicsSignalRO, "INDEX", kind=Kind.config) - status = Component(EpicsSignalRO, "STATUS", auto_monitor=True, kind=Kind.hinted) - errorCode = Component(EpicsSignalRO, "ERROR", auto_monitor=True, kind=Kind.hinted) - warnCode = Component(EpicsSignalRO, "WARNING", auto_monitor=True, kind=Kind.hinted) - - def complete(self) -> DeviceStatus: - """ Wait for the task while RUNNING""" - # Define wait until the busy flag goes down (excluding initial update) - timestamp_ = 0 - - def not_running(*args, old_value, value, timestamp, **kwargs): - nonlocal timestamp_ - result = False if (timestamp_ == 0) else (value not in ["Running", 4]) - timestamp_ = timestamp - print(result) - return result - - # Subscribe and wait for update - status = SubscriptionStatus(self.status, not_running, settle_time=0.5) - return status - - def kickoff(self) -> DeviceStatus: - """ Standard Bluesky kickoff method""" - status = DeviceStatus(self) - status.set_finished() - return status - - class aa1GlobalVariables(Device): """Global variables @@ -309,9 +274,9 @@ class aa1GlobalVariables(Device): ''' var = aa1GlobalVariables(AA1_IOC_NAME+":VAR:", name="var") var.wait_for_connection() - ret = var.readInt(42) - var.writeFloat(1000, np.random.random(1024)) - ret_arr = var.readFloat(1000, 1024) + ret = var.read_int(42) + var.write_float(1000, np.random.random(1024)) + ret_arr = var.read_float(1000, 1024) ''' """ USER_ACCESS = ['read_int', 'write_int', 'read_float', 'write_float', 'read_string', 'write_string'] @@ -604,9 +569,61 @@ class aa1AxisPsoBase(Device): """ Toggle waveform outup""" orig_wave_mode = self.waveMode.get() self.waveMode.set("Toggle").wait() - self.fire(0.1) + self.trigger(0.1) self.waveMode.set(orig_wave_mode).wait() + def configure(self, d: dict): + """ Configure the emitted waveform""" + wmode = d.get("wmode", "pulsed") + t_pulse = d.get("t_pulse", 100) + w_pulse = d.get("w_pulse", 200) + n_pulse = d.get("n_pulse", 1) + + # Configure the pulsed/toggled waveform + if wmode in ["toggle", "toggled"]: + # Switching to simple toggle mode + self.waveEnable.set("On").wait() + self.waveMode.set("Toggle").wait() + elif wmode in ["pulse", "pulsed"]: + # Switching to pulsed mode + self.waveEnable.set("On").wait() + self.waveMode.set("Pulse").wait() + # Setting pulse shape + self.pulseWindow.set(w_pulse).wait() + self.pulseOnTime.set(t_pulse).wait() + self.pulseCount.set(n_pulse).wait() + # Commiting configuration + self.pulseApply.set(1).wait() + # Enabling PSO waveform outputs + self.waveEnable.set("On").wait() + elif wmode in ["output", "flag"]: + self.waveEnable.set("Off").wait() + else: + raise RuntimeError(f"Unsupported window mode: {wmode}") + + # Set PSO output data source + # FIXME : This is essentially staging... + if wmode in ["toggle", "toggled", "pulse", "pulsed"]: + self.outSource.set("Waveform").wait() + elif wmode in ["output", "flag"]: + self.outSource.set("Window").wait() + + def unstage(self, settle_time=0.2): + """ Standard bluesky unstage""" + # Ensure output is set to low + if self.output.value: + self.toggle() + # Turn off window mode + self.winOutput.set("Off").wait() + self.winEvents.set("Off").wait() + # Turn off distance mode + self.dstEventsEna.set("Off").wait() + self.dstCounterEna.set("Off").wait() + # Sleep if specified + if settle_time is not None: + sleep(settle_time) + return super().unstage() + class aa1AxisPsoDistance(aa1AxisPsoBase): """Position Sensitive Output - Distance mode @@ -640,7 +657,7 @@ class aa1AxisPsoDistance(aa1AxisPsoBase): pso.configure(d={'distance': 1.8, 'wmode': "pulsed", 'n_pulse': 5}) pso.kickoff().wait() """ - _distance_value = 3.141592 + _distance_value = None # ######################################################################## # PSO high level interface @@ -649,16 +666,14 @@ class aa1AxisPsoDistance(aa1AxisPsoBase): functionality for distance mode PSO. :param distance: The trigger distance or the array of distances between subsequent points. - :param wmode: Waveform mode configuration, usually pulsed/toggled. + :param wmode: Waveform mode configuration, usually pulsed/toggled (default: pulsed). + :param t_pulse : trigger high duration in pulsed mode (default: 200 us) """ - distance = d["distance"] - wmode = str(d["wmode"]) - t_pulse = d.get("t_pulse", 100) - w_pulse = d.get("w_pulse", 200) - n_pulse = d.get("n_pulse", 1) + distance = d.get("distance", None) + wmode = d.get("wmode", "pulsed") # Validate input parameters - if wmode not in ["pulse", "pulsed", "toggle", "toggled"]: + if wmode is not None and wmode not in ["pulse", "pulsed", "toggle", "toggled"]: raise RuntimeError(f"Unsupported distace triggering mode: {wmode}") old = self.read_configuration() @@ -669,41 +684,14 @@ class aa1AxisPsoDistance(aa1AxisPsoBase): # Configure distance generator (also resets counter to 0) self._distance_value = distance - if isinstance(distance, (float, int)): - self.dstDistance.set(distance).wait() - elif isinstance(distance, (np.ndarray, list, tuple)): - self.dstDistanceArr.set(distance).wait() + if distance is not None: + if isinstance(distance, (float, int)): + self.dstDistance.set(distance).wait() + elif isinstance(distance, (np.ndarray, list, tuple)): + self.dstDistanceArr.set(distance).wait() # Configure the pulsed/toggled waveform - if wmode in ["toggle", "toggled"]: - # Switching to simple toggle mode - self.waveEnable.set("On").wait() - self.waveMode.set("Toggle").wait() - - elif wmode in ["pulse", "pulsed"]: - # Switching to pulsed mode - self.waveEnable.set("On").wait() - self.waveMode.set("Pulse").wait() - # Setting pulse shape - if w_pulse is not None: - self.pulseWindow.set(w_pulse).wait() - if t_pulse is not None: - self.pulseOnTime.set(t_pulse).wait() - if n_pulse is not None: - self.pulseCount.set(n_pulse).wait() - # Commiting configuration - self.pulseApply.set(1).wait() - # Enabling PSO waveform outputs - self.waveEnable.set("On").wait() - else: - raise RuntimeError(f"Unsupported waveform mode: {wmode}") - - # Ensure output is set to low - if self.output.value: - self.toggle() - - # Set PSO output data source - self.outSource.set("Waveform").wait() + super().configure(d) new = self.read_configuration() logger.info(f"[{self.name}] PSO configured to {wmode} mode") @@ -712,24 +700,17 @@ class aa1AxisPsoDistance(aa1AxisPsoBase): # ######################################################################## # Bluesky step scan interface def stage(self): - """ Stage the device""" - # Rearm the configured array - if isinstance(self._distance_value, (np.ndarray, list, tuple)): - self.dstArrayRearm.set(1).wait() - # Turn off counter monitoring - self.dstEventsEna.set("On").wait() - self.dstCounterEna.set("On").wait() + """ Stage the device: just ignore it... """ return super().stage() - def unstage(self): - """ Unstage the device""" - # Turn off counter monitoring - self.dstEventsEna.set("Off").wait() - self.dstCounterEna.set("Off").wait() - return super().unstage() - # ######################################################################## # Bluesky flyer interface + def prepare(self): + """ Rearms the configured array""" + # Rearm the configured array + if isinstance(self._distance_value, (np.ndarray, list, tuple)): + self.dstArrayRearm.set(1).wait() + def kickoff(self) -> DeviceStatus: """ Kick off an asynchronous acquisition""" # Rearm the configured array @@ -803,9 +784,6 @@ class aa1AxisPsoWindow(aa1AxisPsoBase): bounds = d["bounds"] wmode = str(d["wmode"]) emode = d.get("emode", "Enter") - t_pulse = d.get("t_pulse", 100) - w_pulse = d.get("w_pulse", 200) - n_pulse = d.get("n_pulse", 1) # Validate input parameters if wmode not in ["pulse", "pulsed", "toggle", "toggled", "output", "flag"]: @@ -815,7 +793,6 @@ class aa1AxisPsoWindow(aa1AxisPsoBase): self._mode = wmode self._eventMode = emode - old = self.read_configuration() # Configure the window module @@ -824,7 +801,6 @@ class aa1AxisPsoWindow(aa1AxisPsoBase): self.winCounter.set(0).wait() self._winLower.set(bounds[0]).wait() self._winUpper.set(bounds[1]).wait() - elif isinstance(bounds, (np.ndarray, list, tuple)): self.winCounter.set(0).wait() self.winBoundsArr.set(bounds).wait() @@ -833,33 +809,8 @@ class aa1AxisPsoWindow(aa1AxisPsoBase): self.winOutput.set("Off").wait() self.winEvents.set("Off").wait() - # Configure the pulsed/toggled waveform - if wmode in ["toggle", "toggled"]: - # Switching to simple toggle mode - self.waveEnable.set("On").wait() - self.waveMode.set("Toggle").wait() - elif wmode in ["pulse", "pulsed"]: - # Switching to pulsed mode - self.waveEnable.set("On").wait() - self.waveMode.set("Pulse").wait() - # Setting pulse shape - self.pulseWindow.set(w_pulse).wait() - self.pulseOnTime.set(t_pulse).wait() - self.pulseCount.set(n_pulse).wait() - # Commiting configuration - self.pulseApply.set(1).wait() - # Enabling PSO waveform outputs - self.waveEnable.set("On").wait() - elif wmode in ["output", "flag"]: - self.waveEnable.set("Off").wait() - else: - raise RuntimeError(f"Unsupported window mode: {wmode}") - - # Set PSO output data source - if wmode in ["toggle", "toggled", "pulse", "pulsed"]: - self.outSource.set("Waveform").wait() - elif wmode in ["output", "flag"]: - self.outSource.set("Window").wait() + # Configure the pulsed/toggled/output waveform + super().configure(d) new = self.read_configuration() return (old, new) @@ -886,14 +837,6 @@ class aa1AxisPsoWindow(aa1AxisPsoBase): if settle_time is not None: sleep(settle_time) - def unstage(self, settle_time=None): - """ Standard bluesky unstage""" - self.winOutput.set("Off").wait() - self.winEvents.set("Off").wait() - if settle_time is not None: - sleep(settle_time) - return super().unstage() - def complete(self) -> DeviceStatus: """Bluesky flyer interface""" # Array mode waits until the buffer is empty diff --git a/tomcat_bec/devices/aerotech/test/autoHilTest.py b/tomcat_bec/devices/aerotech/test/autoHilTest.py index 1b38720..d3b681e 100644 --- a/tomcat_bec/devices/aerotech/test/autoHilTest.py +++ b/tomcat_bec/devices/aerotech/test/autoHilTest.py @@ -367,36 +367,36 @@ class AerotechAutomation1Test(unittest.TestCase): the PSO module and the polled feedback. """ # Throws if IOC is not available or PVs are incorrect - dev = aa1AxisPsoDistance(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:", name="pso1") + pso = aa1AxisPsoDistance(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:", name="pso1") ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") mot = EpicsMotor(AA1_IOC_NAME + ":" + AA1_AXIS_NAME, name="x") mot.wait_for_connection() - dev.wait_for_connection() + pso.wait_for_connection() ddc.wait_for_connection() + pso.unstage() + ddc.unstage() + mot.unstage() print("\nAX:PSO: Checking axis PSO in distance mode (basic)") - if dev.output.value: - dev.toggle() - sleep(1) print("AX:PSO: Directly checking basic 'toggled' event generation (1/3)") # Configure steps and move to middle of range - dev.configure({'distance': 5, 'wmode': "toggle"}) - dev.stage() + pso.configure({'distance': 5, 'wmode': "toggle"}) + pso.kickoff() mot.move(mot.position + 2.5) # Start moving in steps, while checking output for ii in range(10): - last_pso = dev.output.value + last_pso = pso.output.value mot.move(mot.position + 5) sleep(1) - self.assertTrue(dev.output.value != last_pso, - f"Expected to toggle the PSO output at step {ii}, got {dev.output.value}") - dev.unstage() + self.assertTrue(pso.output.value != last_pso, + f"Expected to toggle the PSO output at step {ii}, got {pso.output.value}") + pso.unstage() ddc.unstage() print("AX:PSO: Checking basic 'toggled' event generation with DDC (2/3)") # Configure steps and move to middle of range - dev.configure({'distance': 2, 'wmode': "toggle"}) - dev.stage() + pso.configure({'distance': 2, 'wmode': "toggle"}) + pso.kickoff() # Configure DDC npoints = 720 / 2.0 @@ -406,7 +406,7 @@ class AerotechAutomation1Test(unittest.TestCase): npoints_rbv = ddc.nsamples_rbv.value self.assertEqual(npoints_rbv, 0, f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}") - self.assertTrue(dev.output.value == 0, "Expected to start from LOW state") + self.assertTrue(pso.output.value == 0, "Expected to start from LOW state") # Start moving and wait for polling mot.move(mot.position + 720) @@ -416,13 +416,13 @@ class AerotechAutomation1Test(unittest.TestCase): npoints_rbv = ddc.nsamples_rbv.value self.assertTrue(np.abs(npoints_rbv-720/4) <= 1, f"Expected to record {720/4} points but got {npoints_rbv}") - dev.unstage() + pso.unstage() ddc.unstage() print("AX:PSO: Checking basic 'pulsed' event generation with DDC (3/5)") # Configure steps and move to middle of range - dev.configure({'distance': 1.8, 'wmode': "pulsed", 'n_pulse': 3}) - dev.stage() + pso.configure({'distance': 1.8, 'wmode': "pulsed", 'n_pulse': 3}) + pso.kickoff() # Configure DDC npoints = 3 * 720 / 1.8 @@ -432,7 +432,7 @@ class AerotechAutomation1Test(unittest.TestCase): npoints_rbv = ddc.nsamples_rbv.value self.assertEqual(npoints_rbv, 0, f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}") - self.assertTrue(dev.output.value == 0, "Expected to start from LOW state") + self.assertTrue(pso.output.value == 0, "Expected to start from LOW state") # Start moving and wait for polling mot.move(mot.position + 720) @@ -443,7 +443,7 @@ class AerotechAutomation1Test(unittest.TestCase): self.assertTrue(np.abs(npoints_rbv-3*720/1.8) <= 1, f"Expected to record {3*720/1.8} points but got {npoints_rbv}") print("AX:PSO: Done testing basic PSO distance functionality!") - dev.unstage() + pso.unstage() ddc.unstage() def test_09_PsoIntegration01(self): @@ -452,22 +452,23 @@ class AerotechAutomation1Test(unittest.TestCase): and compares them to the expected values. """ # Throws if IOC is not available or PVs are incorrect - dev = aa1AxisPsoDistance(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:", name="pso1") + pso = aa1AxisPsoDistance(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:", name="pso1") ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") mot = EpicsMotor(AA1_IOC_NAME + ":" + AA1_AXIS_NAME, name="x") mot.wait_for_connection() - dev.wait_for_connection() + pso.wait_for_connection() ddc.wait_for_connection() - print("\nAX:PSO: Integration test for vector style PSO triggering (advanced)") + pso.unstage() + ddc.unstage() + mot.unstage() + print("\ntest_09_PsoIntegration01: Integration test for vector style PSO triggering (advanced)") print("AX:PSO: Testing standard 'enable' style PSO triggering with DDC (1/3)") # This is one line of the Tomcat sequence scan! acc_dist = 0.5 * mot.velocity.value * mot.acceleration.value vec_dist = [acc_dist, 18, 0.01, 18, 0.01, 18, 0.01, 18, 0.01, 18, 0.01, 18] - dev.configure({'distance': vec_dist, 'wmode': "toggle"}) - if dev.output.value: - dev.toggle() - dev.stage() + pso.configure({'distance': vec_dist, 'wmode': "toggle"}) + pso.kickoff() # Configure DDC npoints_exp = len(vec_dist) / 2 @@ -477,7 +478,7 @@ class AerotechAutomation1Test(unittest.TestCase): npoints_rbv = ddc.nsamples_rbv.value self.assertEqual(npoints_rbv, 0, f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}") - self.assertTrue(dev.output.value == 0, "Expected to start from LOW state") + self.assertTrue(pso.output.value == 0, "Expected to start from LOW state") # Start moving and wait for polling mot.move(mot.position + np.sum(vec_dist) + 10) @@ -487,11 +488,11 @@ class AerotechAutomation1Test(unittest.TestCase): npoints_rbv = ddc.nsamples_rbv.value self.assertTrue(np.abs(npoints_rbv-npoints_exp) < 1, f"Expected to record {npoints_exp} points but got {npoints_rbv}") - self.assertTrue(dev.output.value == 0, + self.assertTrue(pso.output.value == 0, "Expected to finish in LOW state") - self.assertTrue(dev.dstArrayDepleted.value, + self.assertTrue(pso.dstArrayDepleted.value, "Expected to cover all points in the distance array") - dev.unstage() + pso.unstage() ddc.unstage() print("AX:PSO: Checking 'individual' style trigger signal with DDC (2/5)") @@ -499,8 +500,8 @@ class AerotechAutomation1Test(unittest.TestCase): acc_dist = 0.5 * mot.velocity.value * mot.acceleration.value vec_dist = [acc_dist] vec_dist.extend([1.618]*100) - dev.configure({'distance': vec_dist, 'wmode': "pulsed"}) - dev.stage() + pso.configure({'distance': vec_dist, 'wmode': "pulsed"}) + pso.kickoff() # Configure DDC npoints_exp = len(vec_dist) @@ -510,7 +511,7 @@ class AerotechAutomation1Test(unittest.TestCase): npoints_rbv = ddc.nsamples_rbv.value self.assertEqual(npoints_rbv, 0, f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}") - self.assertTrue(dev.output.value == 0, "Expected to start from LOW state") + self.assertTrue(pso.output.value == 0, "Expected to start from LOW state") # Start moving and wait for polling (move negative direction) mot.move(mot.position - np.sum(vec_dist) - acc_dist) @@ -520,11 +521,11 @@ class AerotechAutomation1Test(unittest.TestCase): npoints_rbv = ddc.nsamples_rbv.value self.assertTrue(np.abs(npoints_rbv-npoints_exp) < 1, f"Expected to record {npoints_exp} points but got {npoints_rbv}") - self.assertTrue(dev.output.value == 0, "Expected to finish in LOW state") - self.assertTrue(dev.dstArrayDepleted.value, + self.assertTrue(pso.output.value == 0, "Expected to finish in LOW state") + self.assertTrue(pso.dstArrayDepleted.value, "Expected to cover all pints in the distance array") print("AX:PSO: Done testing PSO module in distance mode!") - dev.unstage() + pso.unstage() ddc.unstage() def test_10_AxisPsoWindow1(self): @@ -535,20 +536,21 @@ class AerotechAutomation1Test(unittest.TestCase): the PSO module and the polled feedback. """ # Throws if IOC is not available or PVs are incorrect - dev = aa1AxisPsoWindow(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:", name="pso1") + pso = aa1AxisPsoWindow(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:", name="pso1") ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") mot = EpicsMotor(AA1_IOC_NAME + ":" + AA1_AXIS_NAME, name="x") mot.wait_for_connection() - dev.wait_for_connection() + pso.wait_for_connection() ddc.wait_for_connection() + pso.unstage() + ddc.unstage() + mot.unstage() print("\nAX:PSO: Checking axis PSO in window mode (basic)") - if dev.output.value: - dev.toggle() print("AX:PSO: Directly checking basic window output (1/3)") # Configure steps and move to middle of range - dev.configure({'bounds': (5, 13), 'wmode': "output"}) - dev.stage() + pso.configure({'bounds': (5, 13), 'wmode': "output"}) + pso.kickoff() sleep(1) # Move half a step mot.move(mot.position + 0.1) @@ -560,14 +562,14 @@ class AerotechAutomation1Test(unittest.TestCase): dist += 0.2 sleep(1) if 5 < dist < 13: - self.assertTrue(dev.output.value == 1, + self.assertTrue(pso.output.value == 1, f"Expected HIGH PSO output inside the window at distance {dist}") else: - self.assertTrue(dev.output.value == 0, + self.assertTrue(pso.output.value == 0, f"Expected LOW PSO output outside the window at distance {dist}") print("AX:PSO: The next tests fail due to encoder jitter!") - dev.unstage() + pso.unstage() return print("AX:PSO: Checking basic window event generation with DDC (2/3)") @@ -625,7 +627,71 @@ class AerotechAutomation1Test(unittest.TestCase): self.assertTrue(np.abs(npoints_rbv - 10) < 1, f"Expected to record 10 points but got {npoints_rbv}") - def test_12_TomcatSequenceScan(self): + def test_12_StepScan(self): + """ Test a simple blusky style step scan (via BEC) + + This module tests manual PSO trigger functionality. The goal is to + ensure the basic operation of the PSO module and the polled feedback. + """ + # Throws if IOC is not available or PVs are incorrect + pso = aa1AxisPsoDistance(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:", name="pso1") + ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") + mot = EpicsMotor(AA1_IOC_NAME + ":" + AA1_AXIS_NAME, name="x") + mot.wait_for_connection() + pso.wait_for_connection() + ddc.wait_for_connection() + pso.unstage() + ddc.unstage() + mot.unstage() + t_start = time() + print("\ntest_12_StepScan: Simple step scan (via Ophyd)") + + PosStart = 112.0 + ScanRange = 180 + ScanSteps = 18 + + step_size = ScanRange / ScanSteps + positions = [] + for ii in range(ScanSteps+1): + positions.append(PosStart + ii * step_size) + + # Motor setup + mot.move(PosStart) + self.assertTrue(np.abs(mot.position-PosStart) < 0.1, + f"Expected to move to scan start position {PosStart}, now at {mot.position}") + # PSO setup + pso.configure(d={'n_pulse': 2}) + # DDC setup (trigger at each point plus some extra) + cfg = {'ntotal': 10 * (ScanSteps + 1), 'trigger': DriveDataCaptureTrigger.PsoOutput} + ddc.configure(cfg) + + print("\tStage") + mot.stage() + pso.stage() + ddc.stage() + + for ii in range(len(positions)): + mot.move(positions[ii]) + self.assertTrue(np.abs(mot.position-positions[ii]) < 1.0, + f"Expected to reach {positions[ii]}, now is at {mot.position} on step {ii}") + pso.trigger() + + print("\tUnstage") + mot.unstage() + ddc.unstage() + pso.unstage() + + t_end = time() + t_elapsed = t_end - t_start + print(f"Elapsed scan time: {t_elapsed}") + + print("Evaluate final state") + sleep(0.5) + npoints_rbv = ddc.nsamples_rbv.value + self.assertEqual(npoints_rbv, 2 * len(positions), + f"Expected to record {len(positions)} DDC points, got: {npoints_rbv}") + + def test_13_TomcatSequenceScan(self): """ Test the zig-zag sequence scan from Tomcat (via BEC) This module tests basic PSO distance functionality, like events and @@ -639,8 +705,11 @@ class AerotechAutomation1Test(unittest.TestCase): mot.wait_for_connection() pso.wait_for_connection() ddc.wait_for_connection() + pso.unstage() + ddc.unstage() + mot.unstage() t_start = time() - print("\nTOMCAT Sequeence scan (via Ophyd)") + print("\ntest_13_TomcatSequenceScan: TOMCAT Sequeence scan (via Ophyd)") ScanStart = 42.0 ScanRange = 180 @@ -685,6 +754,8 @@ class AerotechAutomation1Test(unittest.TestCase): mot.stage() pso.stage() ddc.stage() + print("\tKickoff") + pso.kickoff() for ii in range(NumRepeat): # No option to reset the index counter... @@ -728,6 +799,7 @@ class AerotechAutomation1Test(unittest.TestCase): self.assertEqual(npoints_rbv, NumRepeat, f"Expected to record {NumRepeat} DDC points, but got: {npoints_rbv}") + def test_14_JinjaTomcatSequenceScan(self): # Load the test file try: @@ -740,7 +812,7 @@ class AerotechAutomation1Test(unittest.TestCase): templatetext = f.read() import jinja2 - scanconfig = {'startpos': 42, 'scanrange': 180, 'nrepeat': 10, 'scandir': 'NegPos', 'scanvel': 90, 'scanacc': 500} + scanconfig = {'startpos': 42, 'scanrange': 180, 'nrepeat': 13, 'scandir': 'NegPos', 'scanvel': 90, 'scanacc': 500} tm = jinja2.Template(templatetext) text = tm.render(scan=scanconfig) # with open("test/tcatSequenceScan.ascript", "w") as text_file: @@ -760,6 +832,15 @@ class AerotechAutomation1Test(unittest.TestCase): t_elapsed = t_end - t_start print(f"Elapsed scan time: {t_elapsed}") + sleep(0.5) + ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") + ddc.wait_for_connection() + npoints_rbv = ddc.nsamples_rbv.value + self.assertEqual(npoints_rbv, 13, + f"Expected to record 13 DDC points, got: {npoints_rbv} after script") + + + if __name__ == "__main__": unittest.main()