PSO trigger step scan interface

This commit is contained in:
gac-x05la
2024-10-09 15:26:45 +02:00
parent 13a362212e
commit 6bd43909b7
2 changed files with 207 additions and 183 deletions

View File

@@ -250,7 +250,7 @@ class aa1Tasks(Device):
# FIXME: BEC will swallow this exception
# error = bool(value[task_idx] in ["Error", 8])
timestamp_ = timestamp
print(result)
# print(result)
return result
# Subscribe and wait for update
@@ -258,41 +258,6 @@ class aa1Tasks(Device):
return status
class aa1TaskState(Device):
"""Task state monitoring API
This is the task state monitoring interface for Automation1 tasks. It
does not launch execution, but can wait for the execution to complete.
"""
index = Component(EpicsSignalRO, "INDEX", kind=Kind.config)
status = Component(EpicsSignalRO, "STATUS", auto_monitor=True, kind=Kind.hinted)
errorCode = Component(EpicsSignalRO, "ERROR", auto_monitor=True, kind=Kind.hinted)
warnCode = Component(EpicsSignalRO, "WARNING", auto_monitor=True, kind=Kind.hinted)
def complete(self) -> DeviceStatus:
""" Wait for the task while RUNNING"""
# Define wait until the busy flag goes down (excluding initial update)
timestamp_ = 0
def not_running(*args, old_value, value, timestamp, **kwargs):
nonlocal timestamp_
result = False if (timestamp_ == 0) else (value not in ["Running", 4])
timestamp_ = timestamp
print(result)
return result
# Subscribe and wait for update
status = SubscriptionStatus(self.status, not_running, settle_time=0.5)
return status
def kickoff(self) -> DeviceStatus:
""" Standard Bluesky kickoff method"""
status = DeviceStatus(self)
status.set_finished()
return status
class aa1GlobalVariables(Device):
"""Global variables
@@ -309,9 +274,9 @@ class aa1GlobalVariables(Device):
'''
var = aa1GlobalVariables(AA1_IOC_NAME+":VAR:", name="var")
var.wait_for_connection()
ret = var.readInt(42)
var.writeFloat(1000, np.random.random(1024))
ret_arr = var.readFloat(1000, 1024)
ret = var.read_int(42)
var.write_float(1000, np.random.random(1024))
ret_arr = var.read_float(1000, 1024)
'''
"""
USER_ACCESS = ['read_int', 'write_int', 'read_float', 'write_float', 'read_string', 'write_string']
@@ -604,9 +569,61 @@ class aa1AxisPsoBase(Device):
""" Toggle waveform outup"""
orig_wave_mode = self.waveMode.get()
self.waveMode.set("Toggle").wait()
self.fire(0.1)
self.trigger(0.1)
self.waveMode.set(orig_wave_mode).wait()
def configure(self, d: dict):
""" Configure the emitted waveform"""
wmode = d.get("wmode", "pulsed")
t_pulse = d.get("t_pulse", 100)
w_pulse = d.get("w_pulse", 200)
n_pulse = d.get("n_pulse", 1)
# Configure the pulsed/toggled waveform
if wmode in ["toggle", "toggled"]:
# Switching to simple toggle mode
self.waveEnable.set("On").wait()
self.waveMode.set("Toggle").wait()
elif wmode in ["pulse", "pulsed"]:
# Switching to pulsed mode
self.waveEnable.set("On").wait()
self.waveMode.set("Pulse").wait()
# Setting pulse shape
self.pulseWindow.set(w_pulse).wait()
self.pulseOnTime.set(t_pulse).wait()
self.pulseCount.set(n_pulse).wait()
# Commiting configuration
self.pulseApply.set(1).wait()
# Enabling PSO waveform outputs
self.waveEnable.set("On").wait()
elif wmode in ["output", "flag"]:
self.waveEnable.set("Off").wait()
else:
raise RuntimeError(f"Unsupported window mode: {wmode}")
# Set PSO output data source
# FIXME : This is essentially staging...
if wmode in ["toggle", "toggled", "pulse", "pulsed"]:
self.outSource.set("Waveform").wait()
elif wmode in ["output", "flag"]:
self.outSource.set("Window").wait()
def unstage(self, settle_time=0.2):
""" Standard bluesky unstage"""
# Ensure output is set to low
if self.output.value:
self.toggle()
# Turn off window mode
self.winOutput.set("Off").wait()
self.winEvents.set("Off").wait()
# Turn off distance mode
self.dstEventsEna.set("Off").wait()
self.dstCounterEna.set("Off").wait()
# Sleep if specified
if settle_time is not None:
sleep(settle_time)
return super().unstage()
class aa1AxisPsoDistance(aa1AxisPsoBase):
"""Position Sensitive Output - Distance mode
@@ -640,7 +657,7 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
pso.configure(d={'distance': 1.8, 'wmode': "pulsed", 'n_pulse': 5})
pso.kickoff().wait()
"""
_distance_value = 3.141592
_distance_value = None
# ########################################################################
# PSO high level interface
@@ -649,16 +666,14 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
functionality for distance mode PSO.
:param distance: The trigger distance or the array of distances between subsequent points.
:param wmode: Waveform mode configuration, usually pulsed/toggled.
:param wmode: Waveform mode configuration, usually pulsed/toggled (default: pulsed).
:param t_pulse : trigger high duration in pulsed mode (default: 200 us)
"""
distance = d["distance"]
wmode = str(d["wmode"])
t_pulse = d.get("t_pulse", 100)
w_pulse = d.get("w_pulse", 200)
n_pulse = d.get("n_pulse", 1)
distance = d.get("distance", None)
wmode = d.get("wmode", "pulsed")
# Validate input parameters
if wmode not in ["pulse", "pulsed", "toggle", "toggled"]:
if wmode is not None and wmode not in ["pulse", "pulsed", "toggle", "toggled"]:
raise RuntimeError(f"Unsupported distace triggering mode: {wmode}")
old = self.read_configuration()
@@ -669,41 +684,14 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
# Configure distance generator (also resets counter to 0)
self._distance_value = distance
if isinstance(distance, (float, int)):
self.dstDistance.set(distance).wait()
elif isinstance(distance, (np.ndarray, list, tuple)):
self.dstDistanceArr.set(distance).wait()
if distance is not None:
if isinstance(distance, (float, int)):
self.dstDistance.set(distance).wait()
elif isinstance(distance, (np.ndarray, list, tuple)):
self.dstDistanceArr.set(distance).wait()
# Configure the pulsed/toggled waveform
if wmode in ["toggle", "toggled"]:
# Switching to simple toggle mode
self.waveEnable.set("On").wait()
self.waveMode.set("Toggle").wait()
elif wmode in ["pulse", "pulsed"]:
# Switching to pulsed mode
self.waveEnable.set("On").wait()
self.waveMode.set("Pulse").wait()
# Setting pulse shape
if w_pulse is not None:
self.pulseWindow.set(w_pulse).wait()
if t_pulse is not None:
self.pulseOnTime.set(t_pulse).wait()
if n_pulse is not None:
self.pulseCount.set(n_pulse).wait()
# Commiting configuration
self.pulseApply.set(1).wait()
# Enabling PSO waveform outputs
self.waveEnable.set("On").wait()
else:
raise RuntimeError(f"Unsupported waveform mode: {wmode}")
# Ensure output is set to low
if self.output.value:
self.toggle()
# Set PSO output data source
self.outSource.set("Waveform").wait()
super().configure(d)
new = self.read_configuration()
logger.info(f"[{self.name}] PSO configured to {wmode} mode")
@@ -712,24 +700,17 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
# ########################################################################
# Bluesky step scan interface
def stage(self):
""" Stage the device"""
# Rearm the configured array
if isinstance(self._distance_value, (np.ndarray, list, tuple)):
self.dstArrayRearm.set(1).wait()
# Turn off counter monitoring
self.dstEventsEna.set("On").wait()
self.dstCounterEna.set("On").wait()
""" Stage the device: just ignore it... """
return super().stage()
def unstage(self):
""" Unstage the device"""
# Turn off counter monitoring
self.dstEventsEna.set("Off").wait()
self.dstCounterEna.set("Off").wait()
return super().unstage()
# ########################################################################
# Bluesky flyer interface
def prepare(self):
""" Rearms the configured array"""
# Rearm the configured array
if isinstance(self._distance_value, (np.ndarray, list, tuple)):
self.dstArrayRearm.set(1).wait()
def kickoff(self) -> DeviceStatus:
""" Kick off an asynchronous acquisition"""
# Rearm the configured array
@@ -803,9 +784,6 @@ class aa1AxisPsoWindow(aa1AxisPsoBase):
bounds = d["bounds"]
wmode = str(d["wmode"])
emode = d.get("emode", "Enter")
t_pulse = d.get("t_pulse", 100)
w_pulse = d.get("w_pulse", 200)
n_pulse = d.get("n_pulse", 1)
# Validate input parameters
if wmode not in ["pulse", "pulsed", "toggle", "toggled", "output", "flag"]:
@@ -815,7 +793,6 @@ class aa1AxisPsoWindow(aa1AxisPsoBase):
self._mode = wmode
self._eventMode = emode
old = self.read_configuration()
# Configure the window module
@@ -824,7 +801,6 @@ class aa1AxisPsoWindow(aa1AxisPsoBase):
self.winCounter.set(0).wait()
self._winLower.set(bounds[0]).wait()
self._winUpper.set(bounds[1]).wait()
elif isinstance(bounds, (np.ndarray, list, tuple)):
self.winCounter.set(0).wait()
self.winBoundsArr.set(bounds).wait()
@@ -833,33 +809,8 @@ class aa1AxisPsoWindow(aa1AxisPsoBase):
self.winOutput.set("Off").wait()
self.winEvents.set("Off").wait()
# Configure the pulsed/toggled waveform
if wmode in ["toggle", "toggled"]:
# Switching to simple toggle mode
self.waveEnable.set("On").wait()
self.waveMode.set("Toggle").wait()
elif wmode in ["pulse", "pulsed"]:
# Switching to pulsed mode
self.waveEnable.set("On").wait()
self.waveMode.set("Pulse").wait()
# Setting pulse shape
self.pulseWindow.set(w_pulse).wait()
self.pulseOnTime.set(t_pulse).wait()
self.pulseCount.set(n_pulse).wait()
# Commiting configuration
self.pulseApply.set(1).wait()
# Enabling PSO waveform outputs
self.waveEnable.set("On").wait()
elif wmode in ["output", "flag"]:
self.waveEnable.set("Off").wait()
else:
raise RuntimeError(f"Unsupported window mode: {wmode}")
# Set PSO output data source
if wmode in ["toggle", "toggled", "pulse", "pulsed"]:
self.outSource.set("Waveform").wait()
elif wmode in ["output", "flag"]:
self.outSource.set("Window").wait()
# Configure the pulsed/toggled/output waveform
super().configure(d)
new = self.read_configuration()
return (old, new)
@@ -886,14 +837,6 @@ class aa1AxisPsoWindow(aa1AxisPsoBase):
if settle_time is not None:
sleep(settle_time)
def unstage(self, settle_time=None):
""" Standard bluesky unstage"""
self.winOutput.set("Off").wait()
self.winEvents.set("Off").wait()
if settle_time is not None:
sleep(settle_time)
return super().unstage()
def complete(self) -> DeviceStatus:
"""Bluesky flyer interface"""
# Array mode waits until the buffer is empty

View File

@@ -367,36 +367,36 @@ class AerotechAutomation1Test(unittest.TestCase):
the PSO module and the polled feedback.
"""
# Throws if IOC is not available or PVs are incorrect
dev = aa1AxisPsoDistance(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:", name="pso1")
pso = aa1AxisPsoDistance(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:", name="pso1")
ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":DDC:", name="ddc")
mot = EpicsMotor(AA1_IOC_NAME + ":" + AA1_AXIS_NAME, name="x")
mot.wait_for_connection()
dev.wait_for_connection()
pso.wait_for_connection()
ddc.wait_for_connection()
pso.unstage()
ddc.unstage()
mot.unstage()
print("\nAX:PSO: Checking axis PSO in distance mode (basic)")
if dev.output.value:
dev.toggle()
sleep(1)
print("AX:PSO: Directly checking basic 'toggled' event generation (1/3)")
# Configure steps and move to middle of range
dev.configure({'distance': 5, 'wmode': "toggle"})
dev.stage()
pso.configure({'distance': 5, 'wmode': "toggle"})
pso.kickoff()
mot.move(mot.position + 2.5)
# Start moving in steps, while checking output
for ii in range(10):
last_pso = dev.output.value
last_pso = pso.output.value
mot.move(mot.position + 5)
sleep(1)
self.assertTrue(dev.output.value != last_pso,
f"Expected to toggle the PSO output at step {ii}, got {dev.output.value}")
dev.unstage()
self.assertTrue(pso.output.value != last_pso,
f"Expected to toggle the PSO output at step {ii}, got {pso.output.value}")
pso.unstage()
ddc.unstage()
print("AX:PSO: Checking basic 'toggled' event generation with DDC (2/3)")
# Configure steps and move to middle of range
dev.configure({'distance': 2, 'wmode': "toggle"})
dev.stage()
pso.configure({'distance': 2, 'wmode': "toggle"})
pso.kickoff()
# Configure DDC
npoints = 720 / 2.0
@@ -406,7 +406,7 @@ class AerotechAutomation1Test(unittest.TestCase):
npoints_rbv = ddc.nsamples_rbv.value
self.assertEqual(npoints_rbv, 0,
f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}")
self.assertTrue(dev.output.value == 0, "Expected to start from LOW state")
self.assertTrue(pso.output.value == 0, "Expected to start from LOW state")
# Start moving and wait for polling
mot.move(mot.position + 720)
@@ -416,13 +416,13 @@ class AerotechAutomation1Test(unittest.TestCase):
npoints_rbv = ddc.nsamples_rbv.value
self.assertTrue(np.abs(npoints_rbv-720/4) <= 1,
f"Expected to record {720/4} points but got {npoints_rbv}")
dev.unstage()
pso.unstage()
ddc.unstage()
print("AX:PSO: Checking basic 'pulsed' event generation with DDC (3/5)")
# Configure steps and move to middle of range
dev.configure({'distance': 1.8, 'wmode': "pulsed", 'n_pulse': 3})
dev.stage()
pso.configure({'distance': 1.8, 'wmode': "pulsed", 'n_pulse': 3})
pso.kickoff()
# Configure DDC
npoints = 3 * 720 / 1.8
@@ -432,7 +432,7 @@ class AerotechAutomation1Test(unittest.TestCase):
npoints_rbv = ddc.nsamples_rbv.value
self.assertEqual(npoints_rbv, 0,
f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}")
self.assertTrue(dev.output.value == 0, "Expected to start from LOW state")
self.assertTrue(pso.output.value == 0, "Expected to start from LOW state")
# Start moving and wait for polling
mot.move(mot.position + 720)
@@ -443,7 +443,7 @@ class AerotechAutomation1Test(unittest.TestCase):
self.assertTrue(np.abs(npoints_rbv-3*720/1.8) <= 1,
f"Expected to record {3*720/1.8} points but got {npoints_rbv}")
print("AX:PSO: Done testing basic PSO distance functionality!")
dev.unstage()
pso.unstage()
ddc.unstage()
def test_09_PsoIntegration01(self):
@@ -452,22 +452,23 @@ class AerotechAutomation1Test(unittest.TestCase):
and compares them to the expected values.
"""
# Throws if IOC is not available or PVs are incorrect
dev = aa1AxisPsoDistance(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:", name="pso1")
pso = aa1AxisPsoDistance(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:", name="pso1")
ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":DDC:", name="ddc")
mot = EpicsMotor(AA1_IOC_NAME + ":" + AA1_AXIS_NAME, name="x")
mot.wait_for_connection()
dev.wait_for_connection()
pso.wait_for_connection()
ddc.wait_for_connection()
print("\nAX:PSO: Integration test for vector style PSO triggering (advanced)")
pso.unstage()
ddc.unstage()
mot.unstage()
print("\ntest_09_PsoIntegration01: Integration test for vector style PSO triggering (advanced)")
print("AX:PSO: Testing standard 'enable' style PSO triggering with DDC (1/3)")
# This is one line of the Tomcat sequence scan!
acc_dist = 0.5 * mot.velocity.value * mot.acceleration.value
vec_dist = [acc_dist, 18, 0.01, 18, 0.01, 18, 0.01, 18, 0.01, 18, 0.01, 18]
dev.configure({'distance': vec_dist, 'wmode': "toggle"})
if dev.output.value:
dev.toggle()
dev.stage()
pso.configure({'distance': vec_dist, 'wmode': "toggle"})
pso.kickoff()
# Configure DDC
npoints_exp = len(vec_dist) / 2
@@ -477,7 +478,7 @@ class AerotechAutomation1Test(unittest.TestCase):
npoints_rbv = ddc.nsamples_rbv.value
self.assertEqual(npoints_rbv, 0,
f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}")
self.assertTrue(dev.output.value == 0, "Expected to start from LOW state")
self.assertTrue(pso.output.value == 0, "Expected to start from LOW state")
# Start moving and wait for polling
mot.move(mot.position + np.sum(vec_dist) + 10)
@@ -487,11 +488,11 @@ class AerotechAutomation1Test(unittest.TestCase):
npoints_rbv = ddc.nsamples_rbv.value
self.assertTrue(np.abs(npoints_rbv-npoints_exp) < 1,
f"Expected to record {npoints_exp} points but got {npoints_rbv}")
self.assertTrue(dev.output.value == 0,
self.assertTrue(pso.output.value == 0,
"Expected to finish in LOW state")
self.assertTrue(dev.dstArrayDepleted.value,
self.assertTrue(pso.dstArrayDepleted.value,
"Expected to cover all points in the distance array")
dev.unstage()
pso.unstage()
ddc.unstage()
print("AX:PSO: Checking 'individual' style trigger signal with DDC (2/5)")
@@ -499,8 +500,8 @@ class AerotechAutomation1Test(unittest.TestCase):
acc_dist = 0.5 * mot.velocity.value * mot.acceleration.value
vec_dist = [acc_dist]
vec_dist.extend([1.618]*100)
dev.configure({'distance': vec_dist, 'wmode': "pulsed"})
dev.stage()
pso.configure({'distance': vec_dist, 'wmode': "pulsed"})
pso.kickoff()
# Configure DDC
npoints_exp = len(vec_dist)
@@ -510,7 +511,7 @@ class AerotechAutomation1Test(unittest.TestCase):
npoints_rbv = ddc.nsamples_rbv.value
self.assertEqual(npoints_rbv, 0,
f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}")
self.assertTrue(dev.output.value == 0, "Expected to start from LOW state")
self.assertTrue(pso.output.value == 0, "Expected to start from LOW state")
# Start moving and wait for polling (move negative direction)
mot.move(mot.position - np.sum(vec_dist) - acc_dist)
@@ -520,11 +521,11 @@ class AerotechAutomation1Test(unittest.TestCase):
npoints_rbv = ddc.nsamples_rbv.value
self.assertTrue(np.abs(npoints_rbv-npoints_exp) < 1,
f"Expected to record {npoints_exp} points but got {npoints_rbv}")
self.assertTrue(dev.output.value == 0, "Expected to finish in LOW state")
self.assertTrue(dev.dstArrayDepleted.value,
self.assertTrue(pso.output.value == 0, "Expected to finish in LOW state")
self.assertTrue(pso.dstArrayDepleted.value,
"Expected to cover all pints in the distance array")
print("AX:PSO: Done testing PSO module in distance mode!")
dev.unstage()
pso.unstage()
ddc.unstage()
def test_10_AxisPsoWindow1(self):
@@ -535,20 +536,21 @@ class AerotechAutomation1Test(unittest.TestCase):
the PSO module and the polled feedback.
"""
# Throws if IOC is not available or PVs are incorrect
dev = aa1AxisPsoWindow(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:", name="pso1")
pso = aa1AxisPsoWindow(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:", name="pso1")
ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":DDC:", name="ddc")
mot = EpicsMotor(AA1_IOC_NAME + ":" + AA1_AXIS_NAME, name="x")
mot.wait_for_connection()
dev.wait_for_connection()
pso.wait_for_connection()
ddc.wait_for_connection()
pso.unstage()
ddc.unstage()
mot.unstage()
print("\nAX:PSO: Checking axis PSO in window mode (basic)")
if dev.output.value:
dev.toggle()
print("AX:PSO: Directly checking basic window output (1/3)")
# Configure steps and move to middle of range
dev.configure({'bounds': (5, 13), 'wmode': "output"})
dev.stage()
pso.configure({'bounds': (5, 13), 'wmode': "output"})
pso.kickoff()
sleep(1)
# Move half a step
mot.move(mot.position + 0.1)
@@ -560,14 +562,14 @@ class AerotechAutomation1Test(unittest.TestCase):
dist += 0.2
sleep(1)
if 5 < dist < 13:
self.assertTrue(dev.output.value == 1,
self.assertTrue(pso.output.value == 1,
f"Expected HIGH PSO output inside the window at distance {dist}")
else:
self.assertTrue(dev.output.value == 0,
self.assertTrue(pso.output.value == 0,
f"Expected LOW PSO output outside the window at distance {dist}")
print("AX:PSO: The next tests fail due to encoder jitter!")
dev.unstage()
pso.unstage()
return
print("AX:PSO: Checking basic window event generation with DDC (2/3)")
@@ -625,7 +627,71 @@ class AerotechAutomation1Test(unittest.TestCase):
self.assertTrue(np.abs(npoints_rbv - 10) < 1,
f"Expected to record 10 points but got {npoints_rbv}")
def test_12_TomcatSequenceScan(self):
def test_12_StepScan(self):
""" Test a simple blusky style step scan (via BEC)
This module tests manual PSO trigger functionality. The goal is to
ensure the basic operation of the PSO module and the polled feedback.
"""
# Throws if IOC is not available or PVs are incorrect
pso = aa1AxisPsoDistance(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":PSO:", name="pso1")
ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":DDC:", name="ddc")
mot = EpicsMotor(AA1_IOC_NAME + ":" + AA1_AXIS_NAME, name="x")
mot.wait_for_connection()
pso.wait_for_connection()
ddc.wait_for_connection()
pso.unstage()
ddc.unstage()
mot.unstage()
t_start = time()
print("\ntest_12_StepScan: Simple step scan (via Ophyd)")
PosStart = 112.0
ScanRange = 180
ScanSteps = 18
step_size = ScanRange / ScanSteps
positions = []
for ii in range(ScanSteps+1):
positions.append(PosStart + ii * step_size)
# Motor setup
mot.move(PosStart)
self.assertTrue(np.abs(mot.position-PosStart) < 0.1,
f"Expected to move to scan start position {PosStart}, now at {mot.position}")
# PSO setup
pso.configure(d={'n_pulse': 2})
# DDC setup (trigger at each point plus some extra)
cfg = {'ntotal': 10 * (ScanSteps + 1), 'trigger': DriveDataCaptureTrigger.PsoOutput}
ddc.configure(cfg)
print("\tStage")
mot.stage()
pso.stage()
ddc.stage()
for ii in range(len(positions)):
mot.move(positions[ii])
self.assertTrue(np.abs(mot.position-positions[ii]) < 1.0,
f"Expected to reach {positions[ii]}, now is at {mot.position} on step {ii}")
pso.trigger()
print("\tUnstage")
mot.unstage()
ddc.unstage()
pso.unstage()
t_end = time()
t_elapsed = t_end - t_start
print(f"Elapsed scan time: {t_elapsed}")
print("Evaluate final state")
sleep(0.5)
npoints_rbv = ddc.nsamples_rbv.value
self.assertEqual(npoints_rbv, 2 * len(positions),
f"Expected to record {len(positions)} DDC points, got: {npoints_rbv}")
def test_13_TomcatSequenceScan(self):
""" Test the zig-zag sequence scan from Tomcat (via BEC)
This module tests basic PSO distance functionality, like events and
@@ -639,8 +705,11 @@ class AerotechAutomation1Test(unittest.TestCase):
mot.wait_for_connection()
pso.wait_for_connection()
ddc.wait_for_connection()
pso.unstage()
ddc.unstage()
mot.unstage()
t_start = time()
print("\nTOMCAT Sequeence scan (via Ophyd)")
print("\ntest_13_TomcatSequenceScan: TOMCAT Sequeence scan (via Ophyd)")
ScanStart = 42.0
ScanRange = 180
@@ -685,6 +754,8 @@ class AerotechAutomation1Test(unittest.TestCase):
mot.stage()
pso.stage()
ddc.stage()
print("\tKickoff")
pso.kickoff()
for ii in range(NumRepeat):
# No option to reset the index counter...
@@ -728,6 +799,7 @@ class AerotechAutomation1Test(unittest.TestCase):
self.assertEqual(npoints_rbv, NumRepeat,
f"Expected to record {NumRepeat} DDC points, but got: {npoints_rbv}")
def test_14_JinjaTomcatSequenceScan(self):
# Load the test file
try:
@@ -740,7 +812,7 @@ class AerotechAutomation1Test(unittest.TestCase):
templatetext = f.read()
import jinja2
scanconfig = {'startpos': 42, 'scanrange': 180, 'nrepeat': 10, 'scandir': 'NegPos', 'scanvel': 90, 'scanacc': 500}
scanconfig = {'startpos': 42, 'scanrange': 180, 'nrepeat': 13, 'scandir': 'NegPos', 'scanvel': 90, 'scanacc': 500}
tm = jinja2.Template(templatetext)
text = tm.render(scan=scanconfig)
# with open("test/tcatSequenceScan.ascript", "w") as text_file:
@@ -760,6 +832,15 @@ class AerotechAutomation1Test(unittest.TestCase):
t_elapsed = t_end - t_start
print(f"Elapsed scan time: {t_elapsed}")
sleep(0.5)
ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME + ":" + AA1_AXIS_NAME + ":DDC:", name="ddc")
ddc.wait_for_connection()
npoints_rbv = ddc.nsamples_rbv.value
self.assertEqual(npoints_rbv, 13,
f"Expected to record 13 DDC points, got: {npoints_rbv} after script")
if __name__ == "__main__":
unittest.main()