diff --git a/tomcat_bec/devices/aerotech/AerotechTasks.py b/tomcat_bec/devices/aerotech/AerotechTasks.py index 237143f..fd506f4 100644 --- a/tomcat_bec/devices/aerotech/AerotechTasks.py +++ b/tomcat_bec/devices/aerotech/AerotechTasks.py @@ -25,7 +25,7 @@ class AerotechTasksMixin(CustomDeviceMixin): when not in use. I.e. this method is not expected to be called when PSO is not needed or when it'd conflict with other devices. """ - # logger.warning(self.parent.scaninfo.scan_msg.info['kwargs'].keys()) + logger.warning(self.parent.scaninfo.scan_msg.info['kwargs'].keys()) # Fish out our configuration from scaninfo (via explicit or generic addressing) scanparam = self.parent.scaninfo.scan_msg.info @@ -35,7 +35,7 @@ class AerotechTasksMixin(CustomDeviceMixin): if "kwargs" in scanparam: scanargs = scanparam["kwargs"] - if self.parent.scaninfo == "script": + if self.parent.scaninfo.scan_type in ("script", "scripted"): # NOTE: Scans don't have to fully configure the device if "script_text" in scanargs: d["script_text"] = scanargs["script_text"] @@ -43,7 +43,7 @@ class AerotechTasksMixin(CustomDeviceMixin): d["script_file"] = scanargs["script_file"] if "script_task" in scanargs: d["script_task"] = scanargs["script_task"] - if self.parent.scaninfo == "subs": + if self.parent.scaninfo.scan_type == "subs": # NOTE: But if we ask for substitutions, we need the filename filename = scanargs["script_template"] filesubs = scanargs diff --git a/tomcat_bec/devices/gigafrost/gigafrostcamera.py b/tomcat_bec/devices/gigafrost/gigafrostcamera.py index 93c3ac9..7f65e01 100644 --- a/tomcat_bec/devices/gigafrost/gigafrostcamera.py +++ b/tomcat_bec/devices/gigafrost/gigafrostcamera.py @@ -498,9 +498,6 @@ class GigaFrostCamera(PSIDetectorBase): scanid : int, optional Scan identification number to be associated with the scan data (default = 0) - trigger_mode : str, optional - Trigger mode of the gifafrost - (default = unchanged) correction_mode : int, optional The correction to be applied to the imaging data. The following modes are available (default = 5): @@ -511,6 +508,8 @@ class GigaFrostCamera(PSIDetectorBase): * 3: Send correction factor C instead of pixel values * 4: Invert pixel values, but do not apply any linearity correction * 5: Apply the full linearity correction + acq_mode : str, optional + Select one of the pre-configured trigger behavior """ # Stop acquisition self.unstage() @@ -519,29 +518,31 @@ class GigaFrostCamera(PSIDetectorBase): # If Bluesky style configure if d is not None: - num_images = d.get('exposure_num_burst', 10) - exposure_time_ms = d.get('exposure_time_ms', 0.2) - exposure_period_ms = d.get('exposure_period_ms', 2*exposure_time_ms) - image_width = d.get('image_width', 2016) - image_height = d.get('image_height', 2016) + # Commonly changed settings + if 'exposure_num_burst' in d: + self.cfgCntNum.set(d['exposure_num_burst']).wait() + if 'exposure_time_ms' in d: + self.cfgExposure.set(d['exposure_time_ms']).wait() + if 'exposure_period_ms' in d: + self.cfgFramerate.set(d['exposure_period_ms']).wait() + if 'image_width' in d: + if d['image_width']%48 !=0: + raise RuntimeError(f"[{self.name}] image_width must be divisible by 48") + self.cfgRoiX.set(d['image_width']).wait() + if 'image_height' in d: + if d['image_height']%16 !=0: + raise RuntimeError(f"[{self.name}] image_height must be divisible by 16") + self.cfgRoiY.set(d['image_height']).wait() + # Dont change theese scanid = d.get('scanid', 0) correction_mode = d.get('correction_mode', 5) - - # change settings - self.cfgExposure.set(exposure_time_ms).wait() - self.cfgFramerate.set(exposure_period_ms).wait() - self.cfgRoiX.set(image_width).wait() - self.cfgRoiY.set(image_height).wait() self.cfgScanId.set(scanid).wait() - self.cfgCntNum.set(num_images).wait() self.cfgCorrMode.set(correction_mode).wait() if 'acq_mode' in d: - - self.set_acquisition_mode(d['acq_mode']) - # Commit parameter + # Commit parameters self.cmdSetParam.set(1).wait() def set_acquisition_mode(self, acq_mode): diff --git a/tomcat_bec/scans/__init__.py b/tomcat_bec/scans/__init__.py index c526363..2eb3866 100644 --- a/tomcat_bec/scans/__init__.py +++ b/tomcat_bec/scans/__init__.py @@ -1,3 +1,2 @@ from .tutorial_fly_scan import AcquireDark, AcquireFlat, TutorialFlyScanContLine -from .tomcat_scanbase import SnapAndStepScanBase, GigaStepScanBase, SequenceScanBase, SimpleFlyer from .tomcat_scans import TomcatStepScan, TomcatSnapNStep, TomcatSimpleSequence diff --git a/tomcat_bec/scans/tomcat_scanbase.py b/tomcat_bec/scans/tomcat_scanbase.py deleted file mode 100644 index cfe10e4..0000000 --- a/tomcat_bec/scans/tomcat_scanbase.py +++ /dev/null @@ -1,604 +0,0 @@ -# -*- coding: utf-8 -*- -""" Tomcat scan base class examples - -A collection of example scan base classes using Automation1 rotation stage, -GigaFrost camera and the StandardDAQ pipeline. - -Created on Mon Sep 16 16:45:11 2024 - -@author: mohacsi_i -""" -import jinja2 -import os -import time - -from bec_lib import bec_logger -from bec_server.scan_server.scans import AsyncFlyScanBase, ScanBase - -logger = bec_logger.logger - - -class TemplatedScanBase(AsyncFlyScanBase): - """Base class for templated scans - - Low-level base class for templated AeroScript scans at the Tomcat beamlines. - It sets the order of operations between aerotech, gigafrost and the standard - DAQ. But as a base class, it leaves ample freedom for individual - hardware configurations and scan implementations. - - Example - ------- - >>> scans.aeroscript_scan_base(filename="AerotechSnapAndStepTemplate.ascript", subs={'startpos': 42, 'stepsize': 0.1, 'numsteps': 1800, 'exptime': 0.1}) - - Parameters - ---------- - filename: str - Filename of the Aeroscript template file. This or filetext ismandatory. - scripttext: str - Raw AeroScript file text of the program. This or filename is mandatory. - subs: dict - Substitutions to the AeroScript template file. - taskindex: int - Task index tor the Aeroscript program execution. (default = 4) - camera: str - Device name of the used camera. (default = gfclient) - camcfg: str - Camera configuration. (default = {}) - preview: str - Device name of the live stream preview. (default = daq_stream0) - daqname: str - Device name for position recording. (default = None) - daqmode: str - Operation mode for the position recording. (default = collect) - """ - - scan_name = "aeroscript_scan_base" - scan_report_hint = "table" - required_kwargs = ["filename", "subs"] - - def __init__( - self, - *, - filename=None, - filetext=None, - filesubs={}, - controller="es1_tasks", - taskindex=4, - camera="gfclient", - camcfg=None, - drvdaq="es1_ddaq", - daqcfg=None, - daqmode="collect", - preview="daq_stream1", - **kwargs, - ): - super().__init__(**kwargs) - self.axis = [] - self.num_pos = 0 - - self.filename = filename - self.filetext = filetext - self.filesubs = filesubs - self.controller = controller - self.taskindex = taskindex - self.camera = camera - self.camcfg = camcfg - self.preview = preview - self.daqname = drvdaq - self.daqcfg = daqcfg - self.daqmode = daqmode - - if self.filename is None and self.filetext is None: - raise RuntimeError("Must provide either filename or text to scan") - - def prepare_positions(self): - """Prepare action: render AeroScript file""" - # Load the test file - if self.filename is not None: - filename = os.path.join( - os.path.dirname(__file__), "../devices/aerotech/" + self.filename - ) - logger.info(f"Attempting to load file {filename}") - with open(filename) as f: - templatetext = f.read() - - # Substitute jinja template - tm = jinja2.Template(templatetext) - self.filetext = tm.render(scan=self.filesubs) - - yield from super().prepare_positions() - - def stage(self): - """Configure and stage all devices""" - - print("TOMCAT Staging scripted scan (via Jinjad AeroScript)") - - # Configure the Aerotech by copying text to controller file and compiling it - taskcfg = {"text": self.filetext, "filename": "bec.ascript", "taskIndex": self.taskindex} - yield from self.stubs.send_rpc_and_wait(self.controller, "configure", taskcfg) - - # Configure the camera (usually together wit the DAQ) - if self.camera is not None: - yield from self.stubs.send_rpc_and_wait(self.camera, "configure", self.camcfg) - # Configure the drive data collection - if self.daqname is not None: - yield from self.stubs.send_rpc_and_wait(self.daqname, "configure", self.daqcfg) - - # ################################################################################### - # Staging - yield from self.stubs.send_rpc_and_wait("es1_tasks", "stage") - if self.camera is not None: - yield from self.stubs.send_rpc_and_wait(self.camera, "stage") - # DAQ should be staged from script - # if self.daqname is not None: - # yield from self.stubs.send_rpc_and_wait(self.daqname, "stage") - if self.preview is not None: - yield from self.stubs.send_rpc_and_wait(self.preview, "stage") - - # TODO : This should stage the entire beamline - # yield from super().stage() - - def scan_core(self): - """The actual scan routine""" - print("TOMCAT Running scripted scan (via Jinjad AeroScript)") - t_start = time.time() - - # Kickoff - yield from self.stubs.send_rpc_and_wait(self.controller, "kickoff") - # yield from self.stubs.send_rpc_and_wait(self.daqname, "kickoff") - time.sleep(0.5) - - # FIXME: Temporary triggering - st = yield from self.stubs.send_rpc_and_wait(self.camera, "trigger") - - # Complete - # FIXME: this will swallow errors - # yield from self.stubs.complete(device="es1_tasks") - st = yield from self.stubs.send_rpc_and_wait(self.controller, "complete") - st.wait() - task_states = yield from self.stubs.send_rpc_and_wait(self.controller, "taskStates.get") - if task_states[self.taskindex] == 8: - raise RuntimeError(f"Task {self.taskindex} finished in ERROR state") - - t_end = time.time() - t_elapsed = t_end - t_start - print(f"Elapsed scan time: {t_elapsed}") - time.sleep(0.5) - - # Collect - # if self.daqname is not None and self.daqmode=="collect": - # print("TOMCAT Collecting scripted scan results (from EPICS)") - # positions = yield from self.stubs.send_rpc_and_wait(self.daqname, "collect") - # logger.info(f"Finished scan with collected positions: {positions}") - - def cleanup(self): - """Set scan progress to 1 to finish the scan""" - self.num_pos = 1 - print("TOMCAT Officially finshed the scan") - return super().cleanup() - - -class SimpleFlyer(AsyncFlyScanBase): - """Simple fly scan base class - - Very simple flyer class (something like the bluesky flyer). - It expects every device to be configured before the scan. - - Example - ------- - >>> scans.simpleflyer( - sync_devices=['daq_stream1'], - flyer_devices=['es1_tasks', 'es1_psod', 'es1_ddaq']) - """ - - scan_name = "simpleflyer" - scan_report_hint = "table" - required_kwargs = ["sync_devices", "flyer_devices"] - - def __init__(self, sync_devices=[], async_devices=[], **kwargs): - # Auto-setup configuration parameters from input - self.sync_devices = sync_devices - self.async_devices = async_devices - - # Call super() - super().__init__(**kwargs) - - def stage(self): - - for dev in self.sync_devices: - yield from self.stubs.send_rpc_and_wait(dev, "stage") - for dev in self.async_devices: - yield from self.stubs.send_rpc_and_wait(dev, "stage") - - yield from super().stage() - - def scan_core(self): - - for dev in self.async_devices: - yield from self.stubs.send_rpc_and_wait(dev, "kickoff") - - for dev in self.async_devices: - yield from self.stubs.send_rpc_and_wait(dev, "complete") - - def unstage(self): - """Wait for DAQ before unstaging""" - time.sleep(1) - yield from super().unstage() - - -class GigaStepScanBase(ScanBase): - """Gigafrost software step scan base class - - Example class for simple BEC-based step scans using the low-level API. - Until the final cabling is done, the scan can be tested in soft-trigger - mode. - - Example - ------- - >>> scans.gigastep(scan_start=-25, scan_end=155, steps=180, exp_time=0.005, exp_burst=5) - """ - - scan_name = "gigastep" - required_kwargs = ["scan_start", "scan_end", "steps"] - gui_config = { - "Movement parameters": ["steps"], - "Acquisition parameters": ["exp_time", "burst_at_each_point", "roix", "roiy"], - } - - def _get_scan_motors(self): - self.scan_motors = ["es1_roty"] - - def __init__( - self, - scan_start: float, - scan_end: float, - steps: int, - exp_time=0.005, - settling_time=0.2, - burst_at_each_point=1, - roix=2016, - roiy=2016, - sync="event", - **kwargs, - ): - super().__init__( - exp_time=exp_time, - settling_time=settling_time, - relative=False, - burst_at_each_point=1, - optim_trajectory=None, - **kwargs, - ) - - self.motor = "es1_roty" - self.scan_start = scan_start - self.scan_end = scan_end - self.scan_steps = steps - self.scan_stepsize = (scan_end - scan_start) / steps - self.scan_ntotal = burst_at_each_point * (steps + 1) - self.scan_nburst = burst_at_each_point - self.scan_roix = roix - self.scan_roiy = roiy - - # Override wait time with actual latency - # NOTE : The BEC does not wait for trigger() status done - self.exp_time = settling_time + exp_time * 2 * burst_at_each_point - - # Prepare configuration dicts - # Aerotech DDC settings: Internal event trigger: PsoEvent = 1 - t_modes = {"pso": 0, "event": 1, "inp0": 2, "inp1": 4} - daq_trigger = t_modes[sync] - - self.psocfg = {} - self.daqcfg = {"ntotal": self.scan_ntotal, "trigger": daq_trigger} - self.camcfg = { - "ntotal": self.scan_ntotal, - "nimages": burst_at_each_point, - "exposure": 1000 * exp_time, - "period": 2000 * exp_time, - "trigger_mode": "soft", - "pixel_width": self.scan_roix, - "pixel_height": self.scan_roiy, - } - - def _calculate_positions(self) -> None: - """Pre-calculate scan positions""" - for ii in range(self.scan_steps + 1): - self.positions.append(self.scan_start + ii * self.scan_stepsize) - - def stage(self): - """First do the actual configuration then the gutted staging""" - - print(f"TOMCAT Staging step scan with motors {self.motor}") - # Configure the camera (usually together wit the DAQ) - yield from self.stubs.send_rpc_and_wait("gfclient", "configure", self.camcfg) - # Configure the position readback - yield from self.stubs.send_rpc_and_wait("es1_ddaq", "configure", self.daqcfg) - # Configure the manual triggers - yield from self.stubs.send_rpc_and_wait("es1_psod", "configure", self.psocfg) - - # Explicitly arm trigger - yield from self.stubs.send_rpc_and_wait("es1_psod", "prepare") - # TODO: Explicitly arm detector - - # Stage everything... - yield from self.stubs.send_rpc_and_wait("gfclient", "stage") - yield from self.stubs.send_rpc_and_wait("es1_ddaq", "stage") - yield from self.stubs.send_rpc_and_wait("es1_psod", "stage") - yield from self.stubs.send_rpc_and_wait("daq_stream1", "stage") - - # FIXME: Shouldn't this stage() everything? - return super().stage() - - def unstage(self): - """Wait for DAQ before unstaging""" - time.sleep(1) - yield from super().unstage() - - -class SnapAndStepScanBase(TemplatedScanBase): - """Snap'n Step scan base class - - Example base class for AeroScript-based high speed step scans. This - function loads a pre-written AeroScript file on the Automation1 - controller for maximum stepping speed. - - NOTE: As a temporary setup until the final cabling is done, the scan - is in a soft-trigger mode. The Gigafrost is just software triggered, - and the position readback is wired to the internal trigger. - - Example - ------- - >>> scans.snapnstep(-20 160, steps=1800, exp_time=0.005, burst_at_each_point=5) - """ - - scan_name = "snapnstep" - scan_report_hint = "table" - required_kwargs = ["scan_start", "scan_end", "steps"] - gui_config = { - "Movement parameters": ["scan_start", "scan_end", "steps"], - "Acquisition parameters": ["exp_time", "burst_at_each_point", "roix", "roiy", "sync"], - } - - def __init__( - self, - scan_start, - scan_end, - steps, - exp_time=0.005, - settling_time=0, - burst_at_each_point=1, - roix=2016, - roiy=2016, - sync="event", - **kwargs, - ): - # Auto-setup configuration parameters from input - self.scan_start = scan_start - self.scan_end = scan_end - self.scan_steps = steps - self.exp_time = exp_time - self.exp_burst = burst_at_each_point - self.settling_time = settling_time - - # Synthetic values - self.scan_stepsize = (scan_end - scan_start) / steps - self.scan_ntotal = (steps + 1) * burst_at_each_point - - # Aerotech DDC settings: Internal event trigger: PsoEvent = 1 - t_modes = {"pso": 0, "event": 1, "inp0": 2, "inp1": 4} - p_modes = { - "pso": "PsoOutput", - "event": "PsoEvent", - "inp0": "HighSpeedInput0RisingEdge", - "inp1": "HighSpeedInput1RisingEdge", - } - daq_trigger = t_modes[sync] - pso_trigger = p_modes[sync] - - filename = "AerotechSnapAndStepTemplate.ascript" - filesubs = { - "startpos": self.scan_start, - "stepsize": self.scan_stepsize, - "numsteps": self.scan_steps, - "exptime": 2 * self.exp_time * self.exp_burst, - "settling": self.settling_time, - "psotrigger": pso_trigger, - "npoints": self.scan_ntotal, - } - # DDC config - daqname = "es1_ddaq" - daqcfg = {"ntotal": self.scan_ntotal, "trigger": daq_trigger} - # Gigafrost config - camname = "gfclient" - camcfg = { - "ntotal": self.scan_ntotal, - "nimages": burst_at_each_point, - "exposure": 1000 * exp_time, - "period": 2000 * exp_time, - "trigger_mode": "eternal", - "pixel_width": roix, - "pixel_height": roiy, - } - - # Parameter validation before scan - if self.scan_steps <= 0: - raise RuntimeError(f"Requires at least one scan step, got {self.scan_steps}") - - # Call super() - super().__init__( - filename=filename, - filesubs=filesubs, - controller="es1_tasks", - taskindex=4, - camera=camname, - camcfg=camcfg, - drvdaq=daqname, - daqcfg=daqcfg, - daqmode="collect", - preview="daq_stream1", - **kwargs, - ) - - def unstage(self): - """Wait for DAQ before unstaging""" - time.sleep(1) - yield from super().unstage() - - -class SequenceScanBase(TemplatedScanBase): - """Sequence scan base class - - Example base class for AeroScript-based sequence scans. This function - loads a pre-written AeroScript file on the Automation1 controller for - low latency gated scans. - - NOTE: As a temporary setup until the final cabling is done, the scan - is in a soft-trigger mode. The Gigafrost is just software triggered, - and the position readback is wired to the internal trigger. - - Example - ------- - >>> scans.sequencescan(33, 180, 180, exp_time=0.005, exp_frames=1800, repeats=10) - """ - - scan_name = "sequencescan" - scan_report_hint = "table" - required_kwargs = ["scan_start", "gate_high", "gate_low"] - gui_config = { - "Movement parameters": ["scan_start", "repeats", "repmode"], - "Acquisition parameters": [ - "gate_high", - "gate_low", - "exp_time", - "exp_frames", - "roix", - "roiy", - "sync", - ], - } - - def __init__( - self, - scan_start, - gate_high, - gate_low, - repeats=1, - repmode="PosNeg", - exp_time=0.005, - exp_frames=180, - roix=2016, - roiy=2016, - sync="pso", - **kwargs, - ): - # Auto-setup configuration parameters from input - self.scan_start = scan_start - self.gate_high = gate_high - self.gate_low = gate_low - self.scan_repnum = repeats - self.scan_repmode = repmode.upper() - self.exp_time = exp_time - self.exp_frames = exp_frames - - # Synthetic values - self.scan_velocity = gate_high / (exp_time * exp_frames) - self.scan_acceleration = 500 - self.scan_safedistance = 10 - self.scan_accdistance = ( - self.scan_safedistance - + 0.5 * self.scan_velocity * self.scan_velocity / self.scan_acceleration - ) - self.scan_ntotal = exp_frames * repeats - - if repmode.upper() in ("POS", "NEG"): - self.scan_range = repeats * (gate_high + gate_low) - elif repmode.upper() in ("POSNEG", "NEGPOS"): - self.scan_range = gate_high + gate_low - else: - raise RuntimeError(f"Unsupported repetition mode: {repmode}") - - # Aerotech DDC settings: Internal event trigger: PsoEvent = 1 - t_modes = {"pso": 0, "event": 1, "inp0": 2, "inp1": 4} - p_modes = { - "pso": "PsoOutput", - "event": "PsoEvent", - "inp0": "HighSpeedInput0RisingEdge", - "inp1": "HighSpeedInput1RisingEdge", - } - daq_trigger = t_modes[sync] - pso_trigger = p_modes[sync] - - # AeroScript substitutions - filename = "AerotechSimpleSequenceTemplate.ascript" - filesubs = { - "startpos": self.scan_start, - "scanrange": self.scan_range, - "nrepeat": self.scan_repnum, - "scanvel": self.scan_velocity, - "scanacc": self.scan_acceleration, - "accdist": self.scan_accdistance, - "npoints": self.scan_ntotal, - "scandir": self.scan_repmode.upper(), - "psotrigger": pso_trigger, - } - - # Drice data collection config - # FIXME: The data sources need to be set to same as the script - daqname = "es1_ddaq" - daqcfg = {"ntotal": self.scan_ntotal, "trigger": daq_trigger} - # Gigafrost config - camname = "gfclient" - camcfg = { - "ntotal": self.scan_ntotal, - "nimages": self.exp_frames, - "exposure": 1000 * exp_time, - "period": 2000 * exp_time, - "pixel_width": roix, - "pixel_height": roiy, - } - - # Parameter validation before scan - if self.exp_frames <= 0: - raise RuntimeError(f"Requires at least one frame per cycle, got {self.exp_frames}") - - # Call super() - super().__init__( - filename=filename, - filesubs=filesubs, - controller="es1_tasks", - taskindex=4, - camera=camname, - camcfg=camcfg, - drvdaq=daqname, - daqcfg=daqcfg, - daqmode="collect", - preview="daq_stream1", - **kwargs, - ) - - def prepare_positions(self): - # Fill PSO vectors according to scan direction - # NOTE: Distance counter is bidirectional - pso_bounds_pos = [] - pso_bounds_neg = [] - if self.scan_repmode in ("pos", "Pos", "POS", "neg", "Neg", "NEG"): - pso_bounds_pos.append(self.scan_accdistance) - for ii in range(self.scan_repnum): - pso_bounds_pos += [self.gate_high, self.gate_low] - pso_bounds_pos = pso_bounds_pos[:-1] - if self.scan_repmode in ("posneg", "PosNeg", "POSNEG", "negpos", "NegPos", "NEGPOS"): - pso_bounds_pos = [self.scan_accdistance, self.gate_high] - pso_bounds_neg = [self.scan_accdistance + self.gate_low, self.gate_high] - - self.filesubs["psoBoundsPos"] = pso_bounds_pos - self.filesubs["psoBoundsNeg"] = pso_bounds_neg - self.filesubs["psoBoundsPosSize"] = len(pso_bounds_pos) - self.filesubs["psoBoundsNegSize"] = len(pso_bounds_neg) - # Call super() to do the substitutions - yield from super().prepare_positions() - - # def unstage(self): - # """ Wait for DAQ before unstaging""" - # time.sleep(1) - # yield from super().unstage() diff --git a/tomcat_bec/scans/tomcat_scans.py b/tomcat_bec/scans/tomcat_scans.py index 7a0d4f3..c32e16a 100644 --- a/tomcat_bec/scans/tomcat_scans.py +++ b/tomcat_bec/scans/tomcat_scans.py @@ -24,15 +24,28 @@ logger = bec_logger.logger class TomcatStepScan(ScanBase): - """Simple software step scan forTomcat + """Simple software step scan for Tomcat - Example class for simple BEC-based step scan using the low-level API. - All it does is translate conventional kwargs to tomcat specific device - pareameters and launches the standard step scan. + Example class for simple BEC-based step scan using the low-level API. It's just a standard + 'line_scan' with the only difference that overrides burst behavior to use camera burst instead + of software triggers. + - Example - ------- + NOTE: As decided by Tomcat, the scans should not manage the scope of devices + - All enabled devices are expected to be configured for acquisition by the end of stage + - Some devices can configure themselves from mandatory scan parameters (steps, burst) + - Other devices can be optionally configured by keyword arguments + - Devices will try to stage using whatever was set on them before + + Example: + -------- >>> scans.tomcatstepscan(scan_start=-25, scan_end=155, steps=180, exp_time=0.005, exp_burst=5) + + Common keyword arguments: + ------------------------- + image_width : int + image_height : int + ddc_trigger : str """ scan_name = "tomcatstepscan" @@ -40,7 +53,7 @@ class TomcatStepScan(ScanBase): required_kwargs = ["scan_start", "scan_end", "steps"] gui_config = { "Movement parameters": ["steps"], - "Acquisition parameters": ["exp_time", "exp_burst", "image_width", "image_height"], + "Acquisition parameters": ["exp_time", "exp_burst"], } def update_scan_motors(self): @@ -54,18 +67,12 @@ class TomcatStepScan(ScanBase): scan_start: float, scan_end: float, steps: int, - exp_time=0.005, - settling_time=0.2, - exp_burst=1, - image_width=2016, - image_height=2016, - sync="event", + exp_time: float=0.005, + settling_time: float=0.2, + exp_burst: int=1, **kwargs, ): # Converting generic kwargs to tomcat device configuration parameters - # Use PSO trigger - kwargs["parameter"]["kwargs"]["pso_wavemode"] = "pulsed" - super().__init__( exp_time=exp_time, settling_time=settling_time, @@ -88,22 +95,17 @@ class TomcatStepScan(ScanBase): def _at_each_point(self, ind=None, pos=None): """ Overriden at_each_point, using detector burst instaead of manual triggering""" - - trigger_time = self.exp_time * self.burst_at_each_point + yield from self._move_scan_motors_and_wait(pos) + time.sleep(self.settling_time) # yield from self.stubs.trigger(min_wait=trigger_time) yield from self.stubs.trigger(group='trigger', point_id=self.point_id) + + trigger_time = self.exp_time * self.burst_at_each_point time.sleep(trigger_time) - time.sleep(self.settling_time) yield from self.stubs.read(group="monitored", point_id=self.point_id, wait_group=None) - # self.point_id += 1 - - def cleanup(self): - """Set scan progress to 1 to finish the scan""" - self.num_pos = 1 - return super().cleanup() - + self.point_id += 1 class TomcatSnapNStep(AsyncFlyScanBase): diff --git a/tomcat_bec/scripts/anotherroundsans.py b/tomcat_bec/scripts/anotherroundsans.py index aa2dc5f..95b5785 100644 --- a/tomcat_bec/scripts/anotherroundsans.py +++ b/tomcat_bec/scripts/anotherroundsans.py @@ -28,7 +28,7 @@ def anotherstepscan( settling_time=0, image_width=2016, image_height=2016, - sync="event", + sync="inp1", ): """Demo step scan with GigaFrost @@ -65,6 +65,9 @@ def anotherstepscan( sync=sync ) + + + def anothersequencescan( scan_start, gate_high, @@ -100,9 +103,8 @@ def anothersequencescan( dev.daq_stream0.enabled = True dev.daq_stream1.enabled = False - print("Handing over to 'scans.sequencescan'") - scans.sequencescan( + scans.tomcatsimplesequencescan( scan_start, gate_high, gate_low,