Sequence scan also running

This commit is contained in:
gac-x05la
2024-11-20 17:31:16 +01:00
committed by mohacsi_i
parent 6d5b61eb87
commit f385e5c441
6 changed files with 55 additions and 655 deletions

View File

@@ -25,7 +25,7 @@ class AerotechTasksMixin(CustomDeviceMixin):
when not in use. I.e. this method is not expected to be called when
PSO is not needed or when it'd conflict with other devices.
"""
# logger.warning(self.parent.scaninfo.scan_msg.info['kwargs'].keys())
logger.warning(self.parent.scaninfo.scan_msg.info['kwargs'].keys())
# Fish out our configuration from scaninfo (via explicit or generic addressing)
scanparam = self.parent.scaninfo.scan_msg.info
@@ -35,7 +35,7 @@ class AerotechTasksMixin(CustomDeviceMixin):
if "kwargs" in scanparam:
scanargs = scanparam["kwargs"]
if self.parent.scaninfo == "script":
if self.parent.scaninfo.scan_type in ("script", "scripted"):
# NOTE: Scans don't have to fully configure the device
if "script_text" in scanargs:
d["script_text"] = scanargs["script_text"]
@@ -43,7 +43,7 @@ class AerotechTasksMixin(CustomDeviceMixin):
d["script_file"] = scanargs["script_file"]
if "script_task" in scanargs:
d["script_task"] = scanargs["script_task"]
if self.parent.scaninfo == "subs":
if self.parent.scaninfo.scan_type == "subs":
# NOTE: But if we ask for substitutions, we need the filename
filename = scanargs["script_template"]
filesubs = scanargs

View File

@@ -498,9 +498,6 @@ class GigaFrostCamera(PSIDetectorBase):
scanid : int, optional
Scan identification number to be associated with the scan data
(default = 0)
trigger_mode : str, optional
Trigger mode of the gifafrost
(default = unchanged)
correction_mode : int, optional
The correction to be applied to the imaging data. The following
modes are available (default = 5):
@@ -511,6 +508,8 @@ class GigaFrostCamera(PSIDetectorBase):
* 3: Send correction factor C instead of pixel values
* 4: Invert pixel values, but do not apply any linearity correction
* 5: Apply the full linearity correction
acq_mode : str, optional
Select one of the pre-configured trigger behavior
"""
# Stop acquisition
self.unstage()
@@ -519,29 +518,31 @@ class GigaFrostCamera(PSIDetectorBase):
# If Bluesky style configure
if d is not None:
num_images = d.get('exposure_num_burst', 10)
exposure_time_ms = d.get('exposure_time_ms', 0.2)
exposure_period_ms = d.get('exposure_period_ms', 2*exposure_time_ms)
image_width = d.get('image_width', 2016)
image_height = d.get('image_height', 2016)
# Commonly changed settings
if 'exposure_num_burst' in d:
self.cfgCntNum.set(d['exposure_num_burst']).wait()
if 'exposure_time_ms' in d:
self.cfgExposure.set(d['exposure_time_ms']).wait()
if 'exposure_period_ms' in d:
self.cfgFramerate.set(d['exposure_period_ms']).wait()
if 'image_width' in d:
if d['image_width']%48 !=0:
raise RuntimeError(f"[{self.name}] image_width must be divisible by 48")
self.cfgRoiX.set(d['image_width']).wait()
if 'image_height' in d:
if d['image_height']%16 !=0:
raise RuntimeError(f"[{self.name}] image_height must be divisible by 16")
self.cfgRoiY.set(d['image_height']).wait()
# Dont change theese
scanid = d.get('scanid', 0)
correction_mode = d.get('correction_mode', 5)
# change settings
self.cfgExposure.set(exposure_time_ms).wait()
self.cfgFramerate.set(exposure_period_ms).wait()
self.cfgRoiX.set(image_width).wait()
self.cfgRoiY.set(image_height).wait()
self.cfgScanId.set(scanid).wait()
self.cfgCntNum.set(num_images).wait()
self.cfgCorrMode.set(correction_mode).wait()
if 'acq_mode' in d:
self.set_acquisition_mode(d['acq_mode'])
# Commit parameter
# Commit parameters
self.cmdSetParam.set(1).wait()
def set_acquisition_mode(self, acq_mode):

View File

@@ -1,3 +1,2 @@
from .tutorial_fly_scan import AcquireDark, AcquireFlat, TutorialFlyScanContLine
from .tomcat_scanbase import SnapAndStepScanBase, GigaStepScanBase, SequenceScanBase, SimpleFlyer
from .tomcat_scans import TomcatStepScan, TomcatSnapNStep, TomcatSimpleSequence

View File

@@ -1,604 +0,0 @@
# -*- coding: utf-8 -*-
""" Tomcat scan base class examples
A collection of example scan base classes using Automation1 rotation stage,
GigaFrost camera and the StandardDAQ pipeline.
Created on Mon Sep 16 16:45:11 2024
@author: mohacsi_i
"""
import jinja2
import os
import time
from bec_lib import bec_logger
from bec_server.scan_server.scans import AsyncFlyScanBase, ScanBase
logger = bec_logger.logger
class TemplatedScanBase(AsyncFlyScanBase):
"""Base class for templated scans
Low-level base class for templated AeroScript scans at the Tomcat beamlines.
It sets the order of operations between aerotech, gigafrost and the standard
DAQ. But as a base class, it leaves ample freedom for individual
hardware configurations and scan implementations.
Example
-------
>>> scans.aeroscript_scan_base(filename="AerotechSnapAndStepTemplate.ascript", subs={'startpos': 42, 'stepsize': 0.1, 'numsteps': 1800, 'exptime': 0.1})
Parameters
----------
filename: str
Filename of the Aeroscript template file. This or filetext ismandatory.
scripttext: str
Raw AeroScript file text of the program. This or filename is mandatory.
subs: dict
Substitutions to the AeroScript template file.
taskindex: int
Task index tor the Aeroscript program execution. (default = 4)
camera: str
Device name of the used camera. (default = gfclient)
camcfg: str
Camera configuration. (default = {})
preview: str
Device name of the live stream preview. (default = daq_stream0)
daqname: str
Device name for position recording. (default = None)
daqmode: str
Operation mode for the position recording. (default = collect)
"""
scan_name = "aeroscript_scan_base"
scan_report_hint = "table"
required_kwargs = ["filename", "subs"]
def __init__(
self,
*,
filename=None,
filetext=None,
filesubs={},
controller="es1_tasks",
taskindex=4,
camera="gfclient",
camcfg=None,
drvdaq="es1_ddaq",
daqcfg=None,
daqmode="collect",
preview="daq_stream1",
**kwargs,
):
super().__init__(**kwargs)
self.axis = []
self.num_pos = 0
self.filename = filename
self.filetext = filetext
self.filesubs = filesubs
self.controller = controller
self.taskindex = taskindex
self.camera = camera
self.camcfg = camcfg
self.preview = preview
self.daqname = drvdaq
self.daqcfg = daqcfg
self.daqmode = daqmode
if self.filename is None and self.filetext is None:
raise RuntimeError("Must provide either filename or text to scan")
def prepare_positions(self):
"""Prepare action: render AeroScript file"""
# Load the test file
if self.filename is not None:
filename = os.path.join(
os.path.dirname(__file__), "../devices/aerotech/" + self.filename
)
logger.info(f"Attempting to load file {filename}")
with open(filename) as f:
templatetext = f.read()
# Substitute jinja template
tm = jinja2.Template(templatetext)
self.filetext = tm.render(scan=self.filesubs)
yield from super().prepare_positions()
def stage(self):
"""Configure and stage all devices"""
print("TOMCAT Staging scripted scan (via Jinjad AeroScript)")
# Configure the Aerotech by copying text to controller file and compiling it
taskcfg = {"text": self.filetext, "filename": "bec.ascript", "taskIndex": self.taskindex}
yield from self.stubs.send_rpc_and_wait(self.controller, "configure", taskcfg)
# Configure the camera (usually together wit the DAQ)
if self.camera is not None:
yield from self.stubs.send_rpc_and_wait(self.camera, "configure", self.camcfg)
# Configure the drive data collection
if self.daqname is not None:
yield from self.stubs.send_rpc_and_wait(self.daqname, "configure", self.daqcfg)
# ###################################################################################
# Staging
yield from self.stubs.send_rpc_and_wait("es1_tasks", "stage")
if self.camera is not None:
yield from self.stubs.send_rpc_and_wait(self.camera, "stage")
# DAQ should be staged from script
# if self.daqname is not None:
# yield from self.stubs.send_rpc_and_wait(self.daqname, "stage")
if self.preview is not None:
yield from self.stubs.send_rpc_and_wait(self.preview, "stage")
# TODO : This should stage the entire beamline
# yield from super().stage()
def scan_core(self):
"""The actual scan routine"""
print("TOMCAT Running scripted scan (via Jinjad AeroScript)")
t_start = time.time()
# Kickoff
yield from self.stubs.send_rpc_and_wait(self.controller, "kickoff")
# yield from self.stubs.send_rpc_and_wait(self.daqname, "kickoff")
time.sleep(0.5)
# FIXME: Temporary triggering
st = yield from self.stubs.send_rpc_and_wait(self.camera, "trigger")
# Complete
# FIXME: this will swallow errors
# yield from self.stubs.complete(device="es1_tasks")
st = yield from self.stubs.send_rpc_and_wait(self.controller, "complete")
st.wait()
task_states = yield from self.stubs.send_rpc_and_wait(self.controller, "taskStates.get")
if task_states[self.taskindex] == 8:
raise RuntimeError(f"Task {self.taskindex} finished in ERROR state")
t_end = time.time()
t_elapsed = t_end - t_start
print(f"Elapsed scan time: {t_elapsed}")
time.sleep(0.5)
# Collect
# if self.daqname is not None and self.daqmode=="collect":
# print("TOMCAT Collecting scripted scan results (from EPICS)")
# positions = yield from self.stubs.send_rpc_and_wait(self.daqname, "collect")
# logger.info(f"Finished scan with collected positions: {positions}")
def cleanup(self):
"""Set scan progress to 1 to finish the scan"""
self.num_pos = 1
print("TOMCAT Officially finshed the scan")
return super().cleanup()
class SimpleFlyer(AsyncFlyScanBase):
"""Simple fly scan base class
Very simple flyer class (something like the bluesky flyer).
It expects every device to be configured before the scan.
Example
-------
>>> scans.simpleflyer(
sync_devices=['daq_stream1'],
flyer_devices=['es1_tasks', 'es1_psod', 'es1_ddaq'])
"""
scan_name = "simpleflyer"
scan_report_hint = "table"
required_kwargs = ["sync_devices", "flyer_devices"]
def __init__(self, sync_devices=[], async_devices=[], **kwargs):
# Auto-setup configuration parameters from input
self.sync_devices = sync_devices
self.async_devices = async_devices
# Call super()
super().__init__(**kwargs)
def stage(self):
for dev in self.sync_devices:
yield from self.stubs.send_rpc_and_wait(dev, "stage")
for dev in self.async_devices:
yield from self.stubs.send_rpc_and_wait(dev, "stage")
yield from super().stage()
def scan_core(self):
for dev in self.async_devices:
yield from self.stubs.send_rpc_and_wait(dev, "kickoff")
for dev in self.async_devices:
yield from self.stubs.send_rpc_and_wait(dev, "complete")
def unstage(self):
"""Wait for DAQ before unstaging"""
time.sleep(1)
yield from super().unstage()
class GigaStepScanBase(ScanBase):
"""Gigafrost software step scan base class
Example class for simple BEC-based step scans using the low-level API.
Until the final cabling is done, the scan can be tested in soft-trigger
mode.
Example
-------
>>> scans.gigastep(scan_start=-25, scan_end=155, steps=180, exp_time=0.005, exp_burst=5)
"""
scan_name = "gigastep"
required_kwargs = ["scan_start", "scan_end", "steps"]
gui_config = {
"Movement parameters": ["steps"],
"Acquisition parameters": ["exp_time", "burst_at_each_point", "roix", "roiy"],
}
def _get_scan_motors(self):
self.scan_motors = ["es1_roty"]
def __init__(
self,
scan_start: float,
scan_end: float,
steps: int,
exp_time=0.005,
settling_time=0.2,
burst_at_each_point=1,
roix=2016,
roiy=2016,
sync="event",
**kwargs,
):
super().__init__(
exp_time=exp_time,
settling_time=settling_time,
relative=False,
burst_at_each_point=1,
optim_trajectory=None,
**kwargs,
)
self.motor = "es1_roty"
self.scan_start = scan_start
self.scan_end = scan_end
self.scan_steps = steps
self.scan_stepsize = (scan_end - scan_start) / steps
self.scan_ntotal = burst_at_each_point * (steps + 1)
self.scan_nburst = burst_at_each_point
self.scan_roix = roix
self.scan_roiy = roiy
# Override wait time with actual latency
# NOTE : The BEC does not wait for trigger() status done
self.exp_time = settling_time + exp_time * 2 * burst_at_each_point
# Prepare configuration dicts
# Aerotech DDC settings: Internal event trigger: PsoEvent = 1
t_modes = {"pso": 0, "event": 1, "inp0": 2, "inp1": 4}
daq_trigger = t_modes[sync]
self.psocfg = {}
self.daqcfg = {"ntotal": self.scan_ntotal, "trigger": daq_trigger}
self.camcfg = {
"ntotal": self.scan_ntotal,
"nimages": burst_at_each_point,
"exposure": 1000 * exp_time,
"period": 2000 * exp_time,
"trigger_mode": "soft",
"pixel_width": self.scan_roix,
"pixel_height": self.scan_roiy,
}
def _calculate_positions(self) -> None:
"""Pre-calculate scan positions"""
for ii in range(self.scan_steps + 1):
self.positions.append(self.scan_start + ii * self.scan_stepsize)
def stage(self):
"""First do the actual configuration then the gutted staging"""
print(f"TOMCAT Staging step scan with motors {self.motor}")
# Configure the camera (usually together wit the DAQ)
yield from self.stubs.send_rpc_and_wait("gfclient", "configure", self.camcfg)
# Configure the position readback
yield from self.stubs.send_rpc_and_wait("es1_ddaq", "configure", self.daqcfg)
# Configure the manual triggers
yield from self.stubs.send_rpc_and_wait("es1_psod", "configure", self.psocfg)
# Explicitly arm trigger
yield from self.stubs.send_rpc_and_wait("es1_psod", "prepare")
# TODO: Explicitly arm detector
# Stage everything...
yield from self.stubs.send_rpc_and_wait("gfclient", "stage")
yield from self.stubs.send_rpc_and_wait("es1_ddaq", "stage")
yield from self.stubs.send_rpc_and_wait("es1_psod", "stage")
yield from self.stubs.send_rpc_and_wait("daq_stream1", "stage")
# FIXME: Shouldn't this stage() everything?
return super().stage()
def unstage(self):
"""Wait for DAQ before unstaging"""
time.sleep(1)
yield from super().unstage()
class SnapAndStepScanBase(TemplatedScanBase):
"""Snap'n Step scan base class
Example base class for AeroScript-based high speed step scans. This
function loads a pre-written AeroScript file on the Automation1
controller for maximum stepping speed.
NOTE: As a temporary setup until the final cabling is done, the scan
is in a soft-trigger mode. The Gigafrost is just software triggered,
and the position readback is wired to the internal trigger.
Example
-------
>>> scans.snapnstep(-20 160, steps=1800, exp_time=0.005, burst_at_each_point=5)
"""
scan_name = "snapnstep"
scan_report_hint = "table"
required_kwargs = ["scan_start", "scan_end", "steps"]
gui_config = {
"Movement parameters": ["scan_start", "scan_end", "steps"],
"Acquisition parameters": ["exp_time", "burst_at_each_point", "roix", "roiy", "sync"],
}
def __init__(
self,
scan_start,
scan_end,
steps,
exp_time=0.005,
settling_time=0,
burst_at_each_point=1,
roix=2016,
roiy=2016,
sync="event",
**kwargs,
):
# Auto-setup configuration parameters from input
self.scan_start = scan_start
self.scan_end = scan_end
self.scan_steps = steps
self.exp_time = exp_time
self.exp_burst = burst_at_each_point
self.settling_time = settling_time
# Synthetic values
self.scan_stepsize = (scan_end - scan_start) / steps
self.scan_ntotal = (steps + 1) * burst_at_each_point
# Aerotech DDC settings: Internal event trigger: PsoEvent = 1
t_modes = {"pso": 0, "event": 1, "inp0": 2, "inp1": 4}
p_modes = {
"pso": "PsoOutput",
"event": "PsoEvent",
"inp0": "HighSpeedInput0RisingEdge",
"inp1": "HighSpeedInput1RisingEdge",
}
daq_trigger = t_modes[sync]
pso_trigger = p_modes[sync]
filename = "AerotechSnapAndStepTemplate.ascript"
filesubs = {
"startpos": self.scan_start,
"stepsize": self.scan_stepsize,
"numsteps": self.scan_steps,
"exptime": 2 * self.exp_time * self.exp_burst,
"settling": self.settling_time,
"psotrigger": pso_trigger,
"npoints": self.scan_ntotal,
}
# DDC config
daqname = "es1_ddaq"
daqcfg = {"ntotal": self.scan_ntotal, "trigger": daq_trigger}
# Gigafrost config
camname = "gfclient"
camcfg = {
"ntotal": self.scan_ntotal,
"nimages": burst_at_each_point,
"exposure": 1000 * exp_time,
"period": 2000 * exp_time,
"trigger_mode": "eternal",
"pixel_width": roix,
"pixel_height": roiy,
}
# Parameter validation before scan
if self.scan_steps <= 0:
raise RuntimeError(f"Requires at least one scan step, got {self.scan_steps}")
# Call super()
super().__init__(
filename=filename,
filesubs=filesubs,
controller="es1_tasks",
taskindex=4,
camera=camname,
camcfg=camcfg,
drvdaq=daqname,
daqcfg=daqcfg,
daqmode="collect",
preview="daq_stream1",
**kwargs,
)
def unstage(self):
"""Wait for DAQ before unstaging"""
time.sleep(1)
yield from super().unstage()
class SequenceScanBase(TemplatedScanBase):
"""Sequence scan base class
Example base class for AeroScript-based sequence scans. This function
loads a pre-written AeroScript file on the Automation1 controller for
low latency gated scans.
NOTE: As a temporary setup until the final cabling is done, the scan
is in a soft-trigger mode. The Gigafrost is just software triggered,
and the position readback is wired to the internal trigger.
Example
-------
>>> scans.sequencescan(33, 180, 180, exp_time=0.005, exp_frames=1800, repeats=10)
"""
scan_name = "sequencescan"
scan_report_hint = "table"
required_kwargs = ["scan_start", "gate_high", "gate_low"]
gui_config = {
"Movement parameters": ["scan_start", "repeats", "repmode"],
"Acquisition parameters": [
"gate_high",
"gate_low",
"exp_time",
"exp_frames",
"roix",
"roiy",
"sync",
],
}
def __init__(
self,
scan_start,
gate_high,
gate_low,
repeats=1,
repmode="PosNeg",
exp_time=0.005,
exp_frames=180,
roix=2016,
roiy=2016,
sync="pso",
**kwargs,
):
# Auto-setup configuration parameters from input
self.scan_start = scan_start
self.gate_high = gate_high
self.gate_low = gate_low
self.scan_repnum = repeats
self.scan_repmode = repmode.upper()
self.exp_time = exp_time
self.exp_frames = exp_frames
# Synthetic values
self.scan_velocity = gate_high / (exp_time * exp_frames)
self.scan_acceleration = 500
self.scan_safedistance = 10
self.scan_accdistance = (
self.scan_safedistance
+ 0.5 * self.scan_velocity * self.scan_velocity / self.scan_acceleration
)
self.scan_ntotal = exp_frames * repeats
if repmode.upper() in ("POS", "NEG"):
self.scan_range = repeats * (gate_high + gate_low)
elif repmode.upper() in ("POSNEG", "NEGPOS"):
self.scan_range = gate_high + gate_low
else:
raise RuntimeError(f"Unsupported repetition mode: {repmode}")
# Aerotech DDC settings: Internal event trigger: PsoEvent = 1
t_modes = {"pso": 0, "event": 1, "inp0": 2, "inp1": 4}
p_modes = {
"pso": "PsoOutput",
"event": "PsoEvent",
"inp0": "HighSpeedInput0RisingEdge",
"inp1": "HighSpeedInput1RisingEdge",
}
daq_trigger = t_modes[sync]
pso_trigger = p_modes[sync]
# AeroScript substitutions
filename = "AerotechSimpleSequenceTemplate.ascript"
filesubs = {
"startpos": self.scan_start,
"scanrange": self.scan_range,
"nrepeat": self.scan_repnum,
"scanvel": self.scan_velocity,
"scanacc": self.scan_acceleration,
"accdist": self.scan_accdistance,
"npoints": self.scan_ntotal,
"scandir": self.scan_repmode.upper(),
"psotrigger": pso_trigger,
}
# Drice data collection config
# FIXME: The data sources need to be set to same as the script
daqname = "es1_ddaq"
daqcfg = {"ntotal": self.scan_ntotal, "trigger": daq_trigger}
# Gigafrost config
camname = "gfclient"
camcfg = {
"ntotal": self.scan_ntotal,
"nimages": self.exp_frames,
"exposure": 1000 * exp_time,
"period": 2000 * exp_time,
"pixel_width": roix,
"pixel_height": roiy,
}
# Parameter validation before scan
if self.exp_frames <= 0:
raise RuntimeError(f"Requires at least one frame per cycle, got {self.exp_frames}")
# Call super()
super().__init__(
filename=filename,
filesubs=filesubs,
controller="es1_tasks",
taskindex=4,
camera=camname,
camcfg=camcfg,
drvdaq=daqname,
daqcfg=daqcfg,
daqmode="collect",
preview="daq_stream1",
**kwargs,
)
def prepare_positions(self):
# Fill PSO vectors according to scan direction
# NOTE: Distance counter is bidirectional
pso_bounds_pos = []
pso_bounds_neg = []
if self.scan_repmode in ("pos", "Pos", "POS", "neg", "Neg", "NEG"):
pso_bounds_pos.append(self.scan_accdistance)
for ii in range(self.scan_repnum):
pso_bounds_pos += [self.gate_high, self.gate_low]
pso_bounds_pos = pso_bounds_pos[:-1]
if self.scan_repmode in ("posneg", "PosNeg", "POSNEG", "negpos", "NegPos", "NEGPOS"):
pso_bounds_pos = [self.scan_accdistance, self.gate_high]
pso_bounds_neg = [self.scan_accdistance + self.gate_low, self.gate_high]
self.filesubs["psoBoundsPos"] = pso_bounds_pos
self.filesubs["psoBoundsNeg"] = pso_bounds_neg
self.filesubs["psoBoundsPosSize"] = len(pso_bounds_pos)
self.filesubs["psoBoundsNegSize"] = len(pso_bounds_neg)
# Call super() to do the substitutions
yield from super().prepare_positions()
# def unstage(self):
# """ Wait for DAQ before unstaging"""
# time.sleep(1)
# yield from super().unstage()

View File

@@ -24,15 +24,28 @@ logger = bec_logger.logger
class TomcatStepScan(ScanBase):
"""Simple software step scan forTomcat
"""Simple software step scan for Tomcat
Example class for simple BEC-based step scan using the low-level API.
All it does is translate conventional kwargs to tomcat specific device
pareameters and launches the standard step scan.
Example class for simple BEC-based step scan using the low-level API. It's just a standard
'line_scan' with the only difference that overrides burst behavior to use camera burst instead
of software triggers.
Example
-------
NOTE: As decided by Tomcat, the scans should not manage the scope of devices
- All enabled devices are expected to be configured for acquisition by the end of stage
- Some devices can configure themselves from mandatory scan parameters (steps, burst)
- Other devices can be optionally configured by keyword arguments
- Devices will try to stage using whatever was set on them before
Example:
--------
>>> scans.tomcatstepscan(scan_start=-25, scan_end=155, steps=180, exp_time=0.005, exp_burst=5)
Common keyword arguments:
-------------------------
image_width : int
image_height : int
ddc_trigger : str
"""
scan_name = "tomcatstepscan"
@@ -40,7 +53,7 @@ class TomcatStepScan(ScanBase):
required_kwargs = ["scan_start", "scan_end", "steps"]
gui_config = {
"Movement parameters": ["steps"],
"Acquisition parameters": ["exp_time", "exp_burst", "image_width", "image_height"],
"Acquisition parameters": ["exp_time", "exp_burst"],
}
def update_scan_motors(self):
@@ -54,18 +67,12 @@ class TomcatStepScan(ScanBase):
scan_start: float,
scan_end: float,
steps: int,
exp_time=0.005,
settling_time=0.2,
exp_burst=1,
image_width=2016,
image_height=2016,
sync="event",
exp_time: float=0.005,
settling_time: float=0.2,
exp_burst: int=1,
**kwargs,
):
# Converting generic kwargs to tomcat device configuration parameters
# Use PSO trigger
kwargs["parameter"]["kwargs"]["pso_wavemode"] = "pulsed"
super().__init__(
exp_time=exp_time,
settling_time=settling_time,
@@ -88,22 +95,17 @@ class TomcatStepScan(ScanBase):
def _at_each_point(self, ind=None, pos=None):
""" Overriden at_each_point, using detector burst instaead of manual triggering"""
trigger_time = self.exp_time * self.burst_at_each_point
yield from self._move_scan_motors_and_wait(pos)
time.sleep(self.settling_time)
# yield from self.stubs.trigger(min_wait=trigger_time)
yield from self.stubs.trigger(group='trigger', point_id=self.point_id)
trigger_time = self.exp_time * self.burst_at_each_point
time.sleep(trigger_time)
time.sleep(self.settling_time)
yield from self.stubs.read(group="monitored", point_id=self.point_id, wait_group=None)
# self.point_id += 1
def cleanup(self):
"""Set scan progress to 1 to finish the scan"""
self.num_pos = 1
return super().cleanup()
self.point_id += 1
class TomcatSnapNStep(AsyncFlyScanBase):

View File

@@ -28,7 +28,7 @@ def anotherstepscan(
settling_time=0,
image_width=2016,
image_height=2016,
sync="event",
sync="inp1",
):
"""Demo step scan with GigaFrost
@@ -65,6 +65,9 @@ def anotherstepscan(
sync=sync
)
def anothersequencescan(
scan_start,
gate_high,
@@ -100,9 +103,8 @@ def anothersequencescan(
dev.daq_stream0.enabled = True
dev.daq_stream1.enabled = False
print("Handing over to 'scans.sequencescan'")
scans.sequencescan(
scans.tomcatsimplesequencescan(
scan_start,
gate_high,
gate_low,