diff --git a/pxiii_bec/device_configs/x06da_device_config.yaml b/pxiii_bec/device_configs/x06da_device_config.yaml index 74d2a1e..07f543c 100644 --- a/pxiii_bec/device_configs/x06da_device_config.yaml +++ b/pxiii_bec/device_configs/x06da_device_config.yaml @@ -100,7 +100,7 @@ dccm_xbpm: ssxbpm_trx: description: XBPM motion before secondary source deviceClass: ophyd.EpicsMotor - deviceConfig: {prefix: 'X06DA-ES-SSBPM1:TRX'} + deviceConfig: {prefix: 'X06DA-ES-SSBPM1:TRX1'} onFailure: buffer enabled: true readoutPriority: monitored @@ -109,7 +109,7 @@ ssxbpm_trx: ssxbpm_try: description: XBPM motion before secondary source deviceClass: ophyd.EpicsMotor - deviceConfig: {prefix: 'X06DA-ES-SSBPM1:TRY'} + deviceConfig: {prefix: 'X06DA-ES-SSBPM1:TRY1'} onFailure: buffer enabled: true readoutPriority: monitored @@ -127,7 +127,7 @@ ssxbpm_try: ssslit_trxr: description: Secondary source blade motion deviceClass: ophyd.EpicsMotor - deviceConfig: {prefix: 'X06DA-ES-SSSLH1:TRXR'} + deviceConfig: {prefix: 'X06DA-ES-SSSH1:TRXR'} onFailure: buffer enabled: true readoutPriority: monitored @@ -136,7 +136,7 @@ ssslit_trxr: ssslit_trxw: description: Secondary source blade motion deviceClass: ophyd.EpicsMotor - deviceConfig: {prefix: 'X06DA-ES-SSSLH1:TRXW'} + deviceConfig: {prefix: 'X06DA-ES-SSSH1:TRXW'} onFailure: buffer enabled: true readoutPriority: monitored @@ -145,7 +145,7 @@ ssslit_trxw: ssslit_tryt: description: Secondary source blade motion deviceClass: ophyd.EpicsMotor - deviceConfig: {prefix: 'X06DA-ES-SSSLV1:TRYT'} + deviceConfig: {prefix: 'X06DA-ES-SSSV1:TRYT'} onFailure: buffer enabled: true readoutPriority: monitored @@ -154,7 +154,7 @@ ssslit_tryt: ssslit_tryb: description: Secondary source blade motion deviceClass: ophyd.EpicsMotor - deviceConfig: {prefix: 'X06DA-ES-SSSLV1:TRYB'} + deviceConfig: {prefix: 'X06DA-ES-SSSV1:TRYB'} onFailure: buffer enabled: true readoutPriority: monitored @@ -163,7 +163,7 @@ ssslit_tryb: ssxi1_trx: description: Secondary source diagnostic screen motion deviceClass: ophyd.EpicsMotor - deviceConfig: {prefix: 'X06DA-ES-SSXI1:TRX'} + deviceConfig: {prefix: 'X06DA-ES-SSXI1:TRX1'} onFailure: buffer enabled: true readoutPriority: monitored @@ -172,7 +172,7 @@ ssxi1_trx: ssxi1_try: description: Secondary source diagnostic screen motion deviceClass: ophyd.EpicsMotor - deviceConfig: {prefix: 'X06DA-ES-SSXI1:TRY'} + deviceConfig: {prefix: 'X06DA-ES-SSXI1:TRY1'} onFailure: buffer enabled: true readoutPriority: monitored @@ -194,34 +194,34 @@ vfm_trxd: readoutPriority: monitored readOnly: false softwareTrigger: false -vfm_tryuw: - deviceClass: ophyd.EpicsMotor - deviceConfig: {prefix: 'X06DA-ES-VFM:TRYUW'} - onFailure: buffer - enabled: true - readoutPriority: monitored - readOnly: false - softwareTrigger: false -vfm_tryr: - deviceClass: ophyd.EpicsMotor - deviceConfig: {prefix: 'X06DA-ES-VFM:TRYR'} - onFailure: buffer - enabled: true - readoutPriority: monitored - readOnly: false - softwareTrigger: false -vfm_trydw: - deviceClass: ophyd.EpicsMotor - deviceConfig: {prefix: 'X06DA-ES-VFM:TRYDW'} - onFailure: buffer - enabled: true - readoutPriority: monitored - readOnly: false - softwareTrigger: false +# vfm_tryuw: +# deviceClass: ophyd.EpicsMotor +# deviceConfig: {prefix: 'X06DA-ES-VFM:TRYUW'} +# onFailure: buffer +# enabled: true +# readoutPriority: monitored +# readOnly: false +# softwareTrigger: false +# vfm_tryr: +# deviceClass: ophyd.EpicsMotor +# deviceConfig: {prefix: 'X06DA-ES-VFM:TRYR'} +# onFailure: buffer +# enabled: true +# readoutPriority: monitored +# readOnly: false +# softwareTrigger: false +# vfm_trydw: +# deviceClass: ophyd.EpicsMotor +# deviceConfig: {prefix: 'X06DA-ES-VFM:TRYDW'} +# onFailure: buffer +# enabled: true +# readoutPriority: monitored +# readOnly: false +# softwareTrigger: false vfm_pitch: description: KB mirror vertical steering deviceClass: ophyd.EpicsMotor - deviceConfig: {prefix: 'X06DA-ES1-VFM:PITCH'} + deviceConfig: {prefix: 'X06DA-ES-VFM:PITCH'} onFailure: buffer enabled: true readoutPriority: monitored @@ -229,7 +229,7 @@ vfm_pitch: softwareTrigger: false vfm_yaw: deviceClass: ophyd.EpicsMotor - deviceConfig: {prefix: 'X06DA-ES1-VFM:YAW'} + deviceConfig: {prefix: 'X06DA-ES-VFM:YAW'} enabled: false onFailure: buffer readoutPriority: monitored @@ -237,7 +237,7 @@ vfm_yaw: softwareTrigger: false vfm_roll: deviceClass: ophyd.EpicsMotor - deviceConfig: {prefix: 'X06DA-ES1-VFM:ROLL'} + deviceConfig: {prefix: 'X06DA-ES-VFM:ROLL'} enabled: false onFailure: buffer readoutPriority: monitored @@ -245,7 +245,7 @@ vfm_roll: softwareTrigger: false vfm_trx: deviceClass: ophyd.EpicsMotor - deviceConfig: {prefix: 'X06DA-ES1-VFM:TRX'} + deviceConfig: {prefix: 'X06DA-ES-VFM:TRX'} enabled: false onFailure: buffer readoutPriority: monitored @@ -253,7 +253,7 @@ vfm_trx: softwareTrigger: false vfm_try: deviceClass: ophyd.EpicsMotor - deviceConfig: {prefix: 'X06DA-ES1-VFM:TRY'} + deviceConfig: {prefix: 'X06DA-ES-VFM:TRY'} onFailure: buffer enabled: true readoutPriority: monitored @@ -275,34 +275,34 @@ hfm_trxd: readoutPriority: monitored readOnly: false softwareTrigger: false -hfm_tryur: - deviceClass: ophyd.EpicsMotor - deviceConfig: {prefix: 'X06DA-ES-HFM:TRYUR'} - onFailure: buffer - enabled: true - readoutPriority: monitored - readOnly: false - softwareTrigger: false -hfm_tryw: - deviceClass: ophyd.EpicsMotor - deviceConfig: {prefix: 'X06DA-ES-HFM:TRYW'} - onFailure: buffer - enabled: true - readoutPriority: monitored - readOnly: false - softwareTrigger: false -hfm_trydr: - deviceClass: ophyd.EpicsMotor - deviceConfig: {prefix: 'X06DA-ES-HFM:TRYDR'} - onFailure: buffer - enabled: true - readoutPriority: monitored - readOnly: false - softwareTrigger: false +# hfm_tryur: +# deviceClass: ophyd.EpicsMotor +# deviceConfig: {prefix: 'X06DA-ES-HFM:TRYUR'} +# onFailure: buffer +# enabled: true +# readoutPriority: monitored +# readOnly: false +# softwareTrigger: false +# hfm_tryw: +# deviceClass: ophyd.EpicsMotor +# deviceConfig: {prefix: 'X06DA-ES-HFM:TRYW'} +# onFailure: buffer +# enabled: true +# readoutPriority: monitored +# readOnly: false +# softwareTrigger: false +# hfm_trydr: +# deviceClass: ophyd.EpicsMotor +# deviceConfig: {prefix: 'X06DA-ES-HFM:TRYDR'} +# onFailure: buffer +# enabled: true +# readoutPriority: monitored +# readOnly: false +# softwareTrigger: false hfm_pitch: description: KB mirror horizontal steering deviceClass: ophyd.EpicsMotor - deviceConfig: {prefix: 'X06DA-ES1-HFM:PITCH'} + deviceConfig: {prefix: 'X06DA-ES-HFM:PITCH'} enabled: false onFailure: buffer readoutPriority: monitored @@ -310,7 +310,7 @@ hfm_pitch: softwareTrigger: false hfm_yaw: deviceClass: ophyd.EpicsMotor - deviceConfig: {prefix: 'X06DA-ES1-HFM:YAW'} + deviceConfig: {prefix: 'X06DA-ES-HFM:YAW'} enabled: false onFailure: buffer readoutPriority: monitored @@ -318,7 +318,7 @@ hfm_yaw: softwareTrigger: false hfm_roll: deviceClass: ophyd.EpicsMotor - deviceConfig: {prefix: 'X06DA-ES1-HFM:ROLL'} + deviceConfig: {prefix: 'X06DA-ES-HFM:ROLL'} enabled: false onFailure: buffer readoutPriority: monitored @@ -326,7 +326,7 @@ hfm_roll: softwareTrigger: false hfm_trx: deviceClass: ophyd.EpicsMotor - deviceConfig: {prefix: 'X06DA-ES1-HFM:TRX'} + deviceConfig: {prefix: 'X06DA-ES-HFM:TRX'} enabled: false onFailure: buffer readoutPriority: monitored @@ -334,7 +334,7 @@ hfm_trx: softwareTrigger: false hfm_try: deviceClass: ophyd.EpicsMotor - deviceConfig: {prefix: 'X06DA-ES1-HFM:TRY'} + deviceConfig: {prefix: 'X06DA-ES-HFM:TRY'} onFailure: buffer enabled: true readoutPriority: monitored diff --git a/pxiii_bec/devices/A3200.py b/pxiii_bec/devices/A3200.py index a5e6ddb..686778b 100644 --- a/pxiii_bec/devices/A3200.py +++ b/pxiii_bec/devices/A3200.py @@ -79,11 +79,16 @@ Examples """ import time -from ophyd import Component, Device, EpicsSignal, EpicsSignalRO, Kind +from ophyd import Component, EpicsSignal, EpicsSignalRO, Kind from ophyd.status import SubscriptionStatus +from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase as PsiDeviceBase +from ophyd_devices.interfaces.base_classes.psi_detector_base import CustomDetectorMixin as CustomDeviceMixin + +try: + from .A3200enums import AbrCmd, AbrMode +except ImportError: + from A3200enums import AbrCmd, AbrMode -from .A3200utils import A3200RasterScanner, A3200Oscillator -from .A3200enums import * try: from bec_lib import bec_logger @@ -92,11 +97,114 @@ except ModuleNotFoundError: import logging logger = logging.getLogger("A3200") + # pylint: disable=logging-fstring-interpolation +class AerotechAbrMixin(CustomDeviceMixin): + """ Configuration class for the Aerotech A3200 controller for the ABR stage""" -class AerotechAbrStage(Device): + + + def on_stage(self): + """ + + NOTE: Zac's request is that stage is essentially ARM, i.e. get ready and don't do anything. + """ + + logger.warning(f"Configuring {self.parent.scaninfo.scan_msg.info['scan_name']} on ABR") + + d = {} + if self.parent.scaninfo.scan_type in ("measure", "measurement"): + scanargs = self.parent.scaninfo.scan_msg.info['kwargs'] + scanname = self.parent.scaninfo.scan_msg.info['scan_name'] + + if scanname in ("standardscan"): + scan_start = scanargs["start"] + scan_range = scanargs["range"] + scan_move_time = scanargs["move_time"] + scan_ready_rate = scanargs.get("ready_rate", 500) + d["scan_command"] = AbrCmd.MEASURE_STANDARD + d["var_1"] = scan_start + d["var_2"] = scan_range + d["var_3"] = scan_move_time + d["var_4"] = scan_ready_rate + d["var_5"] = 0 + d["var_6"] = 0 + d["var_7"] = 0 + d["var_8"] = 0 + d["var_9"] = 0 + if scanname in ("verticallinescan", "vlinescan"): + scan_exp_time = scanargs["exp_time"] + scan_range_y = scanargs["range"] + scan_steps_y = scanargs["steps"] + d["scan_command"] = AbrCmd.VERTICAL_LINE_SCAN + d["var_1"] = scan_exp_time + d["var_2"] = scan_range_y + d["var_3"] = scan_steps_y + d["var_4"] = 0 + d["var_5"] = 0 + d["var_6"] = 0 + d["var_7"] = 0 + d["var_8"] = 0 + d["var_9"] = 0 + if scanname in ("screeningscan"): + scan_start = scanargs["start"] + scan_range = scanargs["range"] + scan_stepnum_o = scanargs["steps"] + scan_exp_time = scanargs["exp_time"] + scan_oscrange = scanargs["oscrange"] + scan_delta = scanargs.get("delta", 0.5) + scan_stepsize_o = scan_range / scan_stepnum_o + d["scan_command"] = AbrCmd.SCREENING + d["var_1"] = scan_start + d["var_2"] = scan_oscrange + d["var_3"] = scan_exp_time + d["var_4"] = scan_stepsize_o + d["var_5"] = scan_stepnum_o + d["var_6"] = scan_delta + d["var_7"] = 0 + d["var_8"] = 0 + d["var_9"] = 0 + if scanname in ("rasterscan", "rastersimplescan"): + scan_exp_time = scanargs["exp_time"] + scan_range_x = scanargs["range_x"] + scan_range_y = scanargs["range_y"] + scan_stepnum_x = scanargs["steps_x"] + scan_stepnum_y = scanargs["steps_y"] + scan_stepsize_x = scan_range_x / scan_stepnum_x + scan_stepsize_y = scan_range_y / scan_stepnum_y + d["scan_command"] = AbrCmd.RASTER_SCAN_SIMPLE + d["var_1"] = scan_exp_time + d["var_2"] = scan_stepsize_x + d["var_3"] = scan_stepsize_y + d["var_4"] = scan_stepnum_x + d["var_5"] = scan_stepnum_y + d["var_6"] = 0 + d["var_7"] = 0 + d["var_8"] = 0 + d["var_9"] = 0 + + # Reconfigure if got a valid scan config + if len(d)>0: + self.parent.configure(d) + + # Stage the parent + self.parent.bluestage() + + def on_kickoff(self): + """ Kick off parent """ + self.parent.bluekickoff() + + def on_unstage(self): + """ Unstage the ABR controller""" + self.parent.blueunstage() + + + + + +class AerotechAbrStage(PsiDeviceBase): """ Standard PX stage on A3200 controller This is the wrapper class for the standard rotation stage layout for the PX @@ -106,12 +214,15 @@ class AerotechAbrStage(Device): is running as an AeroBasic program on the controller and we communicate to it via 10+1 global variables. """ + custom_prepare_cls = AerotechAbrMixin USER_ACCESS = ['reset', 'kickoff', 'complete'] taskStop = Component( EpicsSignal, "-AERO:TSK-STOP", put_complete=True, kind=Kind.omitted) status = Component( EpicsSignal, "-AERO:STAT", put_complete=True, kind=Kind.omitted) + clear = Component( + EpicsSignal, "-AERO:CTRL-CLFT", put_complete=True, kind=Kind.omitted) # Enable/disable motor movement via the IOC (i.e. make it task-only) axisModeLocked = Component( @@ -126,9 +237,6 @@ class AerotechAbrStage(Device): # EpicsSignal, "-PH1:GET", write_pv="-PH1:SET", put_complete=True, kind=Kind.config, # ) - raster = Component(A3200RasterScanner, "", kind=Kind.config) - osc = Component(A3200Oscillator, "", kind=Kind.config) - # Status flags for all axes omega_done = Component(EpicsSignalRO, "-DF1:OMEGA-DONE", kind=Kind.normal) gmx_done = Component(EpicsSignalRO, "-DF1:GMX-DONE", kind=Kind.normal) @@ -179,7 +287,7 @@ class AerotechAbrStage(Device): if mode=="direct": self.axisModeDirect.set(37, settle_time=settle_time).wait() if mode=="measuring": - self.axisAxesMode.set(MEASURING_MODE, settle_time=settle_time).wait() + self.axisAxesMode.set(AbrMode.MEASURING, settle_time=settle_time).wait() def configure(self, d: dict) -> tuple: """" Configure the exposure scripts @@ -209,9 +317,8 @@ class AerotechAbrStage(Device): old = self.read_configuration() # ToDo: Check if idle before reconfiguring - - # Set the corresponding global variables self.scan_command.set(d['scan_command']).wait() + # Set the corresponding global variables if "var_1" in d and d['var_1'] is not None: self._var_1.set(d['var_1']).wait() if "var_2" in d and d['var_2'] is not None: @@ -236,6 +343,34 @@ class AerotechAbrStage(Device): new = self.read_configuration() return old, new + def bluestage(self): + """ Bluesky-style stage + + Since configuration synchronization is not guaranteed, this does + nothing. The script launched by kickoff(). + """ + pass + + def bluekickoff(self, timeout=1) -> SubscriptionStatus: + """ Kick off the set program """ + self.start_command.set(1).wait() + + # Define wait until the busy flag goes high + def is_busy(*args, value, **kwargs): + return bool(value==0) + + # Subscribe and wait for update + status = SubscriptionStatus(self.raster_scan_done, is_busy, timeout=timeout, settle_time=0.1) + return status + + def blueunstage(self, settle_time=0.1): + """ Stops current script and releases the axes""" + # Disarm commands + self.scan_command.set(AbrCmd.NONE, settle_time=settle_time).wait() + self.stop_command.set(1).wait() + # Go to direct mode + self.set_axis_mode("direct", settle_time=settle_time) + def complete(self, timeout=None) -> SubscriptionStatus: """ Waits for task execution @@ -262,7 +397,7 @@ class AerotechAbrStage(Device): couple of seconds. """ # Disarm commands - self.scan_command.set(CMD_NONE, settle_time=settle_time).wait() + self.scan_command.set(AbrCmd.NONE, settle_time=settle_time).wait() self.stop_command.set(1, settle_time=settle_time) # Reload tasks self.taskStop.set(1, settle_time=wait_after_reload).wait() @@ -273,7 +408,8 @@ class AerotechAbrStage(Device): """ Stops current motions """ # Disarm commands - self.scan_command.set(CMD_NONE, settle_time=settle_time).wait() + self.scan_command.set(AbrCmd.NONE, settle_time=settle_time).wait() + self.stop_command.set(1).wait() # Go to direct mode self.set_axis_mode("direct", settle_time=settle_time) @@ -281,29 +417,29 @@ class AerotechAbrStage(Device): """Checks execution status""" return 0 == self.status.get() - @property - def exp_time(self): - return self.osc.exp_time.get() + # @property + # def exp_time(self): + # return self.osc.exp_time.get() - @exp_time.setter - def exp_time(self, value): - self.osc.etime.set(value).wait() + # @exp_time.setter + # def exp_time(self, value): + # self.osc.etime.set(value).wait() - @property - def start_angle(self): - return self.osc.ostart_pos.get() + # @property + # def start_angle(self): + # return self.osc.ostart_pos.get() - @start_angle.setter - def start_angle(self, value): - self.osc.ostart_pos(value).wait() + # @start_angle.setter + # def start_angle(self, value): + # self.osc.ostart_pos(value).wait() - @property - def measurement_state(self): - return self.osc.phase.get() + # @property + # def measurement_state(self): + # return self.osc.phase.get() - @measurement_state.setter - def measurement_state(self, value): - self.osc.phase.set(value).wait() + # @measurement_state.setter + # def measurement_state(self, value): + # self.osc.phase.set(value).wait() @property def axis_mode(self): @@ -346,16 +482,16 @@ class AerotechAbrStage(Device): self.omega_done.get() and self.gmx_done.get() and self.gmy_done.get() and self.gmz_done.get() ) - def start_exposure(self): - """Starts the previously configured exposure.""" - self.wait_for_movements() - self.osc.taskStart.set(1).wait() - for _ in range(10): - try: - self.osc.wait_status(ABR_BUSY, timeout=1) - except RuntimeWarning as ex: - logger.error(f"{ex} --- trying start again.") - self.osc.kickoff() + # def start_exposure(self): + # """Starts the previously configured exposure.""" + # self.wait_for_movements() + # self.osc.taskStart.set(1).wait() + # for _ in range(10): + # try: + # self.osc.wait_status(ABR_BUSY, timeout=1) + # except RuntimeWarning as ex: + # logger.error(f"{ex} --- trying start again.") + # self.osc.kickoff() if __name__ == "__main__": diff --git a/pxiii_bec/scans/__init__.py b/pxiii_bec/scans/__init__.py index 6d3422d..b874c1d 100644 --- a/pxiii_bec/scans/__init__.py +++ b/pxiii_bec/scans/__init__.py @@ -1,3 +1,2 @@ -from .mx_measurements import (MeasureFastOmega, MeasureStandardWedge, MeasureVerticalLine, - MeasureScanSlits, MeasureRasterSimple, MeasureScreening, MeasureRepeatedOscillation, - MeasureMSOX, MeasureGrid) +from .mx_measurements import (MeasureStandardWedge, MeasureVerticalLine, + MeasureRasterSimple, MeasureScreening) diff --git a/pxiii_bec/scans/mx_measurements.py b/pxiii_bec/scans/mx_measurements.py index dbb4348..d03a9ca 100644 --- a/pxiii_bec/scans/mx_measurements.py +++ b/pxiii_bec/scans/mx_measurements.py @@ -11,9 +11,6 @@ from bec_server.scan_server.scans import AsyncFlyScanBase, ScanArgType, ScanBase logger = bec_logger.logger -FULL_PERIOD = 0 -HALF_PERIOD = 1 - class AbrCmd: """ Valid Aerotech ABR scan commands from the AeroBasic files""" @@ -22,23 +19,23 @@ class AbrCmd: MEASURE_STANDARD = 2 VERTICAL_LINE_SCAN = 3 SCREENING = 4 - SUPER_FAST_OMEGA = 5 - STILL_WEDGE = 6 # NOTE: Unused - STILLS = 7 # NOTE: Unused - REPEAT_SINGLE_OSCILLATION = 8 # NOTE: Unused - SINGLE_OSCILLATION = 9 - OLD_FASHIONED = 10 # NOTE: Unused - RASTER_SCAN = 11 - JET_ROTATION = 12 # NOTE: Unused - X_HELICAL = 13 # NOTE: Unused - X_RUNSEQ = 14 # NOTE: Unused - JUNGFRAU = 15 - MSOX = 16 # NOTE: Unused - SLIT_SCAN = 17 # NOTE: Unused - RASTER_SCAN_STILL = 18 - SCAN_SASTT = 19 - SCAN_SASTT_V2 = 20 - SCAN_SASTT_V3 = 21 + # SUPER_FAST_OMEGA = 5 # Some Japanese wanted to measure samples in capilaries at high RPMs + # STILL_WEDGE = 6 # NOTE: Unused Step scan + # STILLS = 7 # NOTE: Unused Just send triggers to detector + # REPEAT_SINGLE_OSCILLATION = 8 # NOTE: Unused + # SINGLE_OSCILLATION = 9 + # OLD_FASHIONED = 10 # NOTE: Unused + # RASTER_SCAN = 11 + # JET_ROTATION = 12 # NOTE: Unused + # X_HELICAL = 13 # NOTE: Unused + # X_RUNSEQ = 14 # NOTE: Unused + # JUNGFRAU = 15 + # MSOX = 16 # NOTE: Unused + # SLIT_SCAN = 17 # NOTE: Unused + # RASTER_SCAN_STILL = 18 + # SCAN_SASTT = 19 + # SCAN_SASTT_V2 = 20 + # SCAN_SASTT_V3 = 21 @@ -59,13 +56,12 @@ class AerotechFlyscanBase(AsyncFlyScanBase): abr_complete : bool Wait for the launched ABR task to complete. """ + scan_type = "measure" scan_report_hint = "table" arg_input = {} arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None} # Aerotech stage config - abr_command = None - abr_globals = {} abr_raster_reset =False abr_complete = False abr_timeout = None @@ -77,12 +73,6 @@ class AerotechFlyscanBase(AsyncFlyScanBase): parameters. """ super().__init__(parameter=parameter, **kwargs) - - # Override if explicitly passed as parameter - if "abr_command" in self.caller_kwargs: - self.abr_command = self.caller_kwargs.get("abr_command") - if "abr_globals" in self.caller_kwargs: - self.abr_globals = self.caller_kwargs.get("abr_globals") if "abr_raster_reset" in self.caller_kwargs: self.abr_raster_reset = self.caller_kwargs.get("abr_raster_reset") if "abr_complete" in self.caller_kwargs: @@ -91,6 +81,7 @@ class AerotechFlyscanBase(AsyncFlyScanBase): self.abr_timeout = self.caller_kwargs.get("abr_timeout") def pre_scan(self): + """ Mostly just checking if ABR stage is ok...""" # TODO: Move roughly to start position??? # ABR status checking @@ -107,13 +98,6 @@ class AerotechFlyscanBase(AsyncFlyScanBase): if self.abr_raster_reset: yield from self.stubs.send_rpc_and_wait("abr", "raster_scan_done.set", 0) - # Configure the global variables - d = {"scan_command" : self.abr_command} - d.update(self.abr_globals) - logger.info(d) - # Configure - yield from self.stubs.send_rpc_and_wait("abr", "configure", d) - # Call super yield from super().pre_scan() @@ -125,7 +109,7 @@ class AerotechFlyscanBase(AsyncFlyScanBase): """ The actual scan logic comes here. """ # Kick off the run - yield from self.stubs.send_rpc_and_wait("abr", "start_command.set", 1) + yield from self.stubs.send_rpc_and_wait("abr", "bluekickoff") logger.info("Measurement launched on the ABR stage...") # Wait for grid scanner to finish @@ -168,28 +152,9 @@ class MeasureStandardWedge(AerotechFlyscanBase): ready_rate : float, optional No clue what is this... (default=500) """ - scan_name = "standard_wedge" + scan_name = "standardscan" required_kwargs = ["start", "range", "move_time"] - def __init__(self, *args, parameter: dict = None, **kwargs): - self.axis = ['abr.omega'] - # Set parameters from meaningful inputs - self.scan_start = parameter['kwargs'].get("start") - self.scan_range = parameter['kwargs'].get("range") - self.scan_move_time = parameter['kwargs'].get("move_time") - self.scan_ready_rate = parameter['kwargs'].get("ready_rate", 500) - self.abr_command = AbrCmd.MEASURE_STANDARD - self.abr_globals = { - "var_1" : self.scan_start, - "var_2" : self.scan_range, - "var_3" : self.scan_move_time, - "var_4" : self.scan_ready_rate, - } - logger.info(self.abr_globals) - # Call base class - super().__init__(parameter=parameter, **kwargs) - - class MeasureVerticalLine(AerotechFlyscanBase): @@ -208,75 +173,15 @@ class MeasureVerticalLine(AerotechFlyscanBase): Parameters ---------- - range_y : float + range : float Step size [mm]. - steps_y : int + steps : int Scan range of the axis. exp_time : float Eeffective exposure time per step [s] """ - scan_name = "vertical_line" - required_kwargs = ["exp_time", "range_y", "steps_y"] - - def __init__(self, *args, parameter: dict = None, **kwargs): - self.axis = ['abr.gmy'] - # Set parameters from meaningful inputs - self.scan_exp_time = parameter['kwargs'].get("exp_time") - self.scan_range_y = parameter['kwargs'].get("range_y") - self.scan_stepnum_y = parameter['kwargs'].get("steps_y") - self.scan_stepsize_y = self.scan_range_y / self.scan_stepnum_y - - self.abr_command = AbrCmd.VERTICAL_LINE_SCAN - self.abr_globals = { - 'var_1': self.scan_exp_time, - 'var_2': self.scan_stepsize_y, - 'var_3': self.scan_stepnum_y, - } - # Call base class - super().__init__(parameter=parameter, **kwargs) - - - -class MeasureScanSlits(AerotechFlyscanBase): - """ Coordinated scan - - Measure a hardware coordinated relative line scan with the X- and Y axes. - This is used to accurately track solid supports with long linear grooves, - as theese might be slightly rotated. - - The scan itself is executed by the scan service running on the Aerotech - controller. Ophyd just configures, launches it and waits for completion. - - Example - ------- - >>> scans.slit_scan(range=[10, 0.3], scan_time=20) - - Parameters - ---------- - range_x : float - Move distance in X [mm]. - range_y : float - Move distance in Y [mm]. - velocity : float - Effective movement velocity [mm/s]. - """ - scan_name = "slit_scan" - required_kwargs = ["range", "velocity"] - - def __init__(self, *args, parameter: dict = None, **kwargs): - self.axis = ["abr.gmx", "abr.gmy"] - # Set parameters from meaningful inputs - self.abr_command = AbrCmd.SLIT_SCAN - self.scan_range_x = parameter['kwargs'].get("range_x") - self.scan_range_y = parameter['kwargs'].get("range_y") - self.scan_velocity = parameter['kwargs'].get("velocity") - self.abr_globals = { - "var_1" : self.scan_range_x, - "var_2" : self.scan_range_y, - "var_3" : self.scan_velocity, - } - # Call base class - super().__init__(parameter=parameter, **kwargs) + scan_name = "vlinescan" + required_kwargs = ["exp_time", "range", "steps"] @@ -308,120 +213,9 @@ class MeasureRasterSimple(AerotechFlyscanBase): steps_y : int Number of scan steps in Y (slow). """ - scan_name = "raster_simple" + scan_name = "rasterscan" required_kwargs = ["exp_time", "range_x", "range_y", "steps_x", "steps_y"] - def __init__(self, *args, parameter: dict = None, **kwargs): - self.axis = ['abr.gmx', 'abr.gmy'] - # Set parameters from meaningful inputs - self.scan_exp_time = parameter['kwargs'].get("exp_time") - self.scan_range_x = parameter['kwargs'].get("range_x") - self.scan_range_y = parameter['kwargs'].get("range_y") - self.scan_stepnum_x = parameter['kwargs'].get("steps_x") - self.scan_stepnum_y = parameter['kwargs'].get("steps_y") - self.scan_stepsize_x = self.scan_range_x / self.scan_stepnum_x - self.scan_stepsize_y = self.scan_range_y / self.scan_stepnum_y - - self.abr_command = AbrCmd.RASTER_SCAN_SIMPLE - self.abr_raster_reset = True - self.abr_globals = { - "var_1" : self.scan_exp_time, - "var_2" : self.scan_stepsize_x, - "var_3" : self.scan_stepsize_y, - "var_4" : self.scan_stepnum_x, - "var_5" : self.scan_stepnum_y, - "var_6" : 0, - "var_7" : 0, - "var_8" : 0, - } - # Call base class - super().__init__(parameter=parameter, **kwargs) - - - -class MeasureSASTT(AerotechFlyscanBase): - """ Small angle scanning tensor tomography scan - - Measure a single projection for Small Angle Scanning Tensor Tomography. - It's essentially a separate grid scan, that can be modified independently. - The scan is relative and assumes the goniometer is currently at the CENTER - of the first cell (i.e. TOP-LEFT). Each line is executed as a single - continous movement, there's no actual stepping in the X direction. - - NOTE: Not all beamlines have all SASTT modes implemented. - - The scan itself is executed by the scan service running on the Aerotech - controller. Ophyd just configures, launches it and waits for completion. - - Example - ------- - >>> scans.measure_sastt(exp_time=0.01, range_x=1, range_y=1, steps_x=200, steps_y=200) - - Parameters - ---------- - exp_time : float - Effective exposure time for each cell along the X axis [s]. - range_x : float - Scan step size [mm]. - range_y : float - Scan step size [mm]. - steps_x : int - Number of scan steps in X (fast). Only used for velocity calculation. - steps_y : int - Number of scan steps in Y (slow). - odd_tweak : (float) - Distance to be added before the open shutter command [mm]. - Used only in scan version=3. Should be small (default: 0)! - even_tweak : (float) - Distance to be added before the open shutter command [mm]. - Used only in scan version=3. Should be small (default: 0)! - version : (int) - Scanning mode for tensor tomo. - 1 = original snake scan, single PSO window - 2 = scan always from LEFT---RIGHT - 3 = snake scan alternating PSO window for even/odd rows - """ - scan_name = "measure_sastt" - required_kwargs = ["exp_time", "range_x", "range_y", "steps_x", "steps_y"] - - def __init__(self, *args, parameter: dict = None, **kwargs): - self.axis = ['abr.gmx', 'abr.gmy'] - # Set parameters from meaningful inputs - self.scan_version = int(parameter['kwargs'].get("version", 1)) - if self.scan_version==1: - self.abr_command = AbrCmd.SCAN_SASTT - if self.scan_version==2: - self.abr_command = AbrCmd.SCAN_SASTT_V2 - if self.scan_version==3: - self.abr_command = AbrCmd.SCAN_SASTT_V3 - else: - raise RuntimeError(f"Non existing SAS-TT scan mode: {self.scan_version}") - - # Set parameters from meaningful inputs - self.scan_exp_time = parameter['kwargs'].get("exp_time") - self.scan_range_x = parameter['kwargs'].get("range_x") - self.scan_range_y = parameter['kwargs'].get("range_y") - self.scan_stepnum_x = parameter['kwargs'].get("steps_x") - self.scan_stepnum_y = parameter['kwargs'].get("steps_y") - self.scan_even_tweak = parameter['kwargs'].get("even_tweak", 0) - self.scan_odd_tweak = parameter['kwargs'].get("odd_tweak", 0) - self.scan_stepsize_x = self.scan_range_x / self.scan_stepnum_x - self.scan_stepsize_y = self.scan_range_y / self.scan_stepnum_y - - self.abr_globals = { - 'var_1': self.scan_exp_time, - 'var_2': self.scan_stepsize_x, - 'var_3': self.scan_stepsize_y, - 'var_4': self.scan_stepnum_x, - 'var_5': self.scan_stepnum_y, - 'var_6': self.scan_even_tweak, - 'var_7': self.scan_odd_tweak, - 'var_8': 0, - } - # Call base class - super().__init__(parameter=parameter, **kwargs) - - @@ -454,588 +248,5 @@ class MeasureScreening(AerotechFlyscanBase): delta : float Safety margin for sub-range movements (default: 0.5) [deg]. """ - scan_name = "measure_screening" + scan_name = "screeningscan" required_kwargs = ["start", "range", "steps", "exp_time", "oscrange"] - - def __init__(self, *args, parameter: dict = None, **kwargs): - self.axis = ['gmy'] - # Set parameters from meaningful inputs - self.scan_start = parameter['kwargs'].get("start") - self.scan_range_o = parameter['kwargs'].get("range") - self.scan_stepnum_o = parameter['kwargs'].get("steps") - self.scan_exp_time = parameter['kwargs'].get("exp_time") - self.scan_oscrange = parameter['kwargs'].get("oscrange") - self.scan_delta = parameter['kwargs'].get("delta", 0.5) - self.scan_stepsize_o = self.scan_range_o / self.scan_stepnum_o - self.abr_command = AbrCmd.SCREENING - self.abr_globals = { - "var_1" : self.scan_start, - "var_2" : self.scan_oscrange, - "var_3" : self.scan_exp_time, - "var_4" : self.scan_stepsize_o, - "var_5" : self.scan_stepnum_o, - "var_6" : self.scan_delta, - } - # Call base class - super().__init__(parameter=parameter, **kwargs) - - - -class MeasureFastOmega(AerotechFlyscanBase): - """ Fast omega scan - - Performs a fast rotational scan with the rotation stage (omega). It ramps - up to constant speed and sets off PSO for the expected travel time. I.e. - not really a PSO output (as it ignores acceleration distance) but it's - expected to trigger at-speed for 'range' distance. - - The scan itself is executed by the scan service running on the Aerotech - controller. Ophyd just configures, launches it and waits for completion. - - Example - ------- - >>> scans.measure_fast_omega(start=42, range=180, move_time=1.0) - - Parameters - ---------- - start : float - Scan start position of the axis [deg]. - range : float - At-speed range of the axis relative to 'start' + acceleration - distance [deg]. - move_time : float - Total time for the at-speed movement and trigger [s]. - rate : (???) - (default: 200). - """ - scan_name = "measure_fast_omega" - required_kwargs = ["start", "range", "move_time"] - - def __init__(self, *args, parameter: dict = None, **kwargs): - self.axis = ['omega'] - # Set parameters from meaningful inputs - logger.info(parameter) - self.scan_start = parameter['kwargs'].get("start") - self.scan_range = parameter['kwargs'].get("range") - self.scan_move_time = parameter['kwargs'].get("move_time") - self.scan_acceleration = parameter['kwargs'].get("acceleration", 200) - self.abr_command = AbrCmd.SUPER_FAST_OMEGA - self.abr_globals = { - "var_1" : self.scan_start, - "var_2" : self.scan_range, - "var_3" : self.scan_move_time, - "var_4" : self.scan_acceleration, - } - logger.info(self.abr_globals) - # Call base class - super().__init__(parameter=parameter, **kwargs) - - - -class MeasureStillWedge(AerotechFlyscanBase): - """ Still wedge scan - - Measure a simple step scan with the omega stage with PSO triggering. - The scan is similar to the continous wedge scan. - - The scan itself is executed by the scan service running on the Aerotech - controller. Ophyd just configures, launches it and waits for completion. - - Example - ------- - >>> scans.measure_still_wedge(start=42, range=180, exp_time=0.1, steps=60) - - Parameters - ---------- - start : (float) - Scan start position of the axis [mm]. - range : (float) - Scan range of the omega axis [mm]. - exp_time : (float) - Exposure time per point [s]. - steps : int - Number of actual steps. - sleep_after_shutter_close : (float) - [s] (default: 0.01). - """ - scan_name = "measure_still_wedge" - required_kwargs = ["start", "range", "exp_time", "steps"] - - def __init__(self, *args, parameter: dict = None, **kwargs): - self.axis = ['abr.omega'] - # Set parameters from meaningful inputs - self.scan_start = parameter['kwargs'].get("start") - self.scan_range = parameter['kwargs'].get("range") - self.scan_stepnum_o = parameter['kwargs'].get("steps") - self.scan_exp_time = parameter['kwargs'].get("exp_time") - self.scan_shutter_sleep = parameter['kwargs'].get("sleep_after_shutter_close", 0.01) - self.scan_stepsize_o = self.scan_stepnum_o / self.scan_stepnum_o - self.abr_command = AbrCmd.STILL_WEDGE - self.abr_globals = { - "var_1" : self.scan_start, - "var_2" : self.scan_range, - "var_3" : self.scan_exp_time, - "var_4" : self.scan_stepsize_o, - "var_5" : self.scan_shutter_sleep, - } - # Call base class - super().__init__(parameter=parameter, **kwargs) - - - -class MeasureStills(AerotechFlyscanBase): - """ Still image sequence scan - - Measures a series of PSO triggered images without any motor movement. - NOTE: Use idle_time=0.0 to prevent shutter open/close. - - The scan itself is executed by the scan service running on the Aerotech - controller. Ophyd just configures, launches it and waits for completion. - - Example - ------- - >>> scans.measure_stills(steps=100, exp_time=0.1, idle_time=3) - - Parameters - ---------- - steps : int - Total number of frames to be recorded. - exp_time : float - Exposure time of each frame [s]. - idle_time : float - Sleep time between the frames [s]. - """ - scan_name = "measure_stills" - required_kwargs = ["exp_time", "steps", "idle_time"] - - def __init__(self, *args, parameter: dict = None, **kwargs): - self.axis = [] - # Set parameters from meaningful inputs - self.scan_exp_time = parameter['kwargs'].get("exp_time") - self.scan_numsteps = parameter['kwargs'].get("steps") - self.scan_idle_time = parameter['kwargs'].get("idle_time") - self.abr_command = AbrCmd.STILLS - self.abr_globals = { - "var_1" : self.scan_numsteps, - "var_2" : self.scan_exp_time, - "var_3" : self.scan_idle_time, - } - # Call base class - super().__init__(parameter=parameter, **kwargs) - - - -class MeasureSingleOscillation(AerotechFlyscanBase): - """ Single oscillation scan - - Short line scan with the omega axis with PSO trigger. - - The scan itself is executed by the scan service running on the Aerotech - controller. Ophyd just configures, launches it and waits for completion. - - Example - ------- - >>> scans.measure_single_osc(start=42, range=180, move_time=20) - - Parameters - ---------- - start : float - Scan start position of the omegaaxis [mm]. - range : float - Oscillation range of the axis [mm]. - move_time : float - Total scan time for the movement [s]. - delta : (???) - Safety margin for movement (default: 0). - settle : (???) - (default: 0.5). - """ - scan_name = "measure_single_osc" - required_kwargs = ["start", "range", "move_time"] - - def __init__(self, *args, parameter: dict = None, **kwargs): - self.axis = ['abr.omega'] - # Set parameters from meaningful inputs - self.scan_start = parameter['kwargs'].get("start") - self.scan_range = parameter['kwargs'].get("range") - self.scan_move_time = parameter['kwargs'].get("move_time") - self.scan_delta = parameter['kwargs'].get("delta", 0) - self.scan_settle = parameter['kwargs'].get("settle", 0.5) - self.abr_command = AbrCmd.SINGLE_OSCILLATION - self.abr_globals = { - "var_1" : self.scan_start, - "var_2" : self.scan_range, - "var_3" : self.scan_move_time, - "var_4" : self.scan_delta, - "var_5" : self.scan_settle, - } - # Call base class - super().__init__(parameter=parameter, **kwargs) - - - -class MeasureRepeatedOscillation(AerotechFlyscanBase): - """ Repeated oscillation scan - - Repeated relative line scans with the omega axis with PSO enable trigger. - The lines are performed in zigzag mode with PSO triggering. - - The scan itself is executed by the scan service running on the Aerotech - controller. Ophyd just configures, launches it and waits for completion. - - Example - ------- - >>> scans.measure_multi_osc(steps=50, range=30, move_time=3) - - Parameters - ---------- - steps : int - Number of cycles to repeat the scan - move_time : float - Total scan time for the movement [s]. - range : float - Relative oscillation range of the omega axis [deg]. - settle : (???) - Movement settle time after each line (default: 0) [s]. - delta : (???) - Safety margin for movement (default: 0.5) [deg]. - """ - scan_name = "measure_multi_osc" - required_kwargs = ["steps", "range", "scan_time"] - - def __init__(self, *args, parameter: dict = None, **kwargs): - self.axis = ['omega'] - # Set parameters from meaningful inputs - self.scan_stepnum = parameter['kwargs'].get("steps") - self.scan_range = parameter['kwargs'].get("range") - self.scan_move_time = parameter['kwargs'].get("move_time") - self.scan_sleep = parameter['kwargs'].get("sleep", 0) - self.scan_delta = parameter['kwargs'].get("delta", 0.5) - self.abr_command = AbrCmd.REPEAT_SINGLE_OSCILLATION - self.abr_globals = { - "var_1" : self.scan_stepnum, - "var_2" : self.scan_move_time, - "var_3" : self.scan_range, - "var_4" : self.scan_sleep, - "var_5" : self.scan_delta, - } - # Call base class - super().__init__(parameter=parameter, **kwargs) - - - -class MeasureMSOX(AerotechFlyscanBase): - """ Standard MSOX scan - - MSOX experiment for Florian, i.e. a glorified oscillation scan this time - with absolute positions. The lines are unidirectional with PSO triggering. - - The scan itself is executed by the scan service running on the Aerotech - controller. Ophyd just configures, launches it and waits for completion. - - Example - ------- - >>> scans.measure_msox(start=42, range=30, move_time=3.0) - - Parameters - ---------- - start : float - Scan start position of the axis. - range : float - Scan range of the axis. - move_time : float - Exposure time for each point [s]. - steps : int - Number of repeats. - settle_time : float - Settle time before line start (default=0) [s]. - """ - scan_name = "measure_msox" - required_kwargs = ["start", "range", "move_time"] - - def __init__(self, *args, parameter: dict = None, **kwargs): - self.axis = ['abr.omega'] - # Set parameters from meaningful inputs - self.scan_start = parameter['kwargs'].get("start") - self.scan_range = parameter['kwargs'].get("range") - self.scan_move_time = parameter['kwargs'].get("move_time") - self.scan_stepnum = parameter['kwargs'].get("steps", 1) - self.scan_settle_time = parameter['kwargs'].get("settle_time", 0) - self.abr_command = AbrCmd.MSOX - self.abr_globals = { - "var_1" : self.scan_start, - "var_2" : self.scan_range, - "var_3" : self.scan_move_time, - "var_4" : self.scan_stepnum, - "var_5" : self.scan_settle_time, - "var_6" : 0, - "var_7" : 0, - "var_8" : 0, - "var_9" : 0, - "var_10" : 0, - } - # Call base class - super().__init__(parameter=parameter, **kwargs) - - - -class MeasureGrid(AerotechFlyscanBase): - """ General grid scan - - Performs an X-Y-Omega coordinated grid scan: - X axis is absolute (fast axis) - Y axis is relative to start - Omega starts at the current value and is synchronized to X - - Note: This was probably never used in its current form, because the - code had an undefined variable 'self.position' - - The scan itself is executed by the scan service running on the Aerotech - controller. Ophyd just configures, launches it and waits for completion. - - Example - ------- - >>> scans.measure_raster(start=42, range=10, exp_time=0.1) - - Parameters - ---------- - positions : 2D-array - Scan position of each axis, i.e. (N, 3) shaped array. - steps_x : int - Number of points along the X axis (fast). - steps_y : int - Number of points in the Y axis (slow). - angle: - Triggered angular range on the synchronized omega [deg] - exp_time : (float) - Exposure time for each point, only used for number of points and velocity [s]. - """ - scan_name = "measure_raster" - required_kwargs = ["positions", "ncols", "angle", "exp_time"] - - # Default values - scan_start_x = None - scan_end_x = None - scan_xstep = 0.010 - scan_ystep = 0.010 - scan_zstep = 0.010 - scan_gmy_step = 0.010 - - def __init__(self, *args, parameter: dict = None, **kwargs): - self.axis = ['abr.omega', 'abr.gmx', 'abr.gmy'] - # Set parameters from meaningful inputs - self.abr_command = AbrCmd.RASTER_SCAN - self.scan_positions = parameter['kwargs'].get("positions") - self.scan_stepnum_x = parameter['kwargs'].get("steps_x") - self.scan_stepnum_y = parameter['kwargs'].get("steps_y") - self.scan_anglerange = parameter['kwargs'].get("angle") - self.scan_exp_time = parameter['kwargs'].get("exp_time") - - if len(self.scan_positions) != self.scan_stepnum_x * self.scan_stepnum_y: - raise RuntimeError("Number of points and positions do not match") - - if len(self.scan_positions)==1: - raise RuntimeWarning("Raster scan with one cell makes no sense") - - # Call base class - super().__init__(parameter=parameter, **kwargs) - - def prepare_positions(self): - # Calculate step sizes - pos_start = self.scan_positions[0] # first cell on first row - pos_col_2nd = self.scan_positions[1] # second cell on first row - pos_col_end = self.scan_positions[self.scan_stepnum_x-1] # last cell on first row - - self.scan_xstep = pos_col_2nd[0] - pos_start[0] - half_cell = abs(self.scan_xstep) / 2.0 - self.scan_start_x = pos_start[0] - half_cell - self.scan_end_x = pos_col_end[0] + half_cell - - if self.scan_stepnum_y > 1: - pos_row_2nd = self.scan_positions[self.scan_stepnum_x] # first cell on second row - self.scan_ystep = pos_row_2nd[1] - pos_start[1] - self.scan_zstep = pos_row_2nd[2] - pos_start[2] - self.scan_gmy_step = np.linalg.norm([self.scan_ystep, self.scan_zstep]) - - # ToDo: Check with Zac what are theese parameters - self.scan_start_o = yield from self.stubs.send_rpc_and_wait("abs", "omega.readback.get") - self.scan_gmx_offset = yield from self.stubs.send_rpc_and_wait("abs", "gmx.readback.get") - - # Configure the global variables - self.abr_globals = { - "var_1" : self.scan_start_o, - "var_2" : self.scan_start_x, - "var_3" : self.scan_end_x, - "var_4" : self.scan_stepnum_x, - "var_5" : self.scan_stepnum_y, - "var_6" : self.scan_anglerange, - "var_7" : self.scan_exp_time, - "var_8" : HALF_PERIOD, - "var_9" : self.scan_gmx_offset, - "var_10" : self.scan_gmy_step, - } - - - -class MeasureGridStill(AerotechFlyscanBase): - """ Still grid scan - - Performs an X-Y-Omega coordinated grid scan in stepping mode: - X axis is absolute - Y axis is relative to start - Omega starts at the current value and is synchronized to X - - Note: This was probably never used in its current form, because the - code had an undefined variable 'self.position' - - The scan itself is executed by the scan service running on the Aerotech - controller. Ophyd just configures, launches it and waits for completion. - - Example - ------- - >>> pts = [[11, 2, 3.4], [11, 2, 3.5], ...] - >>> scans.measure_raster(positions=pts, steps_x=20, steps_y=20, exp_time=0.1, delay=0.05) - - Parameters - ---------- - positions : 2D-array - Scan position of each axis, i.e. (N, 3) shaped array. - columns : int - Nmber of columns. - angle: - ??? - exp_time : float - Exposure time for each point [s]. - delay: - ??? - """ - scan_name = "measure_raster_still" - required_kwargs = ["positions", "steps_x", "steps_y", "exp_time", "delay"] - - # Default values - scan_start_x = None - scan_end_x = None - scan_xstep = 0.010 - scan_ystep = 0.010 - scan_zstep = 0.010 - scan_gmy_step = 0.010 - - def __init__(self, *args, parameter: dict = None, **kwargs): - self.axis = ['abr.gmx', 'abr.gmy', 'abr.omega'] - # Set parameters from meaningful inputs - self.abr_command = AbrCmd.RASTER_SCAN_STILL - self.scan_positions = parameter['kwargs'].get("positions") - self.scan_stepnum_x = parameter['kwargs'].get("steps_x") - self.scan_stepnum_y = parameter['kwargs'].get("steps_y") - self.scan_exp_time = parameter['kwargs'].get("exp_time") - self.scan_delay = parameter['kwargs'].get("delay") - - if len(self.scan_positions) != self.scan_stepnum_x * self.scan_stepnum_y: - raise RuntimeError("Number of points and positions do not match") - - if len(self.scan_positions)==1: - raise RuntimeWarning("Raster scan with one cell makes no sense") - # Call base class - super().__init__(parameter=parameter, **kwargs) - - def prepare_positions(self): - # Calculate step sizes - pos_start = self.scan_positions[0] # first cell on first row - pos_col_2nd = self.scan_positions[1] # second cell on first row - pos_col_end = self.scan_positions[self.scan_stepnum_x-1] # last cell on first row - - self.scan_xstep = pos_col_2nd[0] - pos_start[0] - half_cell = abs(self.scan_xstep) / 2.0 - self.scan_start_x = pos_start[0] - half_cell - self.scan_end_x = pos_col_end[0] + half_cell - - if self.scan_stepnum_y > 1: - pos_row_2nd = self.scan_positions[self.scan_stepnum_x] # first cell on second row - self.scan_ystep = pos_row_2nd[1] - pos_start[1] - self.scan_zstep = pos_row_2nd[2] - pos_start[2] - self.scan_gmy_step = np.linalg.norm([self.scan_ystep, self.scan_zstep]) - - # ToDo: Check with Zac what are theese parameters - self.scan_omega_position = yield from self.stubs.send_rpc_and_wait("abs", "omega.readback.get") - self.scan_gmx_offset = yield from self.stubs.send_rpc_and_wait("abs", "gmx.readback.get") - - self.abr_globals = { - "var_1" : self.scan_omega_position, - "var_2" : self.scan_start_x, - "var_3" : self.scan_end_x, - "var_4" : self.scan_stepnum_x, - "var_5" : self.scan_stepnum_y, - "var_6" : self.scan_delay, - "var_7" : self.scan_exp_time, - "var_8" : HALF_PERIOD, - "var_9" : self.scan_gmx_offset, - "var_10" : self.scan_gmy_step, - } - - - -class MeasureJungfrau(AerotechFlyscanBase): - """ Scan with the Jungfrau detector - - The scan itself is executed by the scan service running on the Aerotech - controller. Ophyd just configures, launches it and waits for completion. - - Example - ------- - >>> scans.measure_jungfrau(start=42, range=180, scan_time=20) - - Parameters - ---------- - steps_x : int - ??? - steps_y : int - ??? - range_x : int - ??? - range_y : int - ??? - exp_time : float - Exposure time per point [s]. - sleep : float - (default: 0.1). - """ - scan_name = "measure_jungfrau" - required_kwargs = ["steps_x", "steps_y", "range_x", "range_y", "exp_time"] - - def __init__(self, *args, parameter: dict = None, **kwargs): - self.axis = ['omega'] - # Set parameters from meaningful inputs - self.scan_stepnum_x = parameter['kwargs'].get("steps_x") - self.scan_stepnum_y = parameter['kwargs'].get("steps_y") - self.scan_range_x = parameter['kwargs'].get("range_x") - self.scan_range_y = parameter['kwargs'].get("range_y") - self.scan_stepsize_x = self.scan_range_x / self.scan_stepnum_x - self.scan_stepsize_y = self.scan_range_y / self.scan_stepnum_y - self.scan_exp_time = parameter['kwargs'].get("exp_time") - self.scan_sleep = parameter['kwargs'].get("sleep", 0.1) - sleep_time = 5.0 + (self.scan_stepnum_y * self.scan_sleep) + ( - self.scan_stepnum_x * self.scan_stepnum_y * self.scan_exp_time) - self.abr_timeout = None if self.scan_sleep <= 0 else sleep_time - self.abr_command = AbrCmd.JUNGFRAU - self.abr_complete = True - self.abr_globals = { - "var_1" : self.scan_stepnum_x, - "var_2" : self.scan_stepnum_y, - "var_3" : self.scan_stepsize_x, - "var_4" : self.scan_stepsize_y, - "var_5" : self.scan_exp_time, - "var_6" : self.scan_sleep, - "var_7" : 0, - "var_8" : 0, - } - # Call base class - super().__init__(parameter=parameter, **kwargs) - - def scan_core(self): - """ The actual scan logic comes here. - """ - yield from super().scan_core() - - # Go back to direct mode - st = yield from self.stubs.send_rpc_and_wait("abr", "set_axis_mode", "direct") - st.wait() \ No newline at end of file