From 26022c46f0f7e6683dd9261b7e4f448d7c232f8a Mon Sep 17 00:00:00 2001 From: gac-x05la Date: Fri, 5 Jul 2024 14:10:56 +0200 Subject: [PATCH] Added a test step scan with GigaFrost --- .../devices/gigafrost/gigafrostclient.py | 21 +- tomcat_bec/scans/aerotech_test.py | 545 ++++++++++++++++++ tomcat_bec/scans/gigafrost_test.py | 93 +++ 3 files changed, 651 insertions(+), 8 deletions(-) create mode 100644 tomcat_bec/scans/aerotech_test.py create mode 100644 tomcat_bec/scans/gigafrost_test.py diff --git a/tomcat_bec/devices/gigafrost/gigafrostclient.py b/tomcat_bec/devices/gigafrost/gigafrostclient.py index b2c0a0a..219cc65 100644 --- a/tomcat_bec/devices/gigafrost/gigafrostclient.py +++ b/tomcat_bec/devices/gigafrost/gigafrostclient.py @@ -344,15 +344,18 @@ class GigaFrostClient(Device): } def stage(self): - """Standard ophyd method to start an acquisition""" + """Standard ophyd method to start an acquisition + + In ophyd it still needs to be triggered to perfor an actual capture. + """ if self.infoBusyFlag.value: raise RuntimeError("Camera is already busy, unstage it first!") # change to running self.cmdStartCamera.set(1).wait() - # soft trigger on - if self._auto_soft_enable: - self.cmdSoftEnable.set(1).wait() + # soft trigger on (DISABLED: use trigger() in ophyd) + #if self._auto_soft_enable: + # self.cmdSoftEnable.set(1).wait() self.state = const.GfStatus.ACQUIRING # Gigafrost can finish a run without explicit unstaging @@ -373,17 +376,19 @@ class GigaFrostClient(Device): self.unstage() def trigger(self) -> DeviceStatus: - """Sends a software trigger""" + """Sends a software trigger and approximately waits to finnish""" + status = DeviceStatus(self, settle_time=2.0) + # Soft triggering based on operation mode - if self._auto_soft_enable: + if self._auto_soft_enable and self.trigger_mode=='auto' and self.enable_mode=='soft': # BEC teststand operation mode: poedge of SoftEnable if Started self.cmdSoftEnable.set(0).wait() self.cmdSoftEnable.set(1).wait() + sleep(self.cfgFramerate.value*self.cfgCntNum*0.001) else: self.cmdSoftTrigger.set(1).wait() - status = DeviceStatus(self, settle_time=2.0) status.set_finished() - sleep(2.0) + return status @property diff --git a/tomcat_bec/scans/aerotech_test.py b/tomcat_bec/scans/aerotech_test.py new file mode 100644 index 0000000..a1d6072 --- /dev/null +++ b/tomcat_bec/scans/aerotech_test.py @@ -0,0 +1,545 @@ +import time + +import numpy as np + +from bec_lib import bec_logger +from scan_server.scans import AsyncFlyScanBase, ScanArgType, ScanBase + +logger = bec_logger.logger + + +class AeroSingleScan(AsyncFlyScanBase): + scan_name = "aero_single_scan" + scan_report_hint = "scan_progress" + required_kwargs = ["startpos", "scanrange", "psodist"] + arg_input = {} + arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None} + + def __init__(self, *args, parameter: dict = None, **kwargs): + """Performs a single line scan with PSO output and data collection. + + Examples: + >>> scans.aero_single_scan(startpos=42, scanrange=2*10+3*180, psodist=[10, 180, 0.01, 180, 0.01, 180]) + + """ + super().__init__(parameter=parameter, **kwargs) + self.axis = [] + self.scan_motors = [] + self.num_pos = 0 + + self.scanStart = self.caller_kwargs.get("startpos") + self.scanEnd = self.scanStart + self.caller_kwargs.get("scanrange") + self.psoBounds = self.caller_kwargs.get("psodist") + self.scanVel = self.caller_kwargs.get("velocity", 30) + self.scanTra = self.caller_kwargs.get("travel", 80) + self.scanAcc = self.caller_kwargs.get("acceleration", 500) + self.scanExpNum = self.caller_kwargs.get("daqpoints", 5000) + + def pre_scan(self): + # Move to start position + st = yield from self.stubs.send_rpc_and_wait("es1_roty", "dmove", self.scanStart) + st.wait() + yield from self.stubs.pre_scan() + + def scan_report_instructions(self): + """Scan report instructions for the progress bar, yields from mcs card""" + if not self.scan_report_hint: + yield None + return + yield from self.stubs.scan_report_instruction({"scan_progress": ["es1_roty"]}) + + def scan_core(self): + # Configure PSO, DDC and motor + yield from self.stubs.send_rpc_and_wait( + "es1_roty", + "configure", + {"velocity": self.scanVel, "acceleration": self.scanVel / self.scanAcc}, + ) + yield from self.stubs.send_rpc_and_wait( + "es1_psod", "configure", {"distance": self.psoBounds, "wmode": "toggle"} + ) + yield from self.stubs.send_rpc_and_wait( + "es1_ddaq", + "configure", + {"npoints": self.scanExpNum}, + ) + # DAQ with real trigger + # yield from self.stubs.send_rpc_and_wait( + # "es1_ddaq", "configure", {"npoints": self.scanExpNum, "trigger": "HSINP0_RISE"}, + # ) + + # Kick off PSO and DDC + st = yield from self.stubs.send_rpc_and_wait("es1_psod", "kickoff") + st.wait() + st = yield from self.stubs.send_rpc_and_wait("es1_ddaq", "kickoff") + st.wait() + + print("Start moving") + # Start the actual movement + yield from self.stubs.send_rpc_and_wait( + "es1_roty", + "configure", + {"target": self.scanEnd}, + ) + yield from self.stubs.kickoff( + device="es1_roty", + parameter={"target": self.scanEnd}, + ) + # yield from self.stubs.set(device='es1_roty', value=self.scanEnd, wait_group="flyer") + target_diid = self.DIID - 1 + + # Wait for motion to finish + while True: + yield from self.stubs.read_and_wait(group="primary", wait_group="readout_primary", pointID=self.pointID) + self.pointID += 1 + status = self.stubs.get_req_status( + device="es1_roty", RID=self.metadata["RID"], DIID=target_diid + ) + progress = self.stubs.get_device_progress(device="es1_roty", RID=self.metadata["RID"]) + print(f"status: {status}\tprogress: {progress}") + if progress: + self.num_pos = progress + if status: + break + time.sleep(1) + print("Scan done\n\n") + self.num_pos = self.pointID + + +class AeroSequenceScan(AsyncFlyScanBase): + scan_name = "aero_sequence_scan" + scan_report_hint = "table" + required_kwargs = ["startpos", "ranges"] + arg_input = {} + arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None} + + def __init__(self, *args, parameter: dict = None, **kwargs): + """Performs a sequence scan with PSO output and data collection + + Examples: + >>> scans.aero_sequence_scan(startpos=42, ranges=([179.9, 0.1, 5]), expnum=3600, repnum=3, repmode="PosNeg") + + """ + super().__init__(parameter=parameter, **kwargs) + self.axis = [] + self.scan_motors = ["es1_roty"] + self.num_pos = 0 + + self.scanStart = self.caller_kwargs.get("startpos") + self.scanRanges = self.caller_kwargs.get("ranges") + self.scanExpNum = self.caller_kwargs.get("expnum", 25000) + self.scanRepNum = self.caller_kwargs.get("repnum", 1) + self.scanRepMode = self.caller_kwargs.get("repmode", "Pos") + self.scanVel = self.caller_kwargs.get("velocity", 30) + self.scanTra = self.caller_kwargs.get("travel", 80) + self.scanAcc = self.caller_kwargs.get("acceleration", 500) + self.scanSafeDist = self.caller_kwargs.get("safedist", 10) + + if isinstance(self.scanRanges[0], (int, float)): + self.scanRanges = self.scanRanges + + if self.scanRepMode not in ["PosNeg", "Pos", "NegPos", "Neg"]: + raise RuntimeError(f"Unexpected sequence repetition mode: {self.scanRepMode}") + + def pre_scan(self): + # Calculate PSO positions from tables + AccDist = 0.5 * self.scanVel * self.scanVel / self.scanAcc + self.scanSafeDist + + # Relative PSO bounds + self.psoBoundsPos = [AccDist] + try: + for line in self.scanRanges: + print(f"Line is: {line} of type {type(line)}") + for rr in range(int(line[2])): + self.psoBoundsPos.append(line[0]) + self.psoBoundsPos.append(line[1]) + except TypeError: + line = self.scanRanges + print(f"Line is: {line} of type {type(line)}") + for rr in range(int(line[2])): + self.psoBoundsPos.append(line[0]) + self.psoBoundsPos.append(line[1]) + del self.psoBoundsPos[-1] + + self.psoBoundsNeg = [AccDist] + self.psoBoundsNeg.extend(self.psoBoundsPos[::-1]) + + scanrange = 2 * AccDist + np.sum(self.psoBoundsPos) + if self.scanRepMode in ["PosNeg", "Pos"]: + self.PosStart = self.scanStart - AccDist + self.PosEnd = self.scanStart + scanrange + elif self.scanRepMode in ["NegPos", "Neg"]: + self.PosStart = self.scanStart + AccDist + self.PosEnd = self.scanStart - scanrange + else: + raise RuntimeError(f"Unexpected sequence repetition mode: {self.scanRepMode}") + print(f"\tCalculated scan range: {self.PosStart} to {self.PosEnd} range {scanrange}") + + # ToDo: We could append all distances and write a much longer 'distance array'. this would elliminate the need of rearming... + + # Move roughly to start position + yield from self.stubs.send_rpc_and_wait( + "es1_roty", + "configure", + {"velocity": self.scanTra, "acceleration": self.scanTra / self.scanAcc}, + ) + st = yield from self.stubs.send_rpc_and_wait("es1_roty", "move", self.PosStart) + st.wait() + + yield from self.stubs.pre_scan() + + def scan_core(self): + # Move to start position (with travel velocity) + yield from self.stubs.send_rpc_and_wait( + "es1_roty", + "configure", + {"velocity": self.scanTra, "acceleration": self.scanTra / self.scanAcc}, + ) + yield from self.stubs.send_rpc_and_wait("es1_roty", "move", self.PosStart) + + # Condigure PSO, DDC and motorHSINP0_RISE + yield from self.stubs.send_rpc_and_wait( + "es1_psod", "configure", {"distance": self.psoBoundsPos, "wmode": "toggle"} + ) + yield from self.stubs.send_rpc_and_wait( + "es1_ddaq", + "configure", + {"npoints": self.scanExpNum}, + ) + # With real trigger + # yield from self.stubs.send_rpc_and_wait( + # "es1_ddaq", "configure", {"npoints": self.scanExpNum, "trigger": "HSINP0_RISE"} + # ) + yield from self.stubs.send_rpc_and_wait( + "es1_roty", + "configure", + {"velocity": self.scanVel, "acceleration": self.scanVel / self.scanAcc}, + ) + + # Kickoff pso and daq + st = yield from self.stubs.send_rpc_and_wait("es1_psod", "kickoff") + st.wait() + st = yield from self.stubs.send_rpc_and_wait("es1_ddaq", "kickoff") + st.wait() + + # Run the actual scan (haven't figured out the proggress bar) + print("Starting actual scan loop") + for ii in range(self.scanRepNum): + print(f"Scan segment {ii}") + # No option to reset the index counter... + yield from self.stubs.send_rpc_and_wait("es1_psod", "dstArrayRearm.set", 1) + + if self.scanRepMode in ["Pos", "Neg"]: + yield from self.stubs.send_rpc_and_wait( + "es1_roty", + "configure", + {"velocity": self.scanVel, "acceleration": self.scanVel / self.scanAcc}, + ) + st = yield from self.stubs.send_rpc_and_wait("es1_roty", "move", self.PosEnd) + st.wait() + yield from self.stubs.send_rpc_and_wait( + "es1_roty", + "configure", + {"velocity": self.scanTra, "acceleration": self.scanTra / self.scanAcc}, + ) + st = yield from self.stubs.send_rpc_and_wait("es1_roty", "move", self.PosStart) + st.wait() + elif self.scanRepMode in ["PosNeg", "NegPos"]: + if ii % 2 == 0: + st = yield from self.stubs.send_rpc_and_wait("es1_roty", "move", self.PosEnd) + st.wait() + else: + st = yield from self.stubs.send_rpc_and_wait("es1_roty", "move", self.PosStart) + st.wait() + self.pointID += 1 + self.num_pos += 1 + time.sleep(0.2) + + # Complete (should complete instantly) + yield from self.stubs.complete(device="es1_psod") + yield from self.stubs.complete(device="es1_ddaq") + st = yield from self.stubs.send_rpc_and_wait("es1_psod", "complete") + st.wait() + st = yield from self.stubs.send_rpc_and_wait("es1_ddaq", "complete") + st.wait() + + # Collect - Throws a warning due to returning a generator + # st = yield from self.stubs.send_rpc_and_wait("es1_psod", "collect") + # st = yield from self.stubs.send_rpc_and_wait("es1_ddaq", "collect") + + yield from self.stubs.read_and_wait(group="primary", wait_group="readout_primary") + target_diid = self.DIID - 1 + + yield from self.stubs.kickoff( + device="es1_roty", + parameter={"target": self.PosStart}, + ) + yield from self.stubs.wait(device=["es1_roty"], wait_group="kickoff", wait_type="move") + yield from self.stubs.complete(device="es1_roty") + + # Wait for motion to finish + while True: + pso_status = self.stubs.get_req_status( + device="es1_psod", RID=self.metadata["RID"], DIID=target_diid + ) + daq_status = self.stubs.get_req_status( + device="es1_ddaq", RID=self.metadata["RID"], DIID=target_diid + ) + mot_status = self.stubs.get_req_status( + device="es1_roty", RID=self.metadata["RID"], DIID=target_diid + ) + progress = self.stubs.get_device_progress(device="es1_psod", RID=self.metadata["RID"]) + progress = self.stubs.get_device_progress(device="es1_ddaq", RID=self.metadata["RID"]) + progress = self.stubs.get_device_progress(device="es1_roty", RID=self.metadata["RID"]) + print(f"pso: {pso_status}\tdaq: {daq_status}\tmot: {mot_status}\tprogress: {progress}") + if progress: + self.num_pos = int(progress) + if mot_status: + break + time.sleep(1) + print("Scan done\n\n") + + def cleanup(self): + """Set scan progress to 1 to finish the scan""" + self.num_pos = 1 + return super().cleanup() + + +class AeroScriptedScan(AsyncFlyScanBase): + scan_name = "aero_scripted_scan" + scan_report_hint = "table" + required_kwargs = ["filename", "subs"] + arg_input = {} + arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None} + + def __init__(self, *args, parameter: dict = None, **kwargs): + """Executes an AeroScript template as a flyer + + The script is generated from a template file using jinja2. + Examples: + >>> scans.aero_scripted_scan(filename="AerotechSnapAndStepTemplate.ascript", subs={'startpos': 42, 'stepsize': 0.1, 'numsteps': 1800, 'exptime': 0.1}) + + """ + super().__init__(parameter=parameter, **kwargs) + self.axis = [] + self.scan_motors = ["es1_roty"] + self.num_pos = 0 + + self.filename = self.caller_kwargs.get("filename") + self.subs = self.caller_kwargs.get("subs") + self.taskIndex = self.caller_kwargs.get("taskindex", 4) + + def pre_scan(self): + print("TOMCAT Loading Aeroscript template") + # Load the test file + with open(self.filename) as f: + templatetext = f.read() + + # Substitute jinja template + import jinja2 + + tm = jinja2.Template(templatetext) + self.scripttext = tm.render(scan=self.subs) + + yield from self.stubs.pre_scan() + + def scan_core(self): + print("TOMCAT Sequeence scan (via Jinjad AeroScript)") + t_start = time.time() + + # Configure by copying text to controller file and compiling it + yield from self.stubs.send_rpc_and_wait( + "es1_tasks", + "configure", + {"text": self.scripttext, "filename": "becExec.ascript", "taskIndex": self.taskIndex}, + ) + + # Kickoff + st = yield from self.stubs.send_rpc_and_wait("es1_tasks", "kickoff") + st.wait() + time.sleep(0.5) + + # Complete + yield from self.stubs.complete(device="es1_tasks") + + # Collect - up to implementation + + t_end = time.time() + t_elapsed = t_end - t_start + print(f"Elapsed scan time: {t_elapsed}") + + def cleanup(self): + """Set scan progress to 1 to finish the scan""" + self.num_pos = self.pointID + return super().cleanup() + + +class AeroSnapNStep(AeroScriptedScan): + scan_name = "aero_snapNstep" + scan_report_hint = "table" + required_kwargs = ["startpos", "expnum"] + arg_input = {} + arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None} + + def __init__(self, *args, parameter: dict = None, **kwargs): + """Executes a scripted SnapNStep scan + + This scan generates and executes an AeroScript file to run + a hardware step scan on the Aerotech controller. + The script is generated from a template file using jinja2. + + Examples: + >>> scans.scans.aero_snapNstep(startpos=42, range=180, expnum=1800, exptime=0.1) + """ + super().__init__(parameter=parameter, **kwargs) + self.axis = [] + self.scan_motors = ["es1_roty"] + self.num_pos = 0 + + # self.filename = "/afs/psi.ch/user/m/mohacsi_i/ophyd_devices/ophyd_devices/epics/devices/aerotech/AerotechSnapAndStepTemplate.ascript" + self.filename = "/sls/X05LA/data/x05laop/bec/ophyd_devices/ophyd_devices/epics/devices/aerotech/AerotechSnapAndStepTemplate.ascript" + self.scanTaskIndex = self.caller_kwargs.get("taskindex", 4) + + self.scanStart = self.caller_kwargs.get("startpos") + self.scanExpNum = self.caller_kwargs.get("expnum") + self.scanRange = self.caller_kwargs.get("range", 180) + self.scanExpTime = self.caller_kwargs.get("exptime", 0.1) + self.scanStepSize = self.scanRange / self.scanExpNum + # self.scanVel = self.caller_kwargs.get("velocity", 30) + # self.scanTra = self.caller_kwargs.get("travel", 80) + # self.scanAcc = self.caller_kwargs.get("acceleration", 500) + + self.subs = { + "startpos": self.scanStart, + "stepsize": self.scanStepSize, + "numsteps": self.scanExpNum, + "exptime": self.scanExpTime, + } + + def scan_core(self): + print("TOMCAT Snap N Step scan (via Jinjad AeroScript)") + # Run template execution frm parent + yield from self.stubs.read_and_wait(group="primary", wait_group="readout_primary", pointID=self.pointID) + self.pointID += 1 + yield from super().scan_core() + + # Collect - Throws a warning due to returning a generator + yield from self.stubs.send_rpc_and_wait("es1_ddaq", "npoints.put", self.scanExpNum) + # st = yield from self.stubs.send_rpc_and_wait("es1_ddaq", "collect") + # st.wait() + + +class AeroScriptedSequence(AeroScriptedScan): + scan_name = "aero_scripted_sequence" + scan_report_hint = "table" + required_kwargs = ["startpos", "ranges"] + arg_input = {} + arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None} + + def __init__(self, *args, parameter: dict = None, **kwargs): + """Executes a scripted sequence scan + + This scan generates and executes an AeroScript file to run a hardware sequence scan on the + Aerotech controller. You might win a few seconds this way, but it has some limtations... + The script is generated from a template file using jinja2. + + Examples: + >>> scans.aero_scripted_sequence(startpos=42, ranges=([179.9, 0.1, 5]), expnum=3600, repnum=3, repmode="PosNeg") + """ + super().__init__(parameter=parameter, **kwargs) + self.axis = [] + self.scan_motors = ["es1_roty"] + self.num_pos = 0 + + self.filename = "/afs/psi.ch/user/m/mohacsi_i/ophyd_devices/ophyd_devices/epics/devices/aerotech/AerotechSimpleSequenceTemplate.ascript" + self.scanTaskIndex = self.caller_kwargs.get("taskindex", 4) + + self.scanStart = self.caller_kwargs.get("startpos") + self.scanRanges = self.caller_kwargs.get("ranges") + self.scanExpNum = self.caller_kwargs.get("expnum", 25000) + self.scanRepNum = self.caller_kwargs.get("repnum", 1) + self.scanRepMode = self.caller_kwargs.get("repmode", "Pos") + + self.scanVel = self.caller_kwargs.get("velocity", 30) + self.scanTra = self.caller_kwargs.get("travel", 80) + self.scanAcc = self.caller_kwargs.get("acceleration", 500) + self.scanSafeDist = self.caller_kwargs.get("safedist", 10) + + self.subs = { + "startpos": self.scanStart, + "scandir": self.scanRepMode, + "nrepeat": self.scanRepNum, + "npoints": self.scanExpNum, + "scanvel": self.scanVel, + "jogvel": self.scanTra, + "scanacc": self.scanAcc, + } + + if self.scanRepMode not in ["PosNeg", "Pos", "NegPos", "Neg"]: + raise RuntimeError(f"Unexpected sequence repetition mode: {self.scanRepMode}") + + if isinstance(self.scanRanges[0], (int, float)): + self.scanRanges = self.scanRanges + + def pre_scan(self): + # Calculate PSO positions from tables + AccDist = 0.5 * self.scanVel * self.scanVel / self.scanAcc + self.scanSafeDist + + # Relative PSO bounds + self.psoBoundsPos = [AccDist] + try: + for line in self.scanRanges: + print(f"Line is: {line} of type {type(line)}") + for rr in range(int(line[2])): + self.psoBoundsPos.append(line[0]) + self.psoBoundsPos.append(line[1]) + except TypeError: + line = self.scanRanges + print(f"Line is: {line} of type {type(line)}") + for rr in range(int(line[2])): + self.psoBoundsPos.append(line[0]) + self.psoBoundsPos.append(line[1]) + del self.psoBoundsPos[-1] + + self.psoBoundsNeg = [AccDist] + self.psoBoundsNeg.extend(self.psoBoundsPos[::-1]) + + self.scanrange = 2 * AccDist + np.sum(self.psoBoundsPos) + if self.scanRepMode in ["PosNeg", "Pos"]: + self.PosStart = self.scanStart - AccDist + elif self.scanRepMode in ["NegPos", "Neg"]: + self.PosStart = self.scanStart + AccDist + else: + raise RuntimeError(f"Unexpected sequence repetition mode: {self.scanRepMode}") + print(f"\tCalculated scan range: {self.PosStart} range {self.scanrange}") + + # ToDo: We could append all distances and write a much longer 'distance array'. this would elliminate the need of rearming... + self.subs.update( + { + "psoBoundsPos": self.psoBoundsPos, + "psoBoundsNeg": self.psoBoundsNeg, + "scanrange": self.scanrange, + } + ) + + # Move roughly to start position + yield from self.stubs.send_rpc_and_wait( + "es1_roty", + "configure", + {"velocity": self.scanTra, "acceleration": self.scanTra / self.scanAcc}, + ) + st = yield from self.stubs.send_rpc_and_wait("es1_roty", "move", self.PosStart) + st.wait() + + yield from super().pre_scan() + + def scan_core(self): + print("TOMCAT Sequence scan (via Jinjad AeroScript)") + # Run template execution frm parent + yield from super().scan_core() + + # Collect - Throws a warning due to returning a generator + yield from self.stubs.send_rpc_and_wait("es1_ddaq", "npoints.put", self.scanExpNum) + # st = yield from self.stubs.send_rpc_and_wait("es1_ddaq", "collect") + # st.wait() + diff --git a/tomcat_bec/scans/gigafrost_test.py b/tomcat_bec/scans/gigafrost_test.py new file mode 100644 index 0000000..d0400e0 --- /dev/null +++ b/tomcat_bec/scans/gigafrost_test.py @@ -0,0 +1,93 @@ +import time + +import numpy as np + +from bec_lib import bec_logger +from bec_server.bec_server.scan_server.scans import AsyncFlyScanBase, ScanArgType, ScanBase + +logger = bec_logger.logger + + + +class GigaFrostStepScan(AsyncFlyScanBase): + scan_name = "gigafrost_line_scan" + scan_report_hint = "table" + required_kwargs = ["startpos", "ranges"] + arg_input = {} + arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None} + + def __init__(self, *args, parameter: dict = None, **kwargs): + """Performs a step scan with StdDAQ + + Examples: + >>> scans.gigafrost_line_scan(motor='eyex', range=(-5, 5), steps=10, exp_time=20, exp_per=20, exp_burst=10, relative=True) + + """ + super().__init__(parameter=parameter, **kwargs) + self.axis = [] + self.num_pos = 0 + + self.scan_motors = [self.caller_kwargs.get("motor")] + self.scan_range = self.caller_kwargs.get("range") + self.scan_relat = self.caller_kwargs.get("relative", False) + self.scan_steps = int(self.caller_kwargs.get("steps", 10)) + self.scan_exp_t = self.caller_kwargs.get("exp_time", 5) + self.scan_exp_p = self.caller_kwargs.get("exp_per", 10) + self.scan_exp_b = self.caller_kwargs.get("exp_burst", 10) + + if self.scan_steps <=0: + raise RuntimeError(f"Requires at least one scan step") + + def prepare_positions(self): + # Calculate scan start position + if self.scan_relat: + pos = yield from self.stubs.send_rpc_and_wait(self.scan_motors[0], "read") + self.start_pos = pos[self.scan_motors[0]].get("value") + self.scan_range[0] + else: + self.start_pos = self.scan_range[0] + + # Calculate step size + self.step_size = (self.scan_range[1]-self.scan_range[0])/self.scan_steps + + self.positions = self.start_pos + for ii in range(self.scan_steps): + self.positions.append(self.start_pos + ii*self.step_size) + + self.num_pos = len(self.positions) + + def pre_scan(self): + # Move roughly to start position + st = yield from self.stubs.send_rpc_and_wait(self.scan_motors[0], "move", self.start_pos) + st.wait() + + yield from self.stubs.pre_scan() + + def stage(self): + yield from self.stubs.send_rpc_and_wait( + "gf2", "configure", {"nimages": self.scan_exp_b, "exposure": self.scan_exp_t, "period": self.scan_exp_p, "roix": 480, "roiy": 128} + ) + yield from self.stubs.send_rpc_and_wait( + "daq", "configure", {"n_images": self.scan_steps * self.scan_exp_b} + ) + yield from super().stage() + + + + def scan_core(self): + for ii in range(self.num_points): + st = yield from self.stubs.send_rpc_and_wait(self.scan_motors[0], "move", self.positions[ii]) + st.wait() + st = yield from self.stubs.send_rpc_and_wait("gf2", "trigger") + st.wait() + self.pointID += 1 + time.sleep(0.2) + + time.sleep(1) + print("Scan done\n\n") + + def cleanup(self): + """Set scan progress to 1 to finish the scan""" + self.num_pos = 1 + return super().cleanup() + +