This commit is contained in:
gac-x05la
2024-10-25 14:29:44 +02:00
parent fca04d86bd
commit 4ecb6d0f1d
5 changed files with 301 additions and 866 deletions

View File

@@ -1,4 +1,2 @@
from .tutorial_fly_scan import AcquireDark, AcquireFlat, TutorialFlyScanContLine
from .gigafrost_test import GigaFrostStepScan
from .tomcat_scanbase import SnapAndStepScanBase, GigaStepScanBase, SequenceScanBase, SimpleFlyer
from .tutorial_fly_scan import AcquireDark, AcquireFlat, TutorialFlyScanContLine

View File

@@ -1,545 +0,0 @@
import time
import numpy as np
from bec_lib import bec_logger
from scan_server.scans import AsyncFlyScanBase, ScanArgType, ScanBase
logger = bec_logger.logger
class AeroSingleScan(AsyncFlyScanBase):
scan_name = "aero_single_scan"
scan_report_hint = "scan_progress"
required_kwargs = ["startpos", "scanrange", "psodist"]
arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
def __init__(self, *args, parameter: dict = None, **kwargs):
"""Performs a single line scan with PSO output and data collection.
Examples:
>>> scans.aero_single_scan(startpos=42, scanrange=2*10+3*180, psodist=[10, 180, 0.01, 180, 0.01, 180])
"""
super().__init__(parameter=parameter, **kwargs)
self.axis = []
self.scan_motors = []
self.num_pos = 0
self.scanStart = self.caller_kwargs.get("startpos")
self.scanEnd = self.scanStart + self.caller_kwargs.get("scanrange")
self.psoBounds = self.caller_kwargs.get("psodist")
self.scanVel = self.caller_kwargs.get("velocity", 30)
self.scanTra = self.caller_kwargs.get("travel", 80)
self.scanAcc = self.caller_kwargs.get("acceleration", 500)
self.scanExpNum = self.caller_kwargs.get("daqpoints", 5000)
def pre_scan(self):
# Move to start position
st = yield from self.stubs.send_rpc_and_wait("es1_roty", "dmove", self.scanStart)
st.wait()
yield from self.stubs.pre_scan()
def scan_report_instructions(self):
"""Scan report instructions for the progress bar, yields from mcs card"""
if not self.scan_report_hint:
yield None
return
yield from self.stubs.scan_report_instruction({"scan_progress": ["es1_roty"]})
def scan_core(self):
# Configure PSO, DDC and motor
yield from self.stubs.send_rpc_and_wait(
"es1_roty",
"configure",
{"velocity": self.scanVel, "acceleration": self.scanVel / self.scanAcc},
)
yield from self.stubs.send_rpc_and_wait(
"es1_psod", "configure", {"distance": self.psoBounds, "wmode": "toggle"}
)
yield from self.stubs.send_rpc_and_wait(
"es1_ddaq",
"configure",
{"npoints": self.scanExpNum},
)
# DAQ with real trigger
# yield from self.stubs.send_rpc_and_wait(
# "es1_ddaq", "configure", {"npoints": self.scanExpNum, "trigger": "HSINP0_RISE"},
# )
# Kick off PSO and DDC
st = yield from self.stubs.send_rpc_and_wait("es1_psod", "kickoff")
st.wait()
st = yield from self.stubs.send_rpc_and_wait("es1_ddaq", "kickoff")
st.wait()
print("Start moving")
# Start the actual movement
yield from self.stubs.send_rpc_and_wait(
"es1_roty",
"configure",
{"target": self.scanEnd},
)
yield from self.stubs.kickoff(
device="es1_roty",
parameter={"target": self.scanEnd},
)
# yield from self.stubs.set(device='es1_roty', value=self.scanEnd, wait_group="flyer")
target_diid = self.DIID - 1
# Wait for motion to finish
while True:
yield from self.stubs.read_and_wait(group="primary", wait_group="readout_primary", pointID=self.pointID)
self.pointID += 1
status = self.stubs.get_req_status(
device="es1_roty", RID=self.metadata["RID"], DIID=target_diid
)
progress = self.stubs.get_device_progress(device="es1_roty", RID=self.metadata["RID"])
print(f"status: {status}\tprogress: {progress}")
if progress:
self.num_pos = progress
if status:
break
time.sleep(1)
print("Scan done\n\n")
self.num_pos = self.pointID
class AeroSequenceScan(AsyncFlyScanBase):
scan_name = "aero_sequence_scan"
scan_report_hint = "table"
required_kwargs = ["startpos", "ranges"]
arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
def __init__(self, *args, parameter: dict = None, **kwargs):
"""Performs a sequence scan with PSO output and data collection
Examples:
>>> scans.aero_sequence_scan(startpos=42, ranges=([179.9, 0.1, 5]), expnum=3600, repnum=3, repmode="PosNeg")
"""
super().__init__(parameter=parameter, **kwargs)
self.axis = []
self.scan_motors = ["es1_roty"]
self.num_pos = 0
self.scanStart = self.caller_kwargs.get("startpos")
self.scanRanges = self.caller_kwargs.get("ranges")
self.scanExpNum = self.caller_kwargs.get("expnum", 25000)
self.scanRepNum = self.caller_kwargs.get("repnum", 1)
self.scanRepMode = self.caller_kwargs.get("repmode", "Pos")
self.scanVel = self.caller_kwargs.get("velocity", 30)
self.scanTra = self.caller_kwargs.get("travel", 80)
self.scanAcc = self.caller_kwargs.get("acceleration", 500)
self.scanSafeDist = self.caller_kwargs.get("safedist", 10)
if isinstance(self.scanRanges[0], (int, float)):
self.scanRanges = self.scanRanges
if self.scanRepMode not in ["PosNeg", "Pos", "NegPos", "Neg"]:
raise RuntimeError(f"Unexpected sequence repetition mode: {self.scanRepMode}")
def pre_scan(self):
# Calculate PSO positions from tables
AccDist = 0.5 * self.scanVel * self.scanVel / self.scanAcc + self.scanSafeDist
# Relative PSO bounds
self.psoBoundsPos = [AccDist]
try:
for line in self.scanRanges:
print(f"Line is: {line} of type {type(line)}")
for rr in range(int(line[2])):
self.psoBoundsPos.append(line[0])
self.psoBoundsPos.append(line[1])
except TypeError:
line = self.scanRanges
print(f"Line is: {line} of type {type(line)}")
for rr in range(int(line[2])):
self.psoBoundsPos.append(line[0])
self.psoBoundsPos.append(line[1])
del self.psoBoundsPos[-1]
self.psoBoundsNeg = [AccDist]
self.psoBoundsNeg.extend(self.psoBoundsPos[::-1])
scanrange = 2 * AccDist + np.sum(self.psoBoundsPos)
if self.scanRepMode in ["PosNeg", "Pos"]:
self.PosStart = self.scanStart - AccDist
self.PosEnd = self.scanStart + scanrange
elif self.scanRepMode in ["NegPos", "Neg"]:
self.PosStart = self.scanStart + AccDist
self.PosEnd = self.scanStart - scanrange
else:
raise RuntimeError(f"Unexpected sequence repetition mode: {self.scanRepMode}")
print(f"\tCalculated scan range: {self.PosStart} to {self.PosEnd} range {scanrange}")
# ToDo: We could append all distances and write a much longer 'distance array'. this would elliminate the need of rearming...
# Move roughly to start position
yield from self.stubs.send_rpc_and_wait(
"es1_roty",
"configure",
{"velocity": self.scanTra, "acceleration": self.scanTra / self.scanAcc},
)
st = yield from self.stubs.send_rpc_and_wait("es1_roty", "move", self.PosStart)
st.wait()
yield from self.stubs.pre_scan()
def scan_core(self):
# Move to start position (with travel velocity)
yield from self.stubs.send_rpc_and_wait(
"es1_roty",
"configure",
{"velocity": self.scanTra, "acceleration": self.scanTra / self.scanAcc},
)
yield from self.stubs.send_rpc_and_wait("es1_roty", "move", self.PosStart)
# Condigure PSO, DDC and motorHSINP0_RISE
yield from self.stubs.send_rpc_and_wait(
"es1_psod", "configure", {"distance": self.psoBoundsPos, "wmode": "toggle"}
)
yield from self.stubs.send_rpc_and_wait(
"es1_ddaq",
"configure",
{"npoints": self.scanExpNum},
)
# With real trigger
# yield from self.stubs.send_rpc_and_wait(
# "es1_ddaq", "configure", {"npoints": self.scanExpNum, "trigger": "HSINP0_RISE"}
# )
yield from self.stubs.send_rpc_and_wait(
"es1_roty",
"configure",
{"velocity": self.scanVel, "acceleration": self.scanVel / self.scanAcc},
)
# Kickoff pso and daq
st = yield from self.stubs.send_rpc_and_wait("es1_psod", "kickoff")
st.wait()
st = yield from self.stubs.send_rpc_and_wait("es1_ddaq", "kickoff")
st.wait()
# Run the actual scan (haven't figured out the proggress bar)
print("Starting actual scan loop")
for ii in range(self.scanRepNum):
print(f"Scan segment {ii}")
# No option to reset the index counter...
yield from self.stubs.send_rpc_and_wait("es1_psod", "dstArrayRearm.set", 1)
if self.scanRepMode in ["Pos", "Neg"]:
yield from self.stubs.send_rpc_and_wait(
"es1_roty",
"configure",
{"velocity": self.scanVel, "acceleration": self.scanVel / self.scanAcc},
)
st = yield from self.stubs.send_rpc_and_wait("es1_roty", "move", self.PosEnd)
st.wait()
yield from self.stubs.send_rpc_and_wait(
"es1_roty",
"configure",
{"velocity": self.scanTra, "acceleration": self.scanTra / self.scanAcc},
)
st = yield from self.stubs.send_rpc_and_wait("es1_roty", "move", self.PosStart)
st.wait()
elif self.scanRepMode in ["PosNeg", "NegPos"]:
if ii % 2 == 0:
st = yield from self.stubs.send_rpc_and_wait("es1_roty", "move", self.PosEnd)
st.wait()
else:
st = yield from self.stubs.send_rpc_and_wait("es1_roty", "move", self.PosStart)
st.wait()
self.pointID += 1
self.num_pos += 1
time.sleep(0.2)
# Complete (should complete instantly)
yield from self.stubs.complete(device="es1_psod")
yield from self.stubs.complete(device="es1_ddaq")
st = yield from self.stubs.send_rpc_and_wait("es1_psod", "complete")
st.wait()
st = yield from self.stubs.send_rpc_and_wait("es1_ddaq", "complete")
st.wait()
# Collect - Throws a warning due to returning a generator
# st = yield from self.stubs.send_rpc_and_wait("es1_psod", "collect")
# st = yield from self.stubs.send_rpc_and_wait("es1_ddaq", "collect")
yield from self.stubs.read_and_wait(group="primary", wait_group="readout_primary")
target_diid = self.DIID - 1
yield from self.stubs.kickoff(
device="es1_roty",
parameter={"target": self.PosStart},
)
yield from self.stubs.wait(device=["es1_roty"], wait_group="kickoff", wait_type="move")
yield from self.stubs.complete(device="es1_roty")
# Wait for motion to finish
while True:
pso_status = self.stubs.get_req_status(
device="es1_psod", RID=self.metadata["RID"], DIID=target_diid
)
daq_status = self.stubs.get_req_status(
device="es1_ddaq", RID=self.metadata["RID"], DIID=target_diid
)
mot_status = self.stubs.get_req_status(
device="es1_roty", RID=self.metadata["RID"], DIID=target_diid
)
progress = self.stubs.get_device_progress(device="es1_psod", RID=self.metadata["RID"])
progress = self.stubs.get_device_progress(device="es1_ddaq", RID=self.metadata["RID"])
progress = self.stubs.get_device_progress(device="es1_roty", RID=self.metadata["RID"])
print(f"pso: {pso_status}\tdaq: {daq_status}\tmot: {mot_status}\tprogress: {progress}")
if progress:
self.num_pos = int(progress)
if mot_status:
break
time.sleep(1)
print("Scan done\n\n")
def cleanup(self):
"""Set scan progress to 1 to finish the scan"""
self.num_pos = 1
return super().cleanup()
class AeroScriptedScan(AsyncFlyScanBase):
scan_name = "aero_scripted_scan"
scan_report_hint = "table"
required_kwargs = ["filename", "subs"]
arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
def __init__(self, *args, parameter: dict = None, **kwargs):
"""Executes an AeroScript template as a flyer
The script is generated from a template file using jinja2.
Examples:
>>> scans.aero_scripted_scan(filename="AerotechSnapAndStepTemplate.ascript", subs={'startpos': 42, 'stepsize': 0.1, 'numsteps': 1800, 'exptime': 0.1})
"""
super().__init__(parameter=parameter, **kwargs)
self.axis = []
self.scan_motors = ["es1_roty"]
self.num_pos = 0
self.filename = self.caller_kwargs.get("filename")
self.subs = self.caller_kwargs.get("subs")
self.taskIndex = self.caller_kwargs.get("taskindex", 4)
def pre_scan(self):
print("TOMCAT Loading Aeroscript template")
# Load the test file
with open(self.filename) as f:
templatetext = f.read()
# Substitute jinja template
import jinja2
tm = jinja2.Template(templatetext)
self.scripttext = tm.render(scan=self.subs)
yield from self.stubs.pre_scan()
def scan_core(self):
print("TOMCAT Sequeence scan (via Jinjad AeroScript)")
t_start = time.time()
# Configure by copying text to controller file and compiling it
yield from self.stubs.send_rpc_and_wait(
"es1_tasks",
"configure",
{"text": self.scripttext, "filename": "becExec.ascript", "taskIndex": self.taskIndex},
)
# Kickoff
st = yield from self.stubs.send_rpc_and_wait("es1_tasks", "kickoff")
st.wait()
time.sleep(0.5)
# Complete
yield from self.stubs.complete(device="es1_tasks")
# Collect - up to implementation
t_end = time.time()
t_elapsed = t_end - t_start
print(f"Elapsed scan time: {t_elapsed}")
def cleanup(self):
"""Set scan progress to 1 to finish the scan"""
self.num_pos = self.pointID
return super().cleanup()
class AeroSnapNStep(AeroScriptedScan):
scan_name = "aero_snapNstep"
scan_report_hint = "table"
required_kwargs = ["startpos", "expnum"]
arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
def __init__(self, *args, parameter: dict = None, **kwargs):
"""Executes a scripted SnapNStep scan
This scan generates and executes an AeroScript file to run
a hardware step scan on the Aerotech controller.
The script is generated from a template file using jinja2.
Examples:
>>> scans.scans.aero_snapNstep(startpos=42, range=180, expnum=1800, exptime=0.1)
"""
super().__init__(parameter=parameter, **kwargs)
self.axis = []
self.scan_motors = ["es1_roty"]
self.num_pos = 0
# self.filename = "/afs/psi.ch/user/m/mohacsi_i/ophyd_devices/ophyd_devices/epics/devices/aerotech/AerotechSnapAndStepTemplate.ascript"
self.filename = "/sls/X05LA/data/x05laop/bec/ophyd_devices/ophyd_devices/epics/devices/aerotech/AerotechSnapAndStepTemplate.ascript"
self.scanTaskIndex = self.caller_kwargs.get("taskindex", 4)
self.scanStart = self.caller_kwargs.get("startpos")
self.scanExpNum = self.caller_kwargs.get("expnum")
self.scanRange = self.caller_kwargs.get("range", 180)
self.scanExpTime = self.caller_kwargs.get("exptime", 0.1)
self.scanStepSize = self.scanRange / self.scanExpNum
# self.scanVel = self.caller_kwargs.get("velocity", 30)
# self.scanTra = self.caller_kwargs.get("travel", 80)
# self.scanAcc = self.caller_kwargs.get("acceleration", 500)
self.subs = {
"startpos": self.scanStart,
"stepsize": self.scanStepSize,
"numsteps": self.scanExpNum,
"exptime": self.scanExpTime,
}
def scan_core(self):
print("TOMCAT Snap N Step scan (via Jinjad AeroScript)")
# Run template execution frm parent
yield from self.stubs.read_and_wait(group="primary", wait_group="readout_primary", pointID=self.pointID)
self.pointID += 1
yield from super().scan_core()
# Collect - Throws a warning due to returning a generator
yield from self.stubs.send_rpc_and_wait("es1_ddaq", "npoints.put", self.scanExpNum)
# st = yield from self.stubs.send_rpc_and_wait("es1_ddaq", "collect")
# st.wait()
class AeroScriptedSequence(AeroScriptedScan):
scan_name = "aero_scripted_sequence"
scan_report_hint = "table"
required_kwargs = ["startpos", "ranges"]
arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
def __init__(self, *args, parameter: dict = None, **kwargs):
"""Executes a scripted sequence scan
This scan generates and executes an AeroScript file to run a hardware sequence scan on the
Aerotech controller. You might win a few seconds this way, but it has some limtations...
The script is generated from a template file using jinja2.
Examples:
>>> scans.aero_scripted_sequence(startpos=42, ranges=([179.9, 0.1, 5]), expnum=3600, repnum=3, repmode="PosNeg")
"""
super().__init__(parameter=parameter, **kwargs)
self.axis = []
self.scan_motors = ["es1_roty"]
self.num_pos = 0
self.filename = "/afs/psi.ch/user/m/mohacsi_i/ophyd_devices/ophyd_devices/epics/devices/aerotech/AerotechSimpleSequenceTemplate.ascript"
self.scanTaskIndex = self.caller_kwargs.get("taskindex", 4)
self.scanStart = self.caller_kwargs.get("startpos")
self.scanRanges = self.caller_kwargs.get("ranges")
self.scanExpNum = self.caller_kwargs.get("expnum", 25000)
self.scanRepNum = self.caller_kwargs.get("repnum", 1)
self.scanRepMode = self.caller_kwargs.get("repmode", "Pos")
self.scanVel = self.caller_kwargs.get("velocity", 30)
self.scanTra = self.caller_kwargs.get("travel", 80)
self.scanAcc = self.caller_kwargs.get("acceleration", 500)
self.scanSafeDist = self.caller_kwargs.get("safedist", 10)
self.subs = {
"startpos": self.scanStart,
"scandir": self.scanRepMode,
"nrepeat": self.scanRepNum,
"npoints": self.scanExpNum,
"scanvel": self.scanVel,
"jogvel": self.scanTra,
"scanacc": self.scanAcc,
}
if self.scanRepMode not in ["PosNeg", "Pos", "NegPos", "Neg"]:
raise RuntimeError(f"Unexpected sequence repetition mode: {self.scanRepMode}")
if isinstance(self.scanRanges[0], (int, float)):
self.scanRanges = self.scanRanges
def pre_scan(self):
# Calculate PSO positions from tables
AccDist = 0.5 * self.scanVel * self.scanVel / self.scanAcc + self.scanSafeDist
# Relative PSO bounds
self.psoBoundsPos = [AccDist]
try:
for line in self.scanRanges:
print(f"Line is: {line} of type {type(line)}")
for rr in range(int(line[2])):
self.psoBoundsPos.append(line[0])
self.psoBoundsPos.append(line[1])
except TypeError:
line = self.scanRanges
print(f"Line is: {line} of type {type(line)}")
for rr in range(int(line[2])):
self.psoBoundsPos.append(line[0])
self.psoBoundsPos.append(line[1])
del self.psoBoundsPos[-1]
self.psoBoundsNeg = [AccDist]
self.psoBoundsNeg.extend(self.psoBoundsPos[::-1])
self.scanrange = 2 * AccDist + np.sum(self.psoBoundsPos)
if self.scanRepMode in ["PosNeg", "Pos"]:
self.PosStart = self.scanStart - AccDist
elif self.scanRepMode in ["NegPos", "Neg"]:
self.PosStart = self.scanStart + AccDist
else:
raise RuntimeError(f"Unexpected sequence repetition mode: {self.scanRepMode}")
print(f"\tCalculated scan range: {self.PosStart} range {self.scanrange}")
# ToDo: We could append all distances and write a much longer 'distance array'. this would elliminate the need of rearming...
self.subs.update(
{
"psoBoundsPos": self.psoBoundsPos,
"psoBoundsNeg": self.psoBoundsNeg,
"scanrange": self.scanrange,
}
)
# Move roughly to start position
yield from self.stubs.send_rpc_and_wait(
"es1_roty",
"configure",
{"velocity": self.scanTra, "acceleration": self.scanTra / self.scanAcc},
)
st = yield from self.stubs.send_rpc_and_wait("es1_roty", "move", self.PosStart)
st.wait()
yield from super().pre_scan()
def scan_core(self):
print("TOMCAT Sequence scan (via Jinjad AeroScript)")
# Run template execution frm parent
yield from super().scan_core()
# Collect - Throws a warning due to returning a generator
yield from self.stubs.send_rpc_and_wait("es1_ddaq", "npoints.put", self.scanExpNum)
# st = yield from self.stubs.send_rpc_and_wait("es1_ddaq", "collect")
# st.wait()

View File

@@ -1,122 +0,0 @@
import time
import numpy as np
from bec_lib import bec_logger
from bec_server.scan_server.scans import AsyncFlyScanBase, ScanArgType, ScanBase
logger = bec_logger.logger
class GigaFrostStepScan(AsyncFlyScanBase):
"""Test scan for running the GigaFrost with standard DAQ
"""
scan_name = "gigafrost_line_scan"
scan_report_hint = "table"
required_kwargs = ["motor", "range"]
arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
def __init__(self, *args, parameter: dict = None, **kwargs):
"""Example step scan
Perform a simple step scan with a motor while software triggering the
gigafrost burst sequence at each point and recording it to the StdDAQ.
Actually only the configuration is gigafrost specific, everything else
is just using standard Bluesky event model.
Example
-------
>>> scans.gigafrost_line_scan(motor='eyex', range=(-5, 5), steps=10, exp_time=20, exp_burst=10, relative=True)
Parameters
----------
motor: str
Scan motor name, moveable, mandatory.
range : (float, float)
Scan range of the axis.
steps : int, optional
Number of scan steps to cover the range. (default = 10)
exp_time : float, optional [0.01 ... 40]
Exposure time for each frame in [ms]. The IOC fixes the exposure
period to be 2x this long so it doesnt matter. (default = 20)
exp_burst : float, optional
Number of images to be taken for each scan point. (default=10)
relative : boolean, optional
Toggle between relative and absolute input coordinates.
(default = False)
"""
super().__init__(parameter=parameter, **kwargs)
self.axis = []
self.num_pos = 0
self.scan_motors = [self.caller_kwargs.get("motor")]
self.scan_range = self.caller_kwargs.get("range")
self.scan_relat = self.caller_kwargs.get("relative", False)
self.scan_steps = int(self.caller_kwargs.get("steps", 10))
self.scan_exp_t = float(self.caller_kwargs.get("exp_time", 5))
self.scan_exp_p = 2 * self.scan_exp_t
self.scan_exp_b = int(self.caller_kwargs.get("exp_burst", 10))
if self.scan_steps <=0:
raise RuntimeError(f"Requires at least one scan step")
def prepare_positions(self):
# Calculate scan start position
if self.scan_relat:
pos = yield from self.stubs.send_rpc_and_wait(self.scan_motors[0], "read")
self.start_pos = pos[self.scan_motors[0]].get("value") + self.scan_range[0]
else:
self.start_pos = self.scan_range[0]
# Calculate step size
self.step_size = (self.scan_range[1]-self.scan_range[0])/self.scan_steps
self.positions = [self.start_pos]
for ii in range(self.scan_steps):
self.positions.append(self.start_pos + ii*self.step_size)
self.num_positions = len(self.positions)
def pre_scan(self):
# Move roughly to start position
print(f"Scan start position: {self.start_pos}")
st = yield from self.stubs.send_rpc_and_wait(self.scan_motors[0], "move", self.start_pos)
st.wait()
yield from self.stubs.pre_scan()
def stage(self):
d= {
"ntotal": self.scan_steps * self.scan_exp_b,
"nimages": self.scan_exp_b,
"exposure": self.scan_exp_t,
"period": self.scan_exp_p,
"pixel_width": 480,
"pixel_height": 128
}
yield from self.stubs.send_rpc_and_wait("gfclient", "configure", d)
# For god, NO!
yield from super().stage()
def scan_core(self):
self.pointID = 0
for ii in range(self.num_positions):
print(f"Point: {ii}")
st = yield from self.stubs.send_rpc_and_wait(self.scan_motors[0], "move", self.positions[ii])
st.wait()
st = yield from self.stubs.send_rpc_and_wait("gfclient", "trigger")
st.wait()
self.pointID += 1
time.sleep(0.2)
time.sleep(1)
print("Scan done\n\n")
def cleanup(self):
"""Set scan progress to 1 to finish the scan"""
self.num_pos = 1
print("Scan cleanup\n\n")
return super().cleanup()

View File

@@ -13,19 +13,18 @@ import os
import time
from bec_lib import bec_logger
from bec_server.scan_server.scans import AsyncFlyScanBase, ScanArgType, ScanBase
from bec_server.scan_server.scans import AsyncFlyScanBase, ScanBase
logger = bec_logger.logger
class TemplatedScanBase(AsyncFlyScanBase):
""" Base class for templated scans
"""Base class for templated scans
Low-level base class for templated AeroScript scans at the Tomcat beamlines.
It sets the order of operations between aerotech, gigafrost and the standard
DAQ. But as a base class, it leaves ample freedom for individual
hardware configurations and scan implementations.
DAQ. But as a base class, it leaves ample freedom for individual
hardware configurations and scan implementations.
Example
-------
@@ -52,16 +51,27 @@ class TemplatedScanBase(AsyncFlyScanBase):
daqmode: str
Operation mode for the position recording. (default = collect)
"""
scan_name = "aeroscript_scan_base"
scan_report_hint = "table"
required_kwargs = ["filename", "subs"]
def __init__(self, *, filename=None, filetext=None, filesubs={},
controller="es1_tasks", taskindex=4,
camera="gfclient", camcfg=None,
drvdaq="es1_ddaq", daqcfg=None, daqmode="collect",
preview="daq_stream1",
**kwargs):
def __init__(
self,
*,
filename=None,
filetext=None,
filesubs={},
controller="es1_tasks",
taskindex=4,
camera="gfclient",
camcfg=None,
drvdaq="es1_ddaq",
daqcfg=None,
daqmode="collect",
preview="daq_stream1",
**kwargs,
):
super().__init__(**kwargs)
self.axis = []
self.num_pos = 0
@@ -81,12 +91,13 @@ class TemplatedScanBase(AsyncFlyScanBase):
if self.filename is None and self.filetext is None:
raise RuntimeError("Must provide either filename or text to scan")
def prepare_positions(self):
""" Prepare action: render AeroScript file"""
"""Prepare action: render AeroScript file"""
# Load the test file
if self.filename is not None:
filename = os.path.join(os.path.dirname(__file__), '../devices/aerotech/' + self.filename)
filename = os.path.join(
os.path.dirname(__file__), "../devices/aerotech/" + self.filename
)
logger.info(f"Attempting to load file {filename}")
with open(filename) as f:
templatetext = f.read()
@@ -97,9 +108,8 @@ class TemplatedScanBase(AsyncFlyScanBase):
yield from super().prepare_positions()
def stage(self):
""" Configure and stage all devices"""
"""Configure and stage all devices"""
print("TOMCAT Staging scripted scan (via Jinjad AeroScript)")
@@ -128,9 +138,8 @@ class TemplatedScanBase(AsyncFlyScanBase):
# TODO : This should stage the entire beamline
# yield from super().stage()
def scan_core(self):
""" The actual scan routine"""
"""The actual scan routine"""
print("TOMCAT Running scripted scan (via Jinjad AeroScript)")
t_start = time.time()
@@ -169,19 +178,19 @@ class TemplatedScanBase(AsyncFlyScanBase):
return super().cleanup()
class SimpleFlyer(AsyncFlyScanBase):
""" Simple fly scan base class
"""Simple fly scan base class
Very simple flyer class (something like the bluesky flyer).
It expects every device to be configured before the scan.
Example
-------
>>> scans.simpleflyer(sync_devices=['daq_stream1'], flyer_devices=['es1_tasks', 'es1_psod', 'es1_ddaq'])
>>> scans.simpleflyer(
sync_devices=['daq_stream1'],
flyer_devices=['es1_tasks', 'es1_psod', 'es1_ddaq'])
"""
scan_name = "simpleflyer"
scan_report_hint = "table"
required_kwargs = ["sync_devices", "flyer_devices"]
@@ -190,7 +199,7 @@ class SimpleFlyer(AsyncFlyScanBase):
# Auto-setup configuration parameters from input
self.sync_devices = sync_devices
self.async_devices = async_devices
# Call super()
super().__init__(**kwargs)
@@ -211,57 +220,54 @@ class SimpleFlyer(AsyncFlyScanBase):
for dev in self.async_devices:
yield from self.stubs.send_rpc_and_wait(dev, "complete")
def unstage(self):
""" Wait for DAQ before unstaging"""
"""Wait for DAQ before unstaging"""
time.sleep(1)
yield from super().unstage()
class GigaStepScanBase(ScanBase):
""" Gigafrost software step scan base class
"""Gigafrost software step scan base class
Example class for simple BEC-based step scans using the low-level API.
Until the final cabling is done, the scan can be tested in soft-trigger
mode.
mode.
Example
-------
>>> scans.gigastep(scan_start=-25, scan_end=155, steps=180, exp_time=0.005, exp_burst=5)
"""
scan_name = "gigastep"
required_kwargs = ["scan_start", "scan_end","steps"]
required_kwargs = ["scan_start", "scan_end", "steps"]
gui_config = {
"Movement parameters": ["steps"],
"Acquisition parameters" : ["exp_time", "burst_at_each_point", "roix", "roiy"]
"Acquisition parameters": ["exp_time", "burst_at_each_point", "roix", "roiy"],
}
def _get_scan_motors(self):
self.scan_motors = ["es1_roty"]
def __init__(
self,
scan_start: float,
scan_end: float,
steps: int,
exp_time = 0.005,
settling_time = 0.2,
burst_at_each_point = 1,
roix = 2016,
roiy = 2016,
sync='event',
**kwargs):
self,
scan_start: float,
scan_end: float,
steps: int,
exp_time=0.005,
settling_time=0.2,
burst_at_each_point=1,
roix=2016,
roiy=2016,
sync="event",
**kwargs,
):
super().__init__(
exp_time=exp_time,
settling_time=settling_time,
relative=False,
burst_at_each_point=1,
optim_trajectory=None,
**kwargs
**kwargs,
)
self.motor = "es1_roty"
@@ -280,28 +286,28 @@ class GigaStepScanBase(ScanBase):
# Prepare configuration dicts
# Aerotech DDC settings: Internal event trigger: PsoEvent = 1
t_modes = {'pso': 0, 'event': 1, 'inp0': 2, 'inp1': 4}
t_modes = {"pso": 0, "event": 1, "inp0": 2, "inp1": 4}
daq_trigger = t_modes[sync]
self.psocfg = {}
self.daqcfg = {'ntotal': self.scan_ntotal, 'trigger': daq_trigger}
self.daqcfg = {"ntotal": self.scan_ntotal, "trigger": daq_trigger}
self.camcfg = {
"ntotal": self.scan_ntotal,
"nimages": burst_at_each_point,
"exposure": 1000*exp_time,
"period": 2000*exp_time,
"ntotal": self.scan_ntotal,
"nimages": burst_at_each_point,
"exposure": 1000 * exp_time,
"period": 2000 * exp_time,
"trigger_mode": "soft",
"pixel_width": self.scan_roix,
"pixel_height": self.scan_roiy
}
"pixel_width": self.scan_roix,
"pixel_height": self.scan_roiy,
}
def _calculate_positions(self) -> None:
""" Pre-calculate scan positions """
for ii in range(self.scan_steps+1):
self.positions.append(self.scan_start + ii*self.scan_stepsize)
"""Pre-calculate scan positions"""
for ii in range(self.scan_steps + 1):
self.positions.append(self.scan_start + ii * self.scan_stepsize)
def stage(self):
""" First do the actual configuration then the gutted staging"""
"""First do the actual configuration then the gutted staging"""
print(f"TOMCAT Staging step scan with motors {self.motor}")
# Configure the camera (usually together wit the DAQ)
@@ -325,38 +331,48 @@ class GigaStepScanBase(ScanBase):
return super().stage()
def unstage(self):
""" Wait for DAQ before unstaging"""
"""Wait for DAQ before unstaging"""
time.sleep(1)
yield from super().unstage()
class SnapAndStepScanBase(TemplatedScanBase):
""" Snap'n Step scan base class
"""Snap'n Step scan base class
Example base class for AeroScript-based high speed step scans. This
function loads a pre-written AeroScript file on the Automation1
function loads a pre-written AeroScript file on the Automation1
controller for maximum stepping speed.
NOTE: As a temporary setup until the final cabling is done, the scan
is in a soft-trigger mode. The Gigafrost is just software triggered,
NOTE: As a temporary setup until the final cabling is done, the scan
is in a soft-trigger mode. The Gigafrost is just software triggered,
and the position readback is wired to the internal trigger.
Example
-------
>>> scans.snapnstep(-20 160, steps=1800, exp_time=0.005, burst_at_each_point=5)
"""
scan_name = "snapnstep"
scan_report_hint = "table"
required_kwargs = ["scan_start", "scan_end", "steps"]
gui_config = {
"Movement parameters": ["scan_start", "scan_end", "steps"],
"Acquisition parameters" : ["exp_time", "burst_at_each_point", "roix", "roiy", "sync"]
"Acquisition parameters": ["exp_time", "burst_at_each_point", "roix", "roiy", "sync"],
}
def __init__(self, scan_start, scan_end, steps,
exp_time=0.005, settling_time=0, burst_at_each_point=1,
roix=2016, roiy=2016, sync="event", **kwargs):
def __init__(
self,
scan_start,
scan_end,
steps,
exp_time=0.005,
settling_time=0,
burst_at_each_point=1,
roix=2016,
roiy=2016,
sync="event",
**kwargs,
):
# Auto-setup configuration parameters from input
self.scan_start = scan_start
self.scan_end = scan_end
@@ -366,87 +382,116 @@ class SnapAndStepScanBase(TemplatedScanBase):
self.settling_time = settling_time
# Synthetic values
self.scan_stepsize = (scan_end-scan_start)/steps
self.scan_ntotal = (steps+1) * burst_at_each_point
self.scan_stepsize = (scan_end - scan_start) / steps
self.scan_ntotal = (steps + 1) * burst_at_each_point
# Aerotech DDC settings: Internal event trigger: PsoEvent = 1
t_modes = {'pso': 0, 'event': 1, 'inp0': 2, 'inp1': 4}
p_modes = {'pso': 'PsoOutput', 'event': 'PsoEvent', 'inp0': 'HighSpeedInput0RisingEdge', 'inp1': 'HighSpeedInput1RisingEdge'}
t_modes = {"pso": 0, "event": 1, "inp0": 2, "inp1": 4}
p_modes = {
"pso": "PsoOutput",
"event": "PsoEvent",
"inp0": "HighSpeedInput0RisingEdge",
"inp1": "HighSpeedInput1RisingEdge",
}
daq_trigger = t_modes[sync]
pso_trigger = p_modes[sync]
filename = "AerotechSnapAndStepTemplate.ascript"
filesubs = {
'startpos': self.scan_start,
'stepsize': self.scan_stepsize,
'numsteps': self.scan_steps,
'exptime': 2*self.exp_time*self.exp_burst,
'settling': self.settling_time,
'psotrigger': pso_trigger,
'npoints': self.scan_ntotal
}
"startpos": self.scan_start,
"stepsize": self.scan_stepsize,
"numsteps": self.scan_steps,
"exptime": 2 * self.exp_time * self.exp_burst,
"settling": self.settling_time,
"psotrigger": pso_trigger,
"npoints": self.scan_ntotal,
}
# DDC config
daqname = "es1_ddaq"
daqcfg = {'ntotal': self.scan_ntotal, 'trigger': daq_trigger}
daqname = "es1_ddaq"
daqcfg = {"ntotal": self.scan_ntotal, "trigger": daq_trigger}
# Gigafrost config
camname = "gfclient"
camcfg = {
"ntotal": self.scan_ntotal,
"nimages": burst_at_each_point,
"exposure": 1000 * exp_time,
"period": 2000*exp_time,
"ntotal": self.scan_ntotal,
"nimages": burst_at_each_point,
"exposure": 1000 * exp_time,
"period": 2000 * exp_time,
"trigger_mode": "eternal",
"pixel_width": roix,
"pixel_height": roiy
}
"pixel_width": roix,
"pixel_height": roiy,
}
# Parameter validation before scan
if self.scan_steps <=0:
if self.scan_steps <= 0:
raise RuntimeError(f"Requires at least one scan step, got {self.scan_steps}")
# Call super()
super().__init__(
filename=filename, filesubs=filesubs,
controller="es1_tasks", taskindex=4,
camera=camname, camcfg=camcfg,
drvdaq=daqname, daqcfg=daqcfg, daqmode="collect",
filename=filename,
filesubs=filesubs,
controller="es1_tasks",
taskindex=4,
camera=camname,
camcfg=camcfg,
drvdaq=daqname,
daqcfg=daqcfg,
daqmode="collect",
preview="daq_stream1",
**kwargs)
**kwargs,
)
def unstage(self):
""" Wait for DAQ before unstaging"""
"""Wait for DAQ before unstaging"""
time.sleep(1)
yield from super().unstage()
class SequenceScanBase(TemplatedScanBase):
""" Sequence scan base class
"""Sequence scan base class
Example base class for AeroScript-based sequence scans. This function
loads a pre-written AeroScript file on the Automation1 controller for
low latency gated scans.
NOTE: As a temporary setup until the final cabling is done, the scan
is in a soft-trigger mode. The Gigafrost is just software triggered,
NOTE: As a temporary setup until the final cabling is done, the scan
is in a soft-trigger mode. The Gigafrost is just software triggered,
and the position readback is wired to the internal trigger.
Example
-------
>>> scans.sequencescan(33, 180, 180, exp_time=0.005, exp_frames=1800, repeats=10)
"""
scan_name = "sequencescan"
scan_report_hint = "table"
required_kwargs = ["scan_start", "gate_high", "gate_low"]
gui_config = {
"Movement parameters": ["scan_start", "repeats", "repmode"],
"Acquisition parameters" : ["gate_high", "gate_low", "exp_time", "exp_frames", "roix", "roiy", "sync"]
"Acquisition parameters": [
"gate_high",
"gate_low",
"exp_time",
"exp_frames",
"roix",
"roiy",
"sync",
],
}
def __init__(self, scan_start, gate_high, gate_low, repeats=1, repmode="PosNeg",
exp_time=0.005, exp_frames=180,
roix=2016, roiy=2016, sync="pso", **kwargs):
def __init__(
self,
scan_start,
gate_high,
gate_low,
repeats=1,
repmode="PosNeg",
exp_time=0.005,
exp_frames=180,
roix=2016,
roiy=2016,
sync="pso",
**kwargs,
):
# Auto-setup configuration parameters from input
self.scan_start = scan_start
self.gate_high = gate_high
@@ -460,82 +505,96 @@ class SequenceScanBase(TemplatedScanBase):
self.scan_velocity = gate_high / (exp_time * exp_frames)
self.scan_acceleration = 500
self.scan_safedistance = 10
self.scan_accdistance = self.scan_safedistance + 0.5* self.scan_velocity * self.scan_velocity / self.scan_acceleration
self.scan_accdistance = (
self.scan_safedistance
+ 0.5 * self.scan_velocity * self.scan_velocity / self.scan_acceleration
)
self.scan_ntotal = exp_frames * repeats
if repmode.upper() in ("POS", "NEG"):
self.scan_range = repeats *(gate_high + gate_low)
self.scan_range = repeats * (gate_high + gate_low)
elif repmode.upper() in ("POSNEG", "NEGPOS"):
self.scan_range = gate_high + gate_low
else:
raise RuntimeError(f"Unsupported repetition mode: {repmode}")
# Aerotech DDC settings: Internal event trigger: PsoEvent = 1
t_modes = {'pso': 0, 'event': 1, 'inp0': 2, 'inp1': 4}
p_modes = {'pso': 'PsoOutput', 'event': 'PsoEvent', 'inp0': 'HighSpeedInput0RisingEdge', 'inp1': 'HighSpeedInput1RisingEdge'}
t_modes = {"pso": 0, "event": 1, "inp0": 2, "inp1": 4}
p_modes = {
"pso": "PsoOutput",
"event": "PsoEvent",
"inp0": "HighSpeedInput0RisingEdge",
"inp1": "HighSpeedInput1RisingEdge",
}
daq_trigger = t_modes[sync]
pso_trigger = p_modes[sync]
# AeroScript substitutions
filename = "AerotechSimpleSequenceTemplate.ascript"
filesubs = {
'startpos': self.scan_start,
'scanrange': self.scan_range,
'nrepeat': self.scan_repnum,
'scanvel': self.scan_velocity,
'scanacc': self.scan_acceleration,
'accdist': self.scan_accdistance,
'npoints' : self.scan_ntotal,
'scandir' : self.scan_repmode.upper(),
'psotrigger': pso_trigger
}
print(filesubs)
"startpos": self.scan_start,
"scanrange": self.scan_range,
"nrepeat": self.scan_repnum,
"scanvel": self.scan_velocity,
"scanacc": self.scan_acceleration,
"accdist": self.scan_accdistance,
"npoints": self.scan_ntotal,
"scandir": self.scan_repmode.upper(),
"psotrigger": pso_trigger,
}
# Drice data collection config
# FIXME: The data sources need to be set to same as the script
daqname = "es1_ddaq"
daqcfg = {'ntotal': self.scan_ntotal, 'trigger': daq_trigger}
daqname = "es1_ddaq"
daqcfg = {"ntotal": self.scan_ntotal, "trigger": daq_trigger}
# Gigafrost config
camname = "gfclient"
camcfg = {
"ntotal": self.scan_ntotal,
"nimages": self.exp_frames,
"exposure": 1000 * exp_time,
"period": 2000*exp_time,
"pixel_width": roix,
"pixel_height": roiy
}
"ntotal": self.scan_ntotal,
"nimages": self.exp_frames,
"exposure": 1000 * exp_time,
"period": 2000 * exp_time,
"pixel_width": roix,
"pixel_height": roiy,
}
# Parameter validation before scan
if self.exp_frames <=0:
if self.exp_frames <= 0:
raise RuntimeError(f"Requires at least one frame per cycle, got {self.exp_frames}")
# Call super()
super().__init__(
filename=filename, filesubs=filesubs,
controller="es1_tasks", taskindex=4,
camera=camname, camcfg=camcfg,
drvdaq=daqname, daqcfg=daqcfg, daqmode="collect",
filename=filename,
filesubs=filesubs,
controller="es1_tasks",
taskindex=4,
camera=camname,
camcfg=camcfg,
drvdaq=daqname,
daqcfg=daqcfg,
daqmode="collect",
preview="daq_stream1",
**kwargs)
**kwargs,
)
def prepare_positions(self):
# Fill PSO vectors according to scan direction
# NOTE: Distance counter is bidirectional
self.psoBoundsPos = []
self.psoBoundsNeg = []
pso_bounds_pos = []
pso_bounds_neg = []
if self.scan_repmode in ("pos", "Pos", "POS", "neg", "Neg", "NEG"):
self.psoBoundsPos.append(self.scan_accdistance)
pso_bounds_pos.append(self.scan_accdistance)
for ii in range(self.scan_repnum):
self.psoBoundsPos += [self.gate_high, self.gate_low]
self.psoBoundsPos = self.psoBoundsPos[:-1]
pso_bounds_pos += [self.gate_high, self.gate_low]
pso_bounds_pos = pso_bounds_pos[:-1]
if self.scan_repmode in ("posneg", "PosNeg", "POSNEG", "negpos", "NegPos", "NEGPOS"):
self.psoBoundsPos = [self.scan_accdistance, self.gate_high]
self.psoBoundsNeg = [self.scan_accdistance + self.gate_low, self.gate_high]
pso_bounds_pos = [self.scan_accdistance, self.gate_high]
pso_bounds_neg = [self.scan_accdistance + self.gate_low, self.gate_high]
self.filesubs['psoBoundsPos'] = self.psoBoundsPos
self.filesubs['psoBoundsNeg'] = self.psoBoundsNeg
self.filesubs['psoBoundsPosSize'] = len(self.psoBoundsPos)
self.filesubs['psoBoundsNegSize'] = len(self.psoBoundsNeg)
self.filesubs["psoBoundsPos"] = pso_bounds_pos
self.filesubs["psoBoundsNeg"] = pso_bounds_neg
self.filesubs["psoBoundsPosSize"] = len(pso_bounds_pos)
self.filesubs["psoBoundsNegSize"] = len(pso_bounds_neg)
# Call super() to do the substitutions
yield from super().prepare_positions()
@@ -543,8 +602,3 @@ class SequenceScanBase(TemplatedScanBase):
# """ Wait for DAQ before unstaging"""
# time.sleep(1)
# yield from super().unstage()

View File

@@ -1,21 +1,31 @@
""" Demo scans for Tomcat at the microXAS test bench
"""
def bl_check_beam():
""" Checks beamline status"""
"""Checks beamline status"""
motor_enabled = bool(dev.es1_roty.motor_enable.get())
result = motor_enabled
return result
def demostepscan(scan_start, scan_end, steps, exp_time=0.005, exp_burst=1, settling_time=0, roix=2016, roiy=2016, sync='event'):
""" Demo step scan with GigaFrost
def demostepscan(
scan_start,
scan_end,
steps,
exp_time=0.005,
exp_burst=1,
settling_time=0,
roix=2016,
roiy=2016,
sync="event",
):
"""Demo step scan with GigaFrost
This is a small BEC user-space demo step scan at the microXAS testbench
using the gigafrost in software triggering mode. It tries to be a
standard BEC scan, while still setting up the environment.
Example:
--------
demostepscan(scan_start=-32, scan_end=148, steps=180, exp_time=0.005, exp_burst=5)
@@ -26,15 +36,19 @@ def demostepscan(scan_start, scan_end, steps, exp_time=0.005, exp_burst=1, settl
scan_ntotal = exp_burst * (steps + 1)
# Move to start position
t_modes = {'pso': 0, 'event': 1, 'inp0': 2, 'inp1': 4}
cfg = {'ntotal': scan_ntotal*10, 'trigger': t_modes[sync]}
dev.es1_ddaq.configure(d=cfg)
t_modes = {"pso": 0, "event": 1, "inp0": 2, "inp1": 4}
cfg = {"ntotal": scan_ntotal * 10, "trigger": t_modes[sync]}
dev.es1_ddaq.configure(d=cfg)
# Configure gigafrost
cfg = {
"ntotal": scan_ntotal, "nimages": exp_burst,
"exposure": 1000*exp_time, "period": 2000*exp_time,
"pixel_width": roix, "pixel_height": roiy, "trigger_mode": "soft"
}
"ntotal": scan_ntotal,
"nimages": exp_burst,
"exposure": 1000 * exp_time,
"period": 2000 * exp_time,
"pixel_width": roix,
"pixel_height": roiy,
"trigger_mode": "soft",
}
dev.gfclient.configure(d=cfg)
# Configure PSO trigger (trigger is normally disabled in fly mode)
# dev.es1_psod.configure(d={})
@@ -45,24 +59,39 @@ def demostepscan(scan_start, scan_end, steps, exp_time=0.005, exp_burst=1, settl
# dev.es1_roty.move(scan_start).wait()
print("Handing over to 'scans.line_scan'")
wait_time = 2*exp_time * exp_burst
wait_time = 2 * exp_time * exp_burst
scans.line_scan(
dev.es1_roty, scan_start, scan_end,
steps=steps, exp_time=wait_time, relative=False, settling_time=settling_time)
dev.es1_roty,
scan_start,
scan_end,
steps=steps,
exp_time=wait_time,
relative=False,
settling_time=settling_time,
)
# Cleanup: disable SW trigger
dev.es1_psod.software_trigger = False
def demosequencescan(scan_start, gate_high, gate_low, repeats=1, repmode="PosNeg",
exp_time=0.005, exp_frames=180, roix=2016, roiy=2016, sync='pso'):
""" Demo sequence scan with GigaFrost
def demosequencescan(
scan_start,
gate_high,
gate_low,
repeats=1,
repmode="PosNeg",
exp_time=0.005,
exp_frames=180,
roix=2016,
roiy=2016,
sync="pso",
):
"""Demo sequence scan with GigaFrost
This is a small BEC user-space sequence scan at the microXAS testbench
triggering a customized scripted scan on the controller. The scan uses
a pre-written custom low-level sequence scan, so it is really minimal.
NOTE: It calls the AeroScript template version.
Example:
@@ -73,12 +102,33 @@ def demosequencescan(scan_start, gate_high, gate_low, repeats=1, repmode="PosNeg
raise RuntimeError("Beamline is not in ready state")
print("Handing over to 'scans.sequencescan'")
scans.sequencescan(scan_start, gate_high, gate_low, exp_time=exp_time, exp_frames=exp_frames, repeats=repeats, roix=roix, roiy=roiy, repmode=repmode, sync=sync)
scans.sequencescan(
scan_start,
gate_high,
gate_low,
exp_time=exp_time,
exp_frames=exp_frames,
repeats=repeats,
roix=roix,
roiy=roiy,
repmode=repmode,
sync=sync,
)
def becsequencescan(start, gate_high, gate_low, repeats=1, repmode="PosNeg",
exp_time=0.005, exp_frames=180, roix=2016, roiy=2016, sync='pso'):
""" Demo sequence scan with GigaFrost via BEC
def becsequencescan(
start,
gate_high,
gate_low,
repeats=1,
repmode="PosNeg",
exp_time=0.005,
exp_frames=180,
roix=2016,
roiy=2016,
sync="pso",
):
"""Demo sequence scan with GigaFrost via BEC
This is a small BEC user-space sequence scan at the microXAS testbench
doing every scan logic from BEC, i.e. there's no AeroScript involved.
@@ -91,7 +141,7 @@ def becsequencescan(start, gate_high, gate_low, repeats=1, repmode="PosNeg",
raise RuntimeError("Beamline is not in ready state")
# Parameter validation before scan
if exp_frames <=0:
if exp_frames <= 0:
raise RuntimeError(f"Requires at least one frame per cycle, got {self.exp_frames}")
# Synthetic values
@@ -102,43 +152,43 @@ def becsequencescan(start, gate_high, gate_low, repeats=1, repmode="PosNeg",
scan_velocity = gate_high / (exp_time * exp_frames)
scan_acceleration = 500
scan_safedistance = 10
scan_accdistance = scan_safedistance + 0.5* scan_velocity * scan_velocity / scan_acceleration
scan_accdistance = scan_safedistance + 0.5 * scan_velocity * scan_velocity / scan_acceleration
scan_ntotal = exp_frames * repeats
if scan_repmode in ("POS", "NEG"):
scan_range = repeats *(gate_high + gate_low)
if scan_repmode=="POS":
scan_range = repeats * (gate_high + gate_low)
if scan_repmode == "POS":
scan_start = start - scan_accdistance
scan_end = start + scan_range + scan_accdistance
if scan_repmode=="NEG":
if scan_repmode == "NEG":
scan_start = start + scan_accdistance
scan_end = start - scan_range - scan_accdistance
elif scan_repmode in ("POSNEG", "NEGPOS"):
scan_range = gate_high + gate_low
if scan_repmode=="POSNEG":
if scan_repmode == "POSNEG":
scan_start = start - scan_accdistance
scan_end = start + scan_range + scan_accdistance
if scan_repmode=="NEGPOS":
if scan_repmode == "NEGPOS":
scan_start = start + scan_accdistance
scan_end = start - scan_range - scan_accdistance
else:
raise RuntimeError(f"Unsupported repetition mode: {repmode}")
# Aerotech DDC settings: Internal event trigger: PsoEvent = 1
t_modes = {'pso': 0, 'event': 1, 'inp0': 2, 'inp1': 4}
t_modes = {"pso": 0, "event": 1, "inp0": 2, "inp1": 4}
daq_trigger = t_modes[sync]
# Drice data collection config
daqcfg = {'ntotal': scan_ntotal, 'trigger': daq_trigger}
daqcfg = {"ntotal": scan_ntotal, "trigger": daq_trigger}
# Gigafrost config
camcfg = {
"ntotal": scan_ntotal,
"nimages": exp_frames,
"exposure": 1000 * exp_time,
"period": 2000*exp_time,
"pixel_width": scan_roix,
"pixel_height": scan_roiy
}
"ntotal": scan_ntotal,
"nimages": exp_frames,
"exposure": 1000 * exp_time,
"period": 2000 * exp_time,
"pixel_width": scan_roix,
"pixel_height": scan_roiy,
}
# PSO config
# Fill PSO vectors according to scan direction
@@ -176,16 +226,16 @@ def becsequencescan(start, gate_high, gate_low, repeats=1, repmode="PosNeg",
print("Manual motor scan")
if scan_repmode in ["POS", "NEG"]:
dev.es1_psod.prepare(psoBoundsPos)
dev.es1_roty.move(scan_end).wait()
dev.es1_psod.prepare(psoBoundsPos)
dev.es1_roty.move(scan_end).wait()
elif scan_repmode in ["POSNEG", "NEGPOS"]:
for ii in range(scan_repnum):
if ii%2==0:
if ii % 2 == 0:
dev.es1_psod.prepare(psoBoundsPos)
# FIXME : Temporary trigger
dev.gfclient.trigger()
dev.es1_roty.move(scan_end).wait()
if ii%2==1:
if ii % 2 == 1:
dev.es1_psod.prepare(psoBoundsNeg)
# FIXME : Temporary trigger
dev.gfclient.trigger()