diff --git a/tomcat_bec/scans/__init__.py b/tomcat_bec/scans/__init__.py index 8556ccf..9292522 100644 --- a/tomcat_bec/scans/__init__.py +++ b/tomcat_bec/scans/__init__.py @@ -1,4 +1,2 @@ from .tutorial_fly_scan import AcquireDark, AcquireFlat, TutorialFlyScanContLine -from .gigafrost_test import GigaFrostStepScan from .tomcat_scanbase import SnapAndStepScanBase, GigaStepScanBase, SequenceScanBase, SimpleFlyer -from .tutorial_fly_scan import AcquireDark, AcquireFlat, TutorialFlyScanContLine diff --git a/tomcat_bec/scans/aerotech_test.py b/tomcat_bec/scans/aerotech_test.py deleted file mode 100644 index a1d6072..0000000 --- a/tomcat_bec/scans/aerotech_test.py +++ /dev/null @@ -1,545 +0,0 @@ -import time - -import numpy as np - -from bec_lib import bec_logger -from scan_server.scans import AsyncFlyScanBase, ScanArgType, ScanBase - -logger = bec_logger.logger - - -class AeroSingleScan(AsyncFlyScanBase): - scan_name = "aero_single_scan" - scan_report_hint = "scan_progress" - required_kwargs = ["startpos", "scanrange", "psodist"] - arg_input = {} - arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None} - - def __init__(self, *args, parameter: dict = None, **kwargs): - """Performs a single line scan with PSO output and data collection. - - Examples: - >>> scans.aero_single_scan(startpos=42, scanrange=2*10+3*180, psodist=[10, 180, 0.01, 180, 0.01, 180]) - - """ - super().__init__(parameter=parameter, **kwargs) - self.axis = [] - self.scan_motors = [] - self.num_pos = 0 - - self.scanStart = self.caller_kwargs.get("startpos") - self.scanEnd = self.scanStart + self.caller_kwargs.get("scanrange") - self.psoBounds = self.caller_kwargs.get("psodist") - self.scanVel = self.caller_kwargs.get("velocity", 30) - self.scanTra = self.caller_kwargs.get("travel", 80) - self.scanAcc = self.caller_kwargs.get("acceleration", 500) - self.scanExpNum = self.caller_kwargs.get("daqpoints", 5000) - - def pre_scan(self): - # Move to start position - st = yield from self.stubs.send_rpc_and_wait("es1_roty", "dmove", self.scanStart) - st.wait() - yield from self.stubs.pre_scan() - - def scan_report_instructions(self): - """Scan report instructions for the progress bar, yields from mcs card""" - if not self.scan_report_hint: - yield None - return - yield from self.stubs.scan_report_instruction({"scan_progress": ["es1_roty"]}) - - def scan_core(self): - # Configure PSO, DDC and motor - yield from self.stubs.send_rpc_and_wait( - "es1_roty", - "configure", - {"velocity": self.scanVel, "acceleration": self.scanVel / self.scanAcc}, - ) - yield from self.stubs.send_rpc_and_wait( - "es1_psod", "configure", {"distance": self.psoBounds, "wmode": "toggle"} - ) - yield from self.stubs.send_rpc_and_wait( - "es1_ddaq", - "configure", - {"npoints": self.scanExpNum}, - ) - # DAQ with real trigger - # yield from self.stubs.send_rpc_and_wait( - # "es1_ddaq", "configure", {"npoints": self.scanExpNum, "trigger": "HSINP0_RISE"}, - # ) - - # Kick off PSO and DDC - st = yield from self.stubs.send_rpc_and_wait("es1_psod", "kickoff") - st.wait() - st = yield from self.stubs.send_rpc_and_wait("es1_ddaq", "kickoff") - st.wait() - - print("Start moving") - # Start the actual movement - yield from self.stubs.send_rpc_and_wait( - "es1_roty", - "configure", - {"target": self.scanEnd}, - ) - yield from self.stubs.kickoff( - device="es1_roty", - parameter={"target": self.scanEnd}, - ) - # yield from self.stubs.set(device='es1_roty', value=self.scanEnd, wait_group="flyer") - target_diid = self.DIID - 1 - - # Wait for motion to finish - while True: - yield from self.stubs.read_and_wait(group="primary", wait_group="readout_primary", pointID=self.pointID) - self.pointID += 1 - status = self.stubs.get_req_status( - device="es1_roty", RID=self.metadata["RID"], DIID=target_diid - ) - progress = self.stubs.get_device_progress(device="es1_roty", RID=self.metadata["RID"]) - print(f"status: {status}\tprogress: {progress}") - if progress: - self.num_pos = progress - if status: - break - time.sleep(1) - print("Scan done\n\n") - self.num_pos = self.pointID - - -class AeroSequenceScan(AsyncFlyScanBase): - scan_name = "aero_sequence_scan" - scan_report_hint = "table" - required_kwargs = ["startpos", "ranges"] - arg_input = {} - arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None} - - def __init__(self, *args, parameter: dict = None, **kwargs): - """Performs a sequence scan with PSO output and data collection - - Examples: - >>> scans.aero_sequence_scan(startpos=42, ranges=([179.9, 0.1, 5]), expnum=3600, repnum=3, repmode="PosNeg") - - """ - super().__init__(parameter=parameter, **kwargs) - self.axis = [] - self.scan_motors = ["es1_roty"] - self.num_pos = 0 - - self.scanStart = self.caller_kwargs.get("startpos") - self.scanRanges = self.caller_kwargs.get("ranges") - self.scanExpNum = self.caller_kwargs.get("expnum", 25000) - self.scanRepNum = self.caller_kwargs.get("repnum", 1) - self.scanRepMode = self.caller_kwargs.get("repmode", "Pos") - self.scanVel = self.caller_kwargs.get("velocity", 30) - self.scanTra = self.caller_kwargs.get("travel", 80) - self.scanAcc = self.caller_kwargs.get("acceleration", 500) - self.scanSafeDist = self.caller_kwargs.get("safedist", 10) - - if isinstance(self.scanRanges[0], (int, float)): - self.scanRanges = self.scanRanges - - if self.scanRepMode not in ["PosNeg", "Pos", "NegPos", "Neg"]: - raise RuntimeError(f"Unexpected sequence repetition mode: {self.scanRepMode}") - - def pre_scan(self): - # Calculate PSO positions from tables - AccDist = 0.5 * self.scanVel * self.scanVel / self.scanAcc + self.scanSafeDist - - # Relative PSO bounds - self.psoBoundsPos = [AccDist] - try: - for line in self.scanRanges: - print(f"Line is: {line} of type {type(line)}") - for rr in range(int(line[2])): - self.psoBoundsPos.append(line[0]) - self.psoBoundsPos.append(line[1]) - except TypeError: - line = self.scanRanges - print(f"Line is: {line} of type {type(line)}") - for rr in range(int(line[2])): - self.psoBoundsPos.append(line[0]) - self.psoBoundsPos.append(line[1]) - del self.psoBoundsPos[-1] - - self.psoBoundsNeg = [AccDist] - self.psoBoundsNeg.extend(self.psoBoundsPos[::-1]) - - scanrange = 2 * AccDist + np.sum(self.psoBoundsPos) - if self.scanRepMode in ["PosNeg", "Pos"]: - self.PosStart = self.scanStart - AccDist - self.PosEnd = self.scanStart + scanrange - elif self.scanRepMode in ["NegPos", "Neg"]: - self.PosStart = self.scanStart + AccDist - self.PosEnd = self.scanStart - scanrange - else: - raise RuntimeError(f"Unexpected sequence repetition mode: {self.scanRepMode}") - print(f"\tCalculated scan range: {self.PosStart} to {self.PosEnd} range {scanrange}") - - # ToDo: We could append all distances and write a much longer 'distance array'. this would elliminate the need of rearming... - - # Move roughly to start position - yield from self.stubs.send_rpc_and_wait( - "es1_roty", - "configure", - {"velocity": self.scanTra, "acceleration": self.scanTra / self.scanAcc}, - ) - st = yield from self.stubs.send_rpc_and_wait("es1_roty", "move", self.PosStart) - st.wait() - - yield from self.stubs.pre_scan() - - def scan_core(self): - # Move to start position (with travel velocity) - yield from self.stubs.send_rpc_and_wait( - "es1_roty", - "configure", - {"velocity": self.scanTra, "acceleration": self.scanTra / self.scanAcc}, - ) - yield from self.stubs.send_rpc_and_wait("es1_roty", "move", self.PosStart) - - # Condigure PSO, DDC and motorHSINP0_RISE - yield from self.stubs.send_rpc_and_wait( - "es1_psod", "configure", {"distance": self.psoBoundsPos, "wmode": "toggle"} - ) - yield from self.stubs.send_rpc_and_wait( - "es1_ddaq", - "configure", - {"npoints": self.scanExpNum}, - ) - # With real trigger - # yield from self.stubs.send_rpc_and_wait( - # "es1_ddaq", "configure", {"npoints": self.scanExpNum, "trigger": "HSINP0_RISE"} - # ) - yield from self.stubs.send_rpc_and_wait( - "es1_roty", - "configure", - {"velocity": self.scanVel, "acceleration": self.scanVel / self.scanAcc}, - ) - - # Kickoff pso and daq - st = yield from self.stubs.send_rpc_and_wait("es1_psod", "kickoff") - st.wait() - st = yield from self.stubs.send_rpc_and_wait("es1_ddaq", "kickoff") - st.wait() - - # Run the actual scan (haven't figured out the proggress bar) - print("Starting actual scan loop") - for ii in range(self.scanRepNum): - print(f"Scan segment {ii}") - # No option to reset the index counter... - yield from self.stubs.send_rpc_and_wait("es1_psod", "dstArrayRearm.set", 1) - - if self.scanRepMode in ["Pos", "Neg"]: - yield from self.stubs.send_rpc_and_wait( - "es1_roty", - "configure", - {"velocity": self.scanVel, "acceleration": self.scanVel / self.scanAcc}, - ) - st = yield from self.stubs.send_rpc_and_wait("es1_roty", "move", self.PosEnd) - st.wait() - yield from self.stubs.send_rpc_and_wait( - "es1_roty", - "configure", - {"velocity": self.scanTra, "acceleration": self.scanTra / self.scanAcc}, - ) - st = yield from self.stubs.send_rpc_and_wait("es1_roty", "move", self.PosStart) - st.wait() - elif self.scanRepMode in ["PosNeg", "NegPos"]: - if ii % 2 == 0: - st = yield from self.stubs.send_rpc_and_wait("es1_roty", "move", self.PosEnd) - st.wait() - else: - st = yield from self.stubs.send_rpc_and_wait("es1_roty", "move", self.PosStart) - st.wait() - self.pointID += 1 - self.num_pos += 1 - time.sleep(0.2) - - # Complete (should complete instantly) - yield from self.stubs.complete(device="es1_psod") - yield from self.stubs.complete(device="es1_ddaq") - st = yield from self.stubs.send_rpc_and_wait("es1_psod", "complete") - st.wait() - st = yield from self.stubs.send_rpc_and_wait("es1_ddaq", "complete") - st.wait() - - # Collect - Throws a warning due to returning a generator - # st = yield from self.stubs.send_rpc_and_wait("es1_psod", "collect") - # st = yield from self.stubs.send_rpc_and_wait("es1_ddaq", "collect") - - yield from self.stubs.read_and_wait(group="primary", wait_group="readout_primary") - target_diid = self.DIID - 1 - - yield from self.stubs.kickoff( - device="es1_roty", - parameter={"target": self.PosStart}, - ) - yield from self.stubs.wait(device=["es1_roty"], wait_group="kickoff", wait_type="move") - yield from self.stubs.complete(device="es1_roty") - - # Wait for motion to finish - while True: - pso_status = self.stubs.get_req_status( - device="es1_psod", RID=self.metadata["RID"], DIID=target_diid - ) - daq_status = self.stubs.get_req_status( - device="es1_ddaq", RID=self.metadata["RID"], DIID=target_diid - ) - mot_status = self.stubs.get_req_status( - device="es1_roty", RID=self.metadata["RID"], DIID=target_diid - ) - progress = self.stubs.get_device_progress(device="es1_psod", RID=self.metadata["RID"]) - progress = self.stubs.get_device_progress(device="es1_ddaq", RID=self.metadata["RID"]) - progress = self.stubs.get_device_progress(device="es1_roty", RID=self.metadata["RID"]) - print(f"pso: {pso_status}\tdaq: {daq_status}\tmot: {mot_status}\tprogress: {progress}") - if progress: - self.num_pos = int(progress) - if mot_status: - break - time.sleep(1) - print("Scan done\n\n") - - def cleanup(self): - """Set scan progress to 1 to finish the scan""" - self.num_pos = 1 - return super().cleanup() - - -class AeroScriptedScan(AsyncFlyScanBase): - scan_name = "aero_scripted_scan" - scan_report_hint = "table" - required_kwargs = ["filename", "subs"] - arg_input = {} - arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None} - - def __init__(self, *args, parameter: dict = None, **kwargs): - """Executes an AeroScript template as a flyer - - The script is generated from a template file using jinja2. - Examples: - >>> scans.aero_scripted_scan(filename="AerotechSnapAndStepTemplate.ascript", subs={'startpos': 42, 'stepsize': 0.1, 'numsteps': 1800, 'exptime': 0.1}) - - """ - super().__init__(parameter=parameter, **kwargs) - self.axis = [] - self.scan_motors = ["es1_roty"] - self.num_pos = 0 - - self.filename = self.caller_kwargs.get("filename") - self.subs = self.caller_kwargs.get("subs") - self.taskIndex = self.caller_kwargs.get("taskindex", 4) - - def pre_scan(self): - print("TOMCAT Loading Aeroscript template") - # Load the test file - with open(self.filename) as f: - templatetext = f.read() - - # Substitute jinja template - import jinja2 - - tm = jinja2.Template(templatetext) - self.scripttext = tm.render(scan=self.subs) - - yield from self.stubs.pre_scan() - - def scan_core(self): - print("TOMCAT Sequeence scan (via Jinjad AeroScript)") - t_start = time.time() - - # Configure by copying text to controller file and compiling it - yield from self.stubs.send_rpc_and_wait( - "es1_tasks", - "configure", - {"text": self.scripttext, "filename": "becExec.ascript", "taskIndex": self.taskIndex}, - ) - - # Kickoff - st = yield from self.stubs.send_rpc_and_wait("es1_tasks", "kickoff") - st.wait() - time.sleep(0.5) - - # Complete - yield from self.stubs.complete(device="es1_tasks") - - # Collect - up to implementation - - t_end = time.time() - t_elapsed = t_end - t_start - print(f"Elapsed scan time: {t_elapsed}") - - def cleanup(self): - """Set scan progress to 1 to finish the scan""" - self.num_pos = self.pointID - return super().cleanup() - - -class AeroSnapNStep(AeroScriptedScan): - scan_name = "aero_snapNstep" - scan_report_hint = "table" - required_kwargs = ["startpos", "expnum"] - arg_input = {} - arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None} - - def __init__(self, *args, parameter: dict = None, **kwargs): - """Executes a scripted SnapNStep scan - - This scan generates and executes an AeroScript file to run - a hardware step scan on the Aerotech controller. - The script is generated from a template file using jinja2. - - Examples: - >>> scans.scans.aero_snapNstep(startpos=42, range=180, expnum=1800, exptime=0.1) - """ - super().__init__(parameter=parameter, **kwargs) - self.axis = [] - self.scan_motors = ["es1_roty"] - self.num_pos = 0 - - # self.filename = "/afs/psi.ch/user/m/mohacsi_i/ophyd_devices/ophyd_devices/epics/devices/aerotech/AerotechSnapAndStepTemplate.ascript" - self.filename = "/sls/X05LA/data/x05laop/bec/ophyd_devices/ophyd_devices/epics/devices/aerotech/AerotechSnapAndStepTemplate.ascript" - self.scanTaskIndex = self.caller_kwargs.get("taskindex", 4) - - self.scanStart = self.caller_kwargs.get("startpos") - self.scanExpNum = self.caller_kwargs.get("expnum") - self.scanRange = self.caller_kwargs.get("range", 180) - self.scanExpTime = self.caller_kwargs.get("exptime", 0.1) - self.scanStepSize = self.scanRange / self.scanExpNum - # self.scanVel = self.caller_kwargs.get("velocity", 30) - # self.scanTra = self.caller_kwargs.get("travel", 80) - # self.scanAcc = self.caller_kwargs.get("acceleration", 500) - - self.subs = { - "startpos": self.scanStart, - "stepsize": self.scanStepSize, - "numsteps": self.scanExpNum, - "exptime": self.scanExpTime, - } - - def scan_core(self): - print("TOMCAT Snap N Step scan (via Jinjad AeroScript)") - # Run template execution frm parent - yield from self.stubs.read_and_wait(group="primary", wait_group="readout_primary", pointID=self.pointID) - self.pointID += 1 - yield from super().scan_core() - - # Collect - Throws a warning due to returning a generator - yield from self.stubs.send_rpc_and_wait("es1_ddaq", "npoints.put", self.scanExpNum) - # st = yield from self.stubs.send_rpc_and_wait("es1_ddaq", "collect") - # st.wait() - - -class AeroScriptedSequence(AeroScriptedScan): - scan_name = "aero_scripted_sequence" - scan_report_hint = "table" - required_kwargs = ["startpos", "ranges"] - arg_input = {} - arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None} - - def __init__(self, *args, parameter: dict = None, **kwargs): - """Executes a scripted sequence scan - - This scan generates and executes an AeroScript file to run a hardware sequence scan on the - Aerotech controller. You might win a few seconds this way, but it has some limtations... - The script is generated from a template file using jinja2. - - Examples: - >>> scans.aero_scripted_sequence(startpos=42, ranges=([179.9, 0.1, 5]), expnum=3600, repnum=3, repmode="PosNeg") - """ - super().__init__(parameter=parameter, **kwargs) - self.axis = [] - self.scan_motors = ["es1_roty"] - self.num_pos = 0 - - self.filename = "/afs/psi.ch/user/m/mohacsi_i/ophyd_devices/ophyd_devices/epics/devices/aerotech/AerotechSimpleSequenceTemplate.ascript" - self.scanTaskIndex = self.caller_kwargs.get("taskindex", 4) - - self.scanStart = self.caller_kwargs.get("startpos") - self.scanRanges = self.caller_kwargs.get("ranges") - self.scanExpNum = self.caller_kwargs.get("expnum", 25000) - self.scanRepNum = self.caller_kwargs.get("repnum", 1) - self.scanRepMode = self.caller_kwargs.get("repmode", "Pos") - - self.scanVel = self.caller_kwargs.get("velocity", 30) - self.scanTra = self.caller_kwargs.get("travel", 80) - self.scanAcc = self.caller_kwargs.get("acceleration", 500) - self.scanSafeDist = self.caller_kwargs.get("safedist", 10) - - self.subs = { - "startpos": self.scanStart, - "scandir": self.scanRepMode, - "nrepeat": self.scanRepNum, - "npoints": self.scanExpNum, - "scanvel": self.scanVel, - "jogvel": self.scanTra, - "scanacc": self.scanAcc, - } - - if self.scanRepMode not in ["PosNeg", "Pos", "NegPos", "Neg"]: - raise RuntimeError(f"Unexpected sequence repetition mode: {self.scanRepMode}") - - if isinstance(self.scanRanges[0], (int, float)): - self.scanRanges = self.scanRanges - - def pre_scan(self): - # Calculate PSO positions from tables - AccDist = 0.5 * self.scanVel * self.scanVel / self.scanAcc + self.scanSafeDist - - # Relative PSO bounds - self.psoBoundsPos = [AccDist] - try: - for line in self.scanRanges: - print(f"Line is: {line} of type {type(line)}") - for rr in range(int(line[2])): - self.psoBoundsPos.append(line[0]) - self.psoBoundsPos.append(line[1]) - except TypeError: - line = self.scanRanges - print(f"Line is: {line} of type {type(line)}") - for rr in range(int(line[2])): - self.psoBoundsPos.append(line[0]) - self.psoBoundsPos.append(line[1]) - del self.psoBoundsPos[-1] - - self.psoBoundsNeg = [AccDist] - self.psoBoundsNeg.extend(self.psoBoundsPos[::-1]) - - self.scanrange = 2 * AccDist + np.sum(self.psoBoundsPos) - if self.scanRepMode in ["PosNeg", "Pos"]: - self.PosStart = self.scanStart - AccDist - elif self.scanRepMode in ["NegPos", "Neg"]: - self.PosStart = self.scanStart + AccDist - else: - raise RuntimeError(f"Unexpected sequence repetition mode: {self.scanRepMode}") - print(f"\tCalculated scan range: {self.PosStart} range {self.scanrange}") - - # ToDo: We could append all distances and write a much longer 'distance array'. this would elliminate the need of rearming... - self.subs.update( - { - "psoBoundsPos": self.psoBoundsPos, - "psoBoundsNeg": self.psoBoundsNeg, - "scanrange": self.scanrange, - } - ) - - # Move roughly to start position - yield from self.stubs.send_rpc_and_wait( - "es1_roty", - "configure", - {"velocity": self.scanTra, "acceleration": self.scanTra / self.scanAcc}, - ) - st = yield from self.stubs.send_rpc_and_wait("es1_roty", "move", self.PosStart) - st.wait() - - yield from super().pre_scan() - - def scan_core(self): - print("TOMCAT Sequence scan (via Jinjad AeroScript)") - # Run template execution frm parent - yield from super().scan_core() - - # Collect - Throws a warning due to returning a generator - yield from self.stubs.send_rpc_and_wait("es1_ddaq", "npoints.put", self.scanExpNum) - # st = yield from self.stubs.send_rpc_and_wait("es1_ddaq", "collect") - # st.wait() - diff --git a/tomcat_bec/scans/gigafrost_test.py b/tomcat_bec/scans/gigafrost_test.py deleted file mode 100644 index e9a7b6c..0000000 --- a/tomcat_bec/scans/gigafrost_test.py +++ /dev/null @@ -1,122 +0,0 @@ -import time -import numpy as np - -from bec_lib import bec_logger -from bec_server.scan_server.scans import AsyncFlyScanBase, ScanArgType, ScanBase - -logger = bec_logger.logger - - -class GigaFrostStepScan(AsyncFlyScanBase): - """Test scan for running the GigaFrost with standard DAQ - """ - scan_name = "gigafrost_line_scan" - scan_report_hint = "table" - required_kwargs = ["motor", "range"] - arg_input = {} - arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None} - - def __init__(self, *args, parameter: dict = None, **kwargs): - """Example step scan - - Perform a simple step scan with a motor while software triggering the - gigafrost burst sequence at each point and recording it to the StdDAQ. - Actually only the configuration is gigafrost specific, everything else - is just using standard Bluesky event model. - - Example - ------- - >>> scans.gigafrost_line_scan(motor='eyex', range=(-5, 5), steps=10, exp_time=20, exp_burst=10, relative=True) - - Parameters - ---------- - motor: str - Scan motor name, moveable, mandatory. - range : (float, float) - Scan range of the axis. - steps : int, optional - Number of scan steps to cover the range. (default = 10) - exp_time : float, optional [0.01 ... 40] - Exposure time for each frame in [ms]. The IOC fixes the exposure - period to be 2x this long so it doesnt matter. (default = 20) - exp_burst : float, optional - Number of images to be taken for each scan point. (default=10) - relative : boolean, optional - Toggle between relative and absolute input coordinates. - (default = False) - """ - super().__init__(parameter=parameter, **kwargs) - self.axis = [] - self.num_pos = 0 - - self.scan_motors = [self.caller_kwargs.get("motor")] - self.scan_range = self.caller_kwargs.get("range") - self.scan_relat = self.caller_kwargs.get("relative", False) - self.scan_steps = int(self.caller_kwargs.get("steps", 10)) - self.scan_exp_t = float(self.caller_kwargs.get("exp_time", 5)) - self.scan_exp_p = 2 * self.scan_exp_t - self.scan_exp_b = int(self.caller_kwargs.get("exp_burst", 10)) - - if self.scan_steps <=0: - raise RuntimeError(f"Requires at least one scan step") - - def prepare_positions(self): - # Calculate scan start position - if self.scan_relat: - pos = yield from self.stubs.send_rpc_and_wait(self.scan_motors[0], "read") - self.start_pos = pos[self.scan_motors[0]].get("value") + self.scan_range[0] - else: - self.start_pos = self.scan_range[0] - - # Calculate step size - self.step_size = (self.scan_range[1]-self.scan_range[0])/self.scan_steps - - self.positions = [self.start_pos] - for ii in range(self.scan_steps): - self.positions.append(self.start_pos + ii*self.step_size) - - self.num_positions = len(self.positions) - - def pre_scan(self): - # Move roughly to start position - print(f"Scan start position: {self.start_pos}") - st = yield from self.stubs.send_rpc_and_wait(self.scan_motors[0], "move", self.start_pos) - st.wait() - - yield from self.stubs.pre_scan() - - def stage(self): - d= { - "ntotal": self.scan_steps * self.scan_exp_b, - "nimages": self.scan_exp_b, - "exposure": self.scan_exp_t, - "period": self.scan_exp_p, - "pixel_width": 480, - "pixel_height": 128 - } - yield from self.stubs.send_rpc_and_wait("gfclient", "configure", d) - # For god, NO! - yield from super().stage() - - def scan_core(self): - self.pointID = 0 - for ii in range(self.num_positions): - print(f"Point: {ii}") - st = yield from self.stubs.send_rpc_and_wait(self.scan_motors[0], "move", self.positions[ii]) - st.wait() - st = yield from self.stubs.send_rpc_and_wait("gfclient", "trigger") - st.wait() - self.pointID += 1 - time.sleep(0.2) - - time.sleep(1) - print("Scan done\n\n") - - def cleanup(self): - """Set scan progress to 1 to finish the scan""" - self.num_pos = 1 - print("Scan cleanup\n\n") - return super().cleanup() - - - diff --git a/tomcat_bec/scans/tomcat_scanbase.py b/tomcat_bec/scans/tomcat_scanbase.py index 0de5bd7..cfe10e4 100644 --- a/tomcat_bec/scans/tomcat_scanbase.py +++ b/tomcat_bec/scans/tomcat_scanbase.py @@ -13,19 +13,18 @@ import os import time from bec_lib import bec_logger -from bec_server.scan_server.scans import AsyncFlyScanBase, ScanArgType, ScanBase +from bec_server.scan_server.scans import AsyncFlyScanBase, ScanBase logger = bec_logger.logger - class TemplatedScanBase(AsyncFlyScanBase): - """ Base class for templated scans - + """Base class for templated scans + Low-level base class for templated AeroScript scans at the Tomcat beamlines. It sets the order of operations between aerotech, gigafrost and the standard - DAQ. But as a base class, it leaves ample freedom for individual - hardware configurations and scan implementations. + DAQ. But as a base class, it leaves ample freedom for individual + hardware configurations and scan implementations. Example ------- @@ -52,16 +51,27 @@ class TemplatedScanBase(AsyncFlyScanBase): daqmode: str Operation mode for the position recording. (default = collect) """ + scan_name = "aeroscript_scan_base" scan_report_hint = "table" required_kwargs = ["filename", "subs"] - def __init__(self, *, filename=None, filetext=None, filesubs={}, - controller="es1_tasks", taskindex=4, - camera="gfclient", camcfg=None, - drvdaq="es1_ddaq", daqcfg=None, daqmode="collect", - preview="daq_stream1", - **kwargs): + def __init__( + self, + *, + filename=None, + filetext=None, + filesubs={}, + controller="es1_tasks", + taskindex=4, + camera="gfclient", + camcfg=None, + drvdaq="es1_ddaq", + daqcfg=None, + daqmode="collect", + preview="daq_stream1", + **kwargs, + ): super().__init__(**kwargs) self.axis = [] self.num_pos = 0 @@ -81,12 +91,13 @@ class TemplatedScanBase(AsyncFlyScanBase): if self.filename is None and self.filetext is None: raise RuntimeError("Must provide either filename or text to scan") - def prepare_positions(self): - """ Prepare action: render AeroScript file""" + """Prepare action: render AeroScript file""" # Load the test file if self.filename is not None: - filename = os.path.join(os.path.dirname(__file__), '../devices/aerotech/' + self.filename) + filename = os.path.join( + os.path.dirname(__file__), "../devices/aerotech/" + self.filename + ) logger.info(f"Attempting to load file {filename}") with open(filename) as f: templatetext = f.read() @@ -97,9 +108,8 @@ class TemplatedScanBase(AsyncFlyScanBase): yield from super().prepare_positions() - def stage(self): - """ Configure and stage all devices""" + """Configure and stage all devices""" print("TOMCAT Staging scripted scan (via Jinjad AeroScript)") @@ -128,9 +138,8 @@ class TemplatedScanBase(AsyncFlyScanBase): # TODO : This should stage the entire beamline # yield from super().stage() - def scan_core(self): - """ The actual scan routine""" + """The actual scan routine""" print("TOMCAT Running scripted scan (via Jinjad AeroScript)") t_start = time.time() @@ -169,19 +178,19 @@ class TemplatedScanBase(AsyncFlyScanBase): return super().cleanup() - - - class SimpleFlyer(AsyncFlyScanBase): - """ Simple fly scan base class + """Simple fly scan base class Very simple flyer class (something like the bluesky flyer). It expects every device to be configured before the scan. Example ------- - >>> scans.simpleflyer(sync_devices=['daq_stream1'], flyer_devices=['es1_tasks', 'es1_psod', 'es1_ddaq']) + >>> scans.simpleflyer( + sync_devices=['daq_stream1'], + flyer_devices=['es1_tasks', 'es1_psod', 'es1_ddaq']) """ + scan_name = "simpleflyer" scan_report_hint = "table" required_kwargs = ["sync_devices", "flyer_devices"] @@ -190,7 +199,7 @@ class SimpleFlyer(AsyncFlyScanBase): # Auto-setup configuration parameters from input self.sync_devices = sync_devices self.async_devices = async_devices - + # Call super() super().__init__(**kwargs) @@ -211,57 +220,54 @@ class SimpleFlyer(AsyncFlyScanBase): for dev in self.async_devices: yield from self.stubs.send_rpc_and_wait(dev, "complete") - def unstage(self): - """ Wait for DAQ before unstaging""" + """Wait for DAQ before unstaging""" time.sleep(1) yield from super().unstage() - - - - class GigaStepScanBase(ScanBase): - """ Gigafrost software step scan base class + """Gigafrost software step scan base class Example class for simple BEC-based step scans using the low-level API. Until the final cabling is done, the scan can be tested in soft-trigger - mode. + mode. Example ------- >>> scans.gigastep(scan_start=-25, scan_end=155, steps=180, exp_time=0.005, exp_burst=5) """ + scan_name = "gigastep" - required_kwargs = ["scan_start", "scan_end","steps"] + required_kwargs = ["scan_start", "scan_end", "steps"] gui_config = { "Movement parameters": ["steps"], - "Acquisition parameters" : ["exp_time", "burst_at_each_point", "roix", "roiy"] + "Acquisition parameters": ["exp_time", "burst_at_each_point", "roix", "roiy"], } def _get_scan_motors(self): self.scan_motors = ["es1_roty"] def __init__( - self, - scan_start: float, - scan_end: float, - steps: int, - exp_time = 0.005, - settling_time = 0.2, - burst_at_each_point = 1, - roix = 2016, - roiy = 2016, - sync='event', - **kwargs): + self, + scan_start: float, + scan_end: float, + steps: int, + exp_time=0.005, + settling_time=0.2, + burst_at_each_point=1, + roix=2016, + roiy=2016, + sync="event", + **kwargs, + ): super().__init__( exp_time=exp_time, settling_time=settling_time, relative=False, burst_at_each_point=1, optim_trajectory=None, - **kwargs + **kwargs, ) self.motor = "es1_roty" @@ -280,28 +286,28 @@ class GigaStepScanBase(ScanBase): # Prepare configuration dicts # Aerotech DDC settings: Internal event trigger: PsoEvent = 1 - t_modes = {'pso': 0, 'event': 1, 'inp0': 2, 'inp1': 4} + t_modes = {"pso": 0, "event": 1, "inp0": 2, "inp1": 4} daq_trigger = t_modes[sync] self.psocfg = {} - self.daqcfg = {'ntotal': self.scan_ntotal, 'trigger': daq_trigger} + self.daqcfg = {"ntotal": self.scan_ntotal, "trigger": daq_trigger} self.camcfg = { - "ntotal": self.scan_ntotal, - "nimages": burst_at_each_point, - "exposure": 1000*exp_time, - "period": 2000*exp_time, + "ntotal": self.scan_ntotal, + "nimages": burst_at_each_point, + "exposure": 1000 * exp_time, + "period": 2000 * exp_time, "trigger_mode": "soft", - "pixel_width": self.scan_roix, - "pixel_height": self.scan_roiy - } + "pixel_width": self.scan_roix, + "pixel_height": self.scan_roiy, + } def _calculate_positions(self) -> None: - """ Pre-calculate scan positions """ - for ii in range(self.scan_steps+1): - self.positions.append(self.scan_start + ii*self.scan_stepsize) + """Pre-calculate scan positions""" + for ii in range(self.scan_steps + 1): + self.positions.append(self.scan_start + ii * self.scan_stepsize) def stage(self): - """ First do the actual configuration then the gutted staging""" + """First do the actual configuration then the gutted staging""" print(f"TOMCAT Staging step scan with motors {self.motor}") # Configure the camera (usually together wit the DAQ) @@ -325,38 +331,48 @@ class GigaStepScanBase(ScanBase): return super().stage() def unstage(self): - """ Wait for DAQ before unstaging""" + """Wait for DAQ before unstaging""" time.sleep(1) yield from super().unstage() - class SnapAndStepScanBase(TemplatedScanBase): - """ Snap'n Step scan base class + """Snap'n Step scan base class Example base class for AeroScript-based high speed step scans. This - function loads a pre-written AeroScript file on the Automation1 + function loads a pre-written AeroScript file on the Automation1 controller for maximum stepping speed. - NOTE: As a temporary setup until the final cabling is done, the scan - is in a soft-trigger mode. The Gigafrost is just software triggered, + NOTE: As a temporary setup until the final cabling is done, the scan + is in a soft-trigger mode. The Gigafrost is just software triggered, and the position readback is wired to the internal trigger. Example ------- >>> scans.snapnstep(-20 160, steps=1800, exp_time=0.005, burst_at_each_point=5) """ + scan_name = "snapnstep" scan_report_hint = "table" required_kwargs = ["scan_start", "scan_end", "steps"] gui_config = { "Movement parameters": ["scan_start", "scan_end", "steps"], - "Acquisition parameters" : ["exp_time", "burst_at_each_point", "roix", "roiy", "sync"] + "Acquisition parameters": ["exp_time", "burst_at_each_point", "roix", "roiy", "sync"], } - def __init__(self, scan_start, scan_end, steps, - exp_time=0.005, settling_time=0, burst_at_each_point=1, - roix=2016, roiy=2016, sync="event", **kwargs): + def __init__( + self, + scan_start, + scan_end, + steps, + exp_time=0.005, + settling_time=0, + burst_at_each_point=1, + roix=2016, + roiy=2016, + sync="event", + **kwargs, + ): # Auto-setup configuration parameters from input self.scan_start = scan_start self.scan_end = scan_end @@ -366,87 +382,116 @@ class SnapAndStepScanBase(TemplatedScanBase): self.settling_time = settling_time # Synthetic values - self.scan_stepsize = (scan_end-scan_start)/steps - self.scan_ntotal = (steps+1) * burst_at_each_point + self.scan_stepsize = (scan_end - scan_start) / steps + self.scan_ntotal = (steps + 1) * burst_at_each_point # Aerotech DDC settings: Internal event trigger: PsoEvent = 1 - t_modes = {'pso': 0, 'event': 1, 'inp0': 2, 'inp1': 4} - p_modes = {'pso': 'PsoOutput', 'event': 'PsoEvent', 'inp0': 'HighSpeedInput0RisingEdge', 'inp1': 'HighSpeedInput1RisingEdge'} + t_modes = {"pso": 0, "event": 1, "inp0": 2, "inp1": 4} + p_modes = { + "pso": "PsoOutput", + "event": "PsoEvent", + "inp0": "HighSpeedInput0RisingEdge", + "inp1": "HighSpeedInput1RisingEdge", + } daq_trigger = t_modes[sync] pso_trigger = p_modes[sync] filename = "AerotechSnapAndStepTemplate.ascript" filesubs = { - 'startpos': self.scan_start, - 'stepsize': self.scan_stepsize, - 'numsteps': self.scan_steps, - 'exptime': 2*self.exp_time*self.exp_burst, - 'settling': self.settling_time, - 'psotrigger': pso_trigger, - 'npoints': self.scan_ntotal - } + "startpos": self.scan_start, + "stepsize": self.scan_stepsize, + "numsteps": self.scan_steps, + "exptime": 2 * self.exp_time * self.exp_burst, + "settling": self.settling_time, + "psotrigger": pso_trigger, + "npoints": self.scan_ntotal, + } # DDC config - daqname = "es1_ddaq" - daqcfg = {'ntotal': self.scan_ntotal, 'trigger': daq_trigger} + daqname = "es1_ddaq" + daqcfg = {"ntotal": self.scan_ntotal, "trigger": daq_trigger} # Gigafrost config camname = "gfclient" camcfg = { - "ntotal": self.scan_ntotal, - "nimages": burst_at_each_point, - "exposure": 1000 * exp_time, - "period": 2000*exp_time, + "ntotal": self.scan_ntotal, + "nimages": burst_at_each_point, + "exposure": 1000 * exp_time, + "period": 2000 * exp_time, "trigger_mode": "eternal", - "pixel_width": roix, - "pixel_height": roiy - } + "pixel_width": roix, + "pixel_height": roiy, + } # Parameter validation before scan - if self.scan_steps <=0: + if self.scan_steps <= 0: raise RuntimeError(f"Requires at least one scan step, got {self.scan_steps}") # Call super() super().__init__( - filename=filename, filesubs=filesubs, - controller="es1_tasks", taskindex=4, - camera=camname, camcfg=camcfg, - drvdaq=daqname, daqcfg=daqcfg, daqmode="collect", + filename=filename, + filesubs=filesubs, + controller="es1_tasks", + taskindex=4, + camera=camname, + camcfg=camcfg, + drvdaq=daqname, + daqcfg=daqcfg, + daqmode="collect", preview="daq_stream1", - **kwargs) + **kwargs, + ) def unstage(self): - """ Wait for DAQ before unstaging""" + """Wait for DAQ before unstaging""" time.sleep(1) yield from super().unstage() - - class SequenceScanBase(TemplatedScanBase): - """ Sequence scan base class + """Sequence scan base class Example base class for AeroScript-based sequence scans. This function loads a pre-written AeroScript file on the Automation1 controller for low latency gated scans. - NOTE: As a temporary setup until the final cabling is done, the scan - is in a soft-trigger mode. The Gigafrost is just software triggered, + NOTE: As a temporary setup until the final cabling is done, the scan + is in a soft-trigger mode. The Gigafrost is just software triggered, and the position readback is wired to the internal trigger. Example ------- >>> scans.sequencescan(33, 180, 180, exp_time=0.005, exp_frames=1800, repeats=10) """ + scan_name = "sequencescan" scan_report_hint = "table" required_kwargs = ["scan_start", "gate_high", "gate_low"] gui_config = { "Movement parameters": ["scan_start", "repeats", "repmode"], - "Acquisition parameters" : ["gate_high", "gate_low", "exp_time", "exp_frames", "roix", "roiy", "sync"] + "Acquisition parameters": [ + "gate_high", + "gate_low", + "exp_time", + "exp_frames", + "roix", + "roiy", + "sync", + ], } - def __init__(self, scan_start, gate_high, gate_low, repeats=1, repmode="PosNeg", - exp_time=0.005, exp_frames=180, - roix=2016, roiy=2016, sync="pso", **kwargs): + def __init__( + self, + scan_start, + gate_high, + gate_low, + repeats=1, + repmode="PosNeg", + exp_time=0.005, + exp_frames=180, + roix=2016, + roiy=2016, + sync="pso", + **kwargs, + ): # Auto-setup configuration parameters from input self.scan_start = scan_start self.gate_high = gate_high @@ -460,82 +505,96 @@ class SequenceScanBase(TemplatedScanBase): self.scan_velocity = gate_high / (exp_time * exp_frames) self.scan_acceleration = 500 self.scan_safedistance = 10 - self.scan_accdistance = self.scan_safedistance + 0.5* self.scan_velocity * self.scan_velocity / self.scan_acceleration + self.scan_accdistance = ( + self.scan_safedistance + + 0.5 * self.scan_velocity * self.scan_velocity / self.scan_acceleration + ) self.scan_ntotal = exp_frames * repeats if repmode.upper() in ("POS", "NEG"): - self.scan_range = repeats *(gate_high + gate_low) + self.scan_range = repeats * (gate_high + gate_low) elif repmode.upper() in ("POSNEG", "NEGPOS"): self.scan_range = gate_high + gate_low else: raise RuntimeError(f"Unsupported repetition mode: {repmode}") # Aerotech DDC settings: Internal event trigger: PsoEvent = 1 - t_modes = {'pso': 0, 'event': 1, 'inp0': 2, 'inp1': 4} - p_modes = {'pso': 'PsoOutput', 'event': 'PsoEvent', 'inp0': 'HighSpeedInput0RisingEdge', 'inp1': 'HighSpeedInput1RisingEdge'} + t_modes = {"pso": 0, "event": 1, "inp0": 2, "inp1": 4} + p_modes = { + "pso": "PsoOutput", + "event": "PsoEvent", + "inp0": "HighSpeedInput0RisingEdge", + "inp1": "HighSpeedInput1RisingEdge", + } daq_trigger = t_modes[sync] pso_trigger = p_modes[sync] # AeroScript substitutions filename = "AerotechSimpleSequenceTemplate.ascript" filesubs = { - 'startpos': self.scan_start, - 'scanrange': self.scan_range, - 'nrepeat': self.scan_repnum, - 'scanvel': self.scan_velocity, - 'scanacc': self.scan_acceleration, - 'accdist': self.scan_accdistance, - 'npoints' : self.scan_ntotal, - 'scandir' : self.scan_repmode.upper(), - 'psotrigger': pso_trigger - } - print(filesubs) + "startpos": self.scan_start, + "scanrange": self.scan_range, + "nrepeat": self.scan_repnum, + "scanvel": self.scan_velocity, + "scanacc": self.scan_acceleration, + "accdist": self.scan_accdistance, + "npoints": self.scan_ntotal, + "scandir": self.scan_repmode.upper(), + "psotrigger": pso_trigger, + } + # Drice data collection config # FIXME: The data sources need to be set to same as the script - daqname = "es1_ddaq" - daqcfg = {'ntotal': self.scan_ntotal, 'trigger': daq_trigger} + daqname = "es1_ddaq" + daqcfg = {"ntotal": self.scan_ntotal, "trigger": daq_trigger} # Gigafrost config camname = "gfclient" camcfg = { - "ntotal": self.scan_ntotal, - "nimages": self.exp_frames, - "exposure": 1000 * exp_time, - "period": 2000*exp_time, - "pixel_width": roix, - "pixel_height": roiy - } + "ntotal": self.scan_ntotal, + "nimages": self.exp_frames, + "exposure": 1000 * exp_time, + "period": 2000 * exp_time, + "pixel_width": roix, + "pixel_height": roiy, + } # Parameter validation before scan - if self.exp_frames <=0: + if self.exp_frames <= 0: raise RuntimeError(f"Requires at least one frame per cycle, got {self.exp_frames}") # Call super() super().__init__( - filename=filename, filesubs=filesubs, - controller="es1_tasks", taskindex=4, - camera=camname, camcfg=camcfg, - drvdaq=daqname, daqcfg=daqcfg, daqmode="collect", + filename=filename, + filesubs=filesubs, + controller="es1_tasks", + taskindex=4, + camera=camname, + camcfg=camcfg, + drvdaq=daqname, + daqcfg=daqcfg, + daqmode="collect", preview="daq_stream1", - **kwargs) + **kwargs, + ) def prepare_positions(self): # Fill PSO vectors according to scan direction # NOTE: Distance counter is bidirectional - self.psoBoundsPos = [] - self.psoBoundsNeg = [] + pso_bounds_pos = [] + pso_bounds_neg = [] if self.scan_repmode in ("pos", "Pos", "POS", "neg", "Neg", "NEG"): - self.psoBoundsPos.append(self.scan_accdistance) + pso_bounds_pos.append(self.scan_accdistance) for ii in range(self.scan_repnum): - self.psoBoundsPos += [self.gate_high, self.gate_low] - self.psoBoundsPos = self.psoBoundsPos[:-1] + pso_bounds_pos += [self.gate_high, self.gate_low] + pso_bounds_pos = pso_bounds_pos[:-1] if self.scan_repmode in ("posneg", "PosNeg", "POSNEG", "negpos", "NegPos", "NEGPOS"): - self.psoBoundsPos = [self.scan_accdistance, self.gate_high] - self.psoBoundsNeg = [self.scan_accdistance + self.gate_low, self.gate_high] + pso_bounds_pos = [self.scan_accdistance, self.gate_high] + pso_bounds_neg = [self.scan_accdistance + self.gate_low, self.gate_high] - self.filesubs['psoBoundsPos'] = self.psoBoundsPos - self.filesubs['psoBoundsNeg'] = self.psoBoundsNeg - self.filesubs['psoBoundsPosSize'] = len(self.psoBoundsPos) - self.filesubs['psoBoundsNegSize'] = len(self.psoBoundsNeg) + self.filesubs["psoBoundsPos"] = pso_bounds_pos + self.filesubs["psoBoundsNeg"] = pso_bounds_neg + self.filesubs["psoBoundsPosSize"] = len(pso_bounds_pos) + self.filesubs["psoBoundsNegSize"] = len(pso_bounds_neg) # Call super() to do the substitutions yield from super().prepare_positions() @@ -543,8 +602,3 @@ class SequenceScanBase(TemplatedScanBase): # """ Wait for DAQ before unstaging""" # time.sleep(1) # yield from super().unstage() - - - - - diff --git a/tomcat_bec/scripts/demoscans.py b/tomcat_bec/scripts/demoscans.py index 4a2e139..8e7ee8d 100644 --- a/tomcat_bec/scripts/demoscans.py +++ b/tomcat_bec/scripts/demoscans.py @@ -1,21 +1,31 @@ """ Demo scans for Tomcat at the microXAS test bench """ + def bl_check_beam(): - """ Checks beamline status""" + """Checks beamline status""" motor_enabled = bool(dev.es1_roty.motor_enable.get()) result = motor_enabled return result - -def demostepscan(scan_start, scan_end, steps, exp_time=0.005, exp_burst=1, settling_time=0, roix=2016, roiy=2016, sync='event'): - """ Demo step scan with GigaFrost +def demostepscan( + scan_start, + scan_end, + steps, + exp_time=0.005, + exp_burst=1, + settling_time=0, + roix=2016, + roiy=2016, + sync="event", +): + """Demo step scan with GigaFrost This is a small BEC user-space demo step scan at the microXAS testbench using the gigafrost in software triggering mode. It tries to be a standard BEC scan, while still setting up the environment. - + Example: -------- demostepscan(scan_start=-32, scan_end=148, steps=180, exp_time=0.005, exp_burst=5) @@ -26,15 +36,19 @@ def demostepscan(scan_start, scan_end, steps, exp_time=0.005, exp_burst=1, settl scan_ntotal = exp_burst * (steps + 1) # Move to start position - t_modes = {'pso': 0, 'event': 1, 'inp0': 2, 'inp1': 4} - cfg = {'ntotal': scan_ntotal*10, 'trigger': t_modes[sync]} - dev.es1_ddaq.configure(d=cfg) + t_modes = {"pso": 0, "event": 1, "inp0": 2, "inp1": 4} + cfg = {"ntotal": scan_ntotal * 10, "trigger": t_modes[sync]} + dev.es1_ddaq.configure(d=cfg) # Configure gigafrost cfg = { - "ntotal": scan_ntotal, "nimages": exp_burst, - "exposure": 1000*exp_time, "period": 2000*exp_time, - "pixel_width": roix, "pixel_height": roiy, "trigger_mode": "soft" - } + "ntotal": scan_ntotal, + "nimages": exp_burst, + "exposure": 1000 * exp_time, + "period": 2000 * exp_time, + "pixel_width": roix, + "pixel_height": roiy, + "trigger_mode": "soft", + } dev.gfclient.configure(d=cfg) # Configure PSO trigger (trigger is normally disabled in fly mode) # dev.es1_psod.configure(d={}) @@ -45,24 +59,39 @@ def demostepscan(scan_start, scan_end, steps, exp_time=0.005, exp_burst=1, settl # dev.es1_roty.move(scan_start).wait() print("Handing over to 'scans.line_scan'") - wait_time = 2*exp_time * exp_burst + wait_time = 2 * exp_time * exp_burst scans.line_scan( - dev.es1_roty, scan_start, scan_end, - steps=steps, exp_time=wait_time, relative=False, settling_time=settling_time) + dev.es1_roty, + scan_start, + scan_end, + steps=steps, + exp_time=wait_time, + relative=False, + settling_time=settling_time, + ) # Cleanup: disable SW trigger dev.es1_psod.software_trigger = False - -def demosequencescan(scan_start, gate_high, gate_low, repeats=1, repmode="PosNeg", - exp_time=0.005, exp_frames=180, roix=2016, roiy=2016, sync='pso'): - """ Demo sequence scan with GigaFrost +def demosequencescan( + scan_start, + gate_high, + gate_low, + repeats=1, + repmode="PosNeg", + exp_time=0.005, + exp_frames=180, + roix=2016, + roiy=2016, + sync="pso", +): + """Demo sequence scan with GigaFrost This is a small BEC user-space sequence scan at the microXAS testbench triggering a customized scripted scan on the controller. The scan uses a pre-written custom low-level sequence scan, so it is really minimal. - + NOTE: It calls the AeroScript template version. Example: @@ -73,12 +102,33 @@ def demosequencescan(scan_start, gate_high, gate_low, repeats=1, repmode="PosNeg raise RuntimeError("Beamline is not in ready state") print("Handing over to 'scans.sequencescan'") - scans.sequencescan(scan_start, gate_high, gate_low, exp_time=exp_time, exp_frames=exp_frames, repeats=repeats, roix=roix, roiy=roiy, repmode=repmode, sync=sync) + scans.sequencescan( + scan_start, + gate_high, + gate_low, + exp_time=exp_time, + exp_frames=exp_frames, + repeats=repeats, + roix=roix, + roiy=roiy, + repmode=repmode, + sync=sync, + ) -def becsequencescan(start, gate_high, gate_low, repeats=1, repmode="PosNeg", - exp_time=0.005, exp_frames=180, roix=2016, roiy=2016, sync='pso'): - """ Demo sequence scan with GigaFrost via BEC +def becsequencescan( + start, + gate_high, + gate_low, + repeats=1, + repmode="PosNeg", + exp_time=0.005, + exp_frames=180, + roix=2016, + roiy=2016, + sync="pso", +): + """Demo sequence scan with GigaFrost via BEC This is a small BEC user-space sequence scan at the microXAS testbench doing every scan logic from BEC, i.e. there's no AeroScript involved. @@ -91,7 +141,7 @@ def becsequencescan(start, gate_high, gate_low, repeats=1, repmode="PosNeg", raise RuntimeError("Beamline is not in ready state") # Parameter validation before scan - if exp_frames <=0: + if exp_frames <= 0: raise RuntimeError(f"Requires at least one frame per cycle, got {self.exp_frames}") # Synthetic values @@ -102,43 +152,43 @@ def becsequencescan(start, gate_high, gate_low, repeats=1, repmode="PosNeg", scan_velocity = gate_high / (exp_time * exp_frames) scan_acceleration = 500 scan_safedistance = 10 - scan_accdistance = scan_safedistance + 0.5* scan_velocity * scan_velocity / scan_acceleration + scan_accdistance = scan_safedistance + 0.5 * scan_velocity * scan_velocity / scan_acceleration scan_ntotal = exp_frames * repeats if scan_repmode in ("POS", "NEG"): - scan_range = repeats *(gate_high + gate_low) - if scan_repmode=="POS": + scan_range = repeats * (gate_high + gate_low) + if scan_repmode == "POS": scan_start = start - scan_accdistance scan_end = start + scan_range + scan_accdistance - if scan_repmode=="NEG": + if scan_repmode == "NEG": scan_start = start + scan_accdistance scan_end = start - scan_range - scan_accdistance elif scan_repmode in ("POSNEG", "NEGPOS"): scan_range = gate_high + gate_low - if scan_repmode=="POSNEG": + if scan_repmode == "POSNEG": scan_start = start - scan_accdistance scan_end = start + scan_range + scan_accdistance - if scan_repmode=="NEGPOS": + if scan_repmode == "NEGPOS": scan_start = start + scan_accdistance scan_end = start - scan_range - scan_accdistance else: raise RuntimeError(f"Unsupported repetition mode: {repmode}") # Aerotech DDC settings: Internal event trigger: PsoEvent = 1 - t_modes = {'pso': 0, 'event': 1, 'inp0': 2, 'inp1': 4} + t_modes = {"pso": 0, "event": 1, "inp0": 2, "inp1": 4} daq_trigger = t_modes[sync] # Drice data collection config - daqcfg = {'ntotal': scan_ntotal, 'trigger': daq_trigger} + daqcfg = {"ntotal": scan_ntotal, "trigger": daq_trigger} # Gigafrost config camcfg = { - "ntotal": scan_ntotal, - "nimages": exp_frames, - "exposure": 1000 * exp_time, - "period": 2000*exp_time, - "pixel_width": scan_roix, - "pixel_height": scan_roiy - } + "ntotal": scan_ntotal, + "nimages": exp_frames, + "exposure": 1000 * exp_time, + "period": 2000 * exp_time, + "pixel_width": scan_roix, + "pixel_height": scan_roiy, + } # PSO config # Fill PSO vectors according to scan direction @@ -176,16 +226,16 @@ def becsequencescan(start, gate_high, gate_low, repeats=1, repmode="PosNeg", print("Manual motor scan") if scan_repmode in ["POS", "NEG"]: - dev.es1_psod.prepare(psoBoundsPos) - dev.es1_roty.move(scan_end).wait() + dev.es1_psod.prepare(psoBoundsPos) + dev.es1_roty.move(scan_end).wait() elif scan_repmode in ["POSNEG", "NEGPOS"]: for ii in range(scan_repnum): - if ii%2==0: + if ii % 2 == 0: dev.es1_psod.prepare(psoBoundsPos) # FIXME : Temporary trigger dev.gfclient.trigger() dev.es1_roty.move(scan_end).wait() - if ii%2==1: + if ii % 2 == 1: dev.es1_psod.prepare(psoBoundsNeg) # FIXME : Temporary trigger dev.gfclient.trigger()