diff --git a/pxiii_bec/devices/A3200.py b/pxiii_bec/devices/A3200.py index f02ed7d..c305ef9 100644 --- a/pxiii_bec/devices/A3200.py +++ b/pxiii_bec/devices/A3200.py @@ -78,11 +78,14 @@ Examples abr.stop() # this function only returns after the STATUS is back to OK """ + import time from ophyd import Component, EpicsSignal, EpicsSignalRO, Kind from ophyd.status import SubscriptionStatus from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase as PsiDeviceBase -from ophyd_devices.interfaces.base_classes.psi_detector_base import CustomDetectorMixin as CustomDeviceMixin +from ophyd_devices.interfaces.base_classes.psi_detector_base import ( + CustomDetectorMixin as CustomDeviceMixin, +) try: from .A3200enums import AbrCmd, AbrMode @@ -92,23 +95,21 @@ except ImportError: try: from bec_lib import bec_logger + logger = bec_logger.logger except ModuleNotFoundError: import logging + logger = logging.getLogger("A3200") # pylint: disable=logging-fstring-interpolation class AerotechAbrMixin(CustomDeviceMixin): - """ Configuration class for the Aerotech A3200 controller for the ABR stage""" - - - - + """Configuration class for the Aerotech A3200 controller for the ABR stage""" def on_stage(self): """ - + NOTE: Zac's request is that stage is essentially ARM, i.e. get ready and don't do anything. """ @@ -116,8 +117,8 @@ class AerotechAbrMixin(CustomDeviceMixin): d = {} if self.parent.scaninfo.scan_type in ("measure", "measurement", "fly"): - scanargs = self.parent.scaninfo.scan_msg.info['kwargs'] - scanname = self.parent.scaninfo.scan_msg.info['scan_name'] + scanargs = self.parent.scaninfo.scan_msg.info["kwargs"] + scanname = self.parent.scaninfo.scan_msg.info["scan_name"] if scanname in ("standardscan"): scan_start = scanargs["start"] @@ -186,27 +187,24 @@ class AerotechAbrMixin(CustomDeviceMixin): d["var_9"] = 0 # Reconfigure if got a valid scan config - if len(d)>0: + if len(d) > 0: self.parent.configure(d) # Stage the parent self.parent.bluestage() def on_kickoff(self): - """ Kick off parent """ + """Kick off parent""" self.parent.bluekickoff() - + def on_unstage(self): - """ Unstage the ABR controller""" + """Unstage the ABR controller""" self.parent.blueunstage() - - - class AerotechAbrStage(PsiDeviceBase): - """ Standard PX stage on A3200 controller - + """Standard PX stage on A3200 controller + This is the wrapper class for the standard rotation stage layout for the PX beamlines at SLS. It wraps the main rotation axis OMEGA (Aerotech ABR)and the associated motion axes GMX, GMY and GMZ. The ophyd class associates to @@ -214,23 +212,20 @@ class AerotechAbrStage(PsiDeviceBase): is running as an AeroBasic program on the controller and we communicate to it via 10+1 global variables. """ - custom_prepare_cls = AerotechAbrMixin - USER_ACCESS = ['reset', 'kickoff', 'complete'] - taskStop = Component( - EpicsSignal, "-AERO:TSK-STOP", put_complete=True, kind=Kind.omitted) - status = Component( - EpicsSignal, "-AERO:STAT", put_complete=True, kind=Kind.omitted) - clear = Component( - EpicsSignal, "-AERO:CTRL-CLFT", put_complete=True, kind=Kind.omitted) + custom_prepare_cls = AerotechAbrMixin + USER_ACCESS = ["reset", "kickoff", "complete"] + + taskStop = Component(EpicsSignal, "-AERO:TSK-STOP", put_complete=True, kind=Kind.omitted) + status = Component(EpicsSignal, "-AERO:STAT", put_complete=True, kind=Kind.omitted) + clear = Component(EpicsSignal, "-AERO:CTRL-CLFT", put_complete=True, kind=Kind.omitted) # Enable/disable motor movement via the IOC (i.e. make it task-only) - axisModeLocked = Component( - EpicsSignal, "-DF1:LOCK", put_complete=True, kind=Kind.omitted) + axisModeLocked = Component(EpicsSignal, "-DF1:LOCK", put_complete=True, kind=Kind.omitted) axisModeDirect = Component( - EpicsSignal, "-DF1:MODE-DIRECT", put_complete=True, kind=Kind.omitted) - axisAxesMode = Component( - EpicsSignal, "-DF1:AXES-MODE", put_complete=True, kind=Kind.omitted) + EpicsSignal, "-DF1:MODE-DIRECT", put_complete=True, kind=Kind.omitted + ) + axisAxesMode = Component(EpicsSignal, "-DF1:AXES-MODE", put_complete=True, kind=Kind.omitted) # Shutter box is missing readback so the -GET signal is installed on the VME # _shutter = Component( @@ -244,12 +239,13 @@ class AerotechAbrStage(PsiDeviceBase): gmz_done = Component(EpicsSignalRO, "-DF1:GMZ-DONE", kind=Kind.normal) # For some reason the task interface is called PSO... - scan_command = Component( - EpicsSignal, "-PSO:CMD", put_complete=True, kind=Kind.omitted) + scan_command = Component(EpicsSignal, "-PSO:CMD", put_complete=True, kind=Kind.omitted) start_command = Component( - EpicsSignal, "-PSO:START-TEST.PROC", put_complete=True, kind=Kind.omitted) + EpicsSignal, "-PSO:START-TEST.PROC", put_complete=True, kind=Kind.omitted + ) stop_command = Component( - EpicsSignal, "-PSO:STOP-TEST.PROC", put_complete=True, kind=Kind.omitted) + EpicsSignal, "-PSO:STOP-TEST.PROC", put_complete=True, kind=Kind.omitted + ) # Global variables to controll AeroBasic scripts _var_1 = Component(EpicsSignal, "-PSO:VAR-1", put_complete=True, kind=Kind.omitted) @@ -274,23 +270,23 @@ class AerotechAbrStage(PsiDeviceBase): raster_num_rows = Component(EpicsSignal, "-GRD:ROW-DONE", kind=Kind.config) def set_axis_mode(self, mode: str, settle_time=0.1) -> None: - """ Set axis mode to direct/measurement mode. + """Set axis mode to direct/measurement mode. Measurement ensures that the scrips run undisturbed by blocking axis - motion commands from the IOC (i.e. internal only). - + motion commands from the IOC (i.e. internal only). + Parameters: ----------- mode : str Valid values are 'direct' and 'measuring'. """ - if mode=="direct": + if mode == "direct": self.axisModeDirect.set(37, settle_time=settle_time).wait() - if mode=="measuring": + if mode == "measuring": self.axisAxesMode.set(AbrMode.MEASURING, settle_time=settle_time).wait() def configure(self, d: dict) -> tuple: - """" Configure the exposure scripts + """ " Configure the exposure scripts Script execution at the PX beamlines is based on scripts that are always running on the controller that execute commands when commanded by @@ -317,54 +313,56 @@ class AerotechAbrStage(PsiDeviceBase): old = self.read_configuration() # ToDo: Check if idle before reconfiguring - self.scan_command.set(d['scan_command']).wait() + self.scan_command.set(d["scan_command"]).wait() # Set the corresponding global variables - if "var_1" in d and d['var_1'] is not None: - self._var_1.set(d['var_1']).wait() - if "var_2" in d and d['var_2'] is not None: - self._var_2.set(d['var_2']).wait() - if "var_3" in d and d['var_3'] is not None: - self._var_3.set(d['var_3']).wait() - if "var_4" in d and d['var_4'] is not None: - self._var_4.set(d['var_4']).wait() - if "var_5" in d and d['var_5'] is not None: - self._var_5.set(d['var_5']).wait() - if "var_6" in d and d['var_6'] is not None: - self._var_6.set(d['var_6']).wait() - if "var_7" in d and d['var_7'] is not None: - self._var_7.set(d['var_7']).wait() - if "var_8" in d and d['var_8'] is not None: - self._var_8.set(d['var_8']).wait() - if "var_9" in d and d['var_9'] is not None: - self._var_9.set(d['var_9']).wait() - if "var_10" in d and d['var_10'] is not None: - self._var_10.set(d['var_10']).wait() + if "var_1" in d and d["var_1"] is not None: + self._var_1.set(d["var_1"]).wait() + if "var_2" in d and d["var_2"] is not None: + self._var_2.set(d["var_2"]).wait() + if "var_3" in d and d["var_3"] is not None: + self._var_3.set(d["var_3"]).wait() + if "var_4" in d and d["var_4"] is not None: + self._var_4.set(d["var_4"]).wait() + if "var_5" in d and d["var_5"] is not None: + self._var_5.set(d["var_5"]).wait() + if "var_6" in d and d["var_6"] is not None: + self._var_6.set(d["var_6"]).wait() + if "var_7" in d and d["var_7"] is not None: + self._var_7.set(d["var_7"]).wait() + if "var_8" in d and d["var_8"] is not None: + self._var_8.set(d["var_8"]).wait() + if "var_9" in d and d["var_9"] is not None: + self._var_9.set(d["var_9"]).wait() + if "var_10" in d and d["var_10"] is not None: + self._var_10.set(d["var_10"]).wait() new = self.read_configuration() return old, new def bluestage(self): - """ Bluesky-style stage - - Since configuration synchronization is not guaranteed, this does - nothing. The script launched by kickoff(). + """Bluesky-style stage + + Since configuration synchronization is not guaranteed, this does + nothing. The script launched by kickoff(). """ pass def bluekickoff(self, timeout=1) -> SubscriptionStatus: - """ Kick off the set program """ + """Kick off the set program""" self.start_command.set(1).wait() # Define wait until the busy flag goes high def is_busy(*args, value, **kwargs): - return bool(value==0) + return bool(value == 0) # Subscribe and wait for update - status = SubscriptionStatus(self.raster_scan_done, is_busy, timeout=timeout, settle_time=0.1) + status = SubscriptionStatus( + self.raster_scan_done, is_busy, timeout=timeout, settle_time=0.1 + ) return status def blueunstage(self, settle_time=0.1): - """ Stops current script and releases the axes""" + """Stops current script and releases the axes""" # Disarm commands self.scan_command.set(AbrCmd.NONE, settle_time=settle_time).wait() self.stop_command.set(1).wait() @@ -372,22 +370,25 @@ class AerotechAbrStage(PsiDeviceBase): self.set_axis_mode("direct", settle_time=settle_time) def complete(self, timeout=None) -> SubscriptionStatus: - """ Waits for task execution + """Waits for task execution NOTE: Original complete was raster scanner complete... """ + # Define wait until the busy flag goes down (excluding initial update) def is_idle(*args, value, **kwargs): - return bool(value==1) + return bool(value == 1) # Subscribe and wait for update # status = SubscriptionStatus(self.task1, is_idle, timeout=timeout, settle_time=0.5) - status = SubscriptionStatus(self.raster_scan_done, is_idle, timeout=timeout, settle_time=0.5) + status = SubscriptionStatus( + self.raster_scan_done, is_idle, timeout=timeout, settle_time=0.5 + ) return status def reset(self, settle_time=0.1, wait_after_reload=1) -> None: - """ Resets the Aerotech controller state - + """Resets the Aerotech controller state + Attempts to reset the currently running measurement task on the A3200 by stopping current motions, reloading aerobasic programs and going to DIRECT mode. @@ -405,8 +406,7 @@ class AerotechAbrStage(PsiDeviceBase): self.set_axis_mode("direct", settle_time=settle_time) def stop(self, settle_time=1.0) -> None: - """ Stops current motions - """ + """Stops current motions""" # Disarm commands self.scan_command.set(AbrCmd.NONE, settle_time=settle_time).wait() self.stop_command.set(1).wait() @@ -466,7 +466,7 @@ class AerotechAbrStage(PsiDeviceBase): # return state == self._shutter.get() def wait_for_movements(self, timeout=60.0): - """ Waits for all motor movements""" + """Waits for all motor movements""" t_start = time.time() t_elapsed = 0 @@ -480,7 +480,10 @@ class AerotechAbrStage(PsiDeviceBase): """Chechs if all axes are DONE""" return not ( self.omega_done.get() - and self.gmx_done.get() and self.gmy_done.get() and self.gmz_done.get() ) + and self.gmx_done.get() + and self.gmy_done.get() + and self.gmz_done.get() + ) # def start_exposure(self): # """Starts the previously configured exposure.""" diff --git a/pxiii_bec/devices/A3200enums.py b/pxiii_bec/devices/A3200enums.py index c898e94..26df02f 100644 --- a/pxiii_bec/devices/A3200enums.py +++ b/pxiii_bec/devices/A3200enums.py @@ -6,50 +6,67 @@ Enumerations for the MX specific Aerotech A3200 stage. """ # pylint: disable=too-few-public-methods + class AbrStatus: """ABR measurement task status""" + DONE = 0 READY = 1 BUSY = 2 + ABR_DONE = 0 ABR_READY = 1 ABR_BUSY = 2 + class AbrGridStatus: """ABR grid scan status""" + BUSY = 0 DONE = 1 + GRID_SCAN_BUSY = 0 GRID_SCAN_DONE = 1 + class AbrMode: """ABR mode status""" + DIRECT = 0 MEASURING = 1 + DIRECT_MODE = 0 MEASURING_MODE = 1 + class AbrShutterStatus: """ABR shutter status""" + CLOSE = 0 OPEN = 1 + SHUTTER_CLOSE = 0 SHUTTER_OPEN = 1 + class AbrGridPeriod: """ABR grid period""" + FULL = 0 HALF = 1 + FULL_PERIOD = 0 HALF_PERIOD = 1 + class AbrCmd: """ABR command table""" + NONE = 0 RASTER_SCAN_SIMPLE = 1 MEASURE_STANDARD = 2 @@ -73,6 +90,7 @@ class AbrCmd: SCAN_SASTT_V2 = 20 SCAN_SASTT_V3 = 21 + CMD_NONE = 0 CMD_RASTER_SCAN_SIMPLE = 1 CMD_MEASURE_STANDARD = 2 @@ -96,8 +114,10 @@ CMD_SCAN_SASTT = 19 CMD_SCAN_SASTT_V2 = 20 CMD_SCAN_SASTT_V3 = 21 + class AbrAxis: """ABR axis index""" + OMEGA = 1 GMX = 2 GMY = 3 @@ -105,9 +125,10 @@ class AbrAxis: STY = 5 STZ = 6 + AXIS_OMEGA = 1 AXIS_GMX = 2 AXIS_GMY = 3 AXIS_GMZ = 4 AXIS_STY = 5 -AXIS_STZ = 6 \ No newline at end of file +AXIS_STZ = 6 diff --git a/pxiii_bec/devices/A3200utils.py b/pxiii_bec/devices/A3200utils.py index ae0025f..d92027a 100644 --- a/pxiii_bec/devices/A3200utils.py +++ b/pxiii_bec/devices/A3200utils.py @@ -6,20 +6,27 @@ Created on Tue Jun 11 11:28:38 2024 """ import types import math -from ophyd import (Component, PVPositioner, Device, Signal, EpicsSignal, EpicsSignalRO, Kind, - PositionerBase) +from ophyd import ( + Component, + PVPositioner, + Device, + Signal, + EpicsSignal, + EpicsSignalRO, + Kind, + PositionerBase, +) from ophyd.status import Status, MoveStatus, SubscriptionStatus, DeviceStatus from bec_lib import bec_logger + logger = bec_logger.logger from .A3200enums import * - - ABR_DONE = 0 ABR_READY = 1 ABR_BUSY = 2 @@ -32,12 +39,12 @@ MEASURING_MODE = 1 class A3200Axis(PVPositioner): - """ Positioner wrapper for A3200 axes - - + """Positioner wrapper for A3200 axes + + Positioner wrapper for motors on the Aerotech A3200 controller. As the IOC does not provide a motor record, this class simply wraps axes into a - standard Ophyd positioner for the BEC. It also has some additional + standard Ophyd positioner for the BEC. It also has some additional functionality for error checking and diagnostics. Examples @@ -56,19 +63,22 @@ class A3200Axis(PVPositioner): base_pv : str (situational) IOC PV name root, i.e. X06DA-ES if standalone class. """ - USER_ACCESS = ['omove'] + + USER_ACCESS = ["omove"] abr_mode_direct = Component( - EpicsSignal, "-DF1:MODE-DIRECT", put_complete=True, kind=Kind.omitted) + EpicsSignal, "-DF1:MODE-DIRECT", put_complete=True, kind=Kind.omitted + ) abr_mode = Component( - EpicsSignal, "-DF1:AXES-MODE", auto_monitor=True, put_complete=True, kind=Kind.omitted) + EpicsSignal, "-DF1:AXES-MODE", auto_monitor=True, put_complete=True, kind=Kind.omitted + ) # Basic PV positioner interface done = Component(EpicsSignalRO, "-DONE", auto_monitor=True, kind=Kind.config) readback = Component(EpicsSignalRO, "-RBV", auto_monitor=True, kind=Kind.hinted) # Setpoint is one of the two... setpoint = Component(EpicsSignal, "-SETP", kind=Kind.config) - #setpoint = Component(EpicsSignal, "-VAL", kind=Kind.config) + # setpoint = Component(EpicsSignal, "-VAL", kind=Kind.config) velocity = Component(EpicsSignal, "-SETV", kind=Kind.config) status = Component(EpicsSignalRO, "-STAT", auto_monitor=True, kind=Kind.config) @@ -88,7 +98,7 @@ class A3200Axis(PVPositioner): _rock_dist = Component(EpicsSignal, "-RINCP", put_complete=True, kind=Kind.config) _rock_velo = Component(EpicsSignal, "-RSETV", put_complete=True, kind=Kind.config) _rock_count = Component(EpicsSignal, "-COUNT", put_complete=True, kind=Kind.config) - #_rock_accel = Component(EpicsSignal, "-RRATE", put_complete=True, kind=Kind.config) + # _rock_accel = Component(EpicsSignal, "-RRATE", put_complete=True, kind=Kind.config) hlm = Component(Signal, kind=Kind.config) llm = Component(Signal, kind=Kind.config) @@ -96,37 +106,69 @@ class A3200Axis(PVPositioner): vmax = Component(Signal, kind=Kind.config) offset = Component(EpicsSignal, "-OFF", put_complete=True, kind=Kind.config) - def __init__(self, prefix="", *, name, base_pv="", kind=None, read_attrs=None, - configuration_attrs=None, parent=None, llm=0, hlm=0, vmin=0, vmax=0, **kwargs): - """ __init__ MUST have a full argument list""" + def __init__( + self, + prefix="", + *, + name, + base_pv="", + kind=None, + read_attrs=None, + configuration_attrs=None, + parent=None, + llm=0, + hlm=0, + vmin=0, + vmax=0, + **kwargs, + ): + """__init__ MUST have a full argument list""" # Patching the parent's PVs into the axis class to check for direct/locked mode if parent is None: + def maybe_add_prefix(self, instance, kw, suffix): # Patched not to enforce parent prefix when no parent if kw in self.add_prefix: return suffix return suffix + self.__class__.__dict__["abr_mode"].maybe_add_prefix = types.MethodType( - maybe_add_prefix, - self.__class__.__dict__["abr_mode"]) + maybe_add_prefix, self.__class__.__dict__["abr_mode"] + ) self.__class__.__dict__["abr_mode_direct"].maybe_add_prefix = types.MethodType( - maybe_add_prefix, - self.__class__.__dict__["abr_mode_direct"]) + maybe_add_prefix, self.__class__.__dict__["abr_mode_direct"] + ) logger.info(self.__class__.__dict__["abr_mode"].kwargs) self.__class__.__dict__["abr_mode"].suffix = base_pv + "-DF1:AXES-MODE" self.__class__.__dict__["abr_mode_direct"].suffix = base_pv + "-DF1:MODE-DIRECT" - super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, - configuration_attrs=configuration_attrs, parent=parent, **kwargs) + super().__init__( + prefix=prefix, + name=name, + kind=kind, + read_attrs=read_attrs, + configuration_attrs=configuration_attrs, + parent=parent, + **kwargs, + ) self.llm.set(llm).wait() self.hlm.set(hlm).wait() self.vmin.set(vmin).wait() self.vmax.set(vmax).wait() - def omove(self, position, wait=True, timeout=None, moved_cb=None, velocity=None, relative=False, - direct=False, **kwargs) -> MoveStatus: - """ Native absolute/relative movement on the A3200 """ + def omove( + self, + position, + wait=True, + timeout=None, + moved_cb=None, + velocity=None, + relative=False, + direct=False, + **kwargs, + ) -> MoveStatus: + """Native absolute/relative movement on the A3200""" # Check if we're in direct movement mode if self.abr_mode.value not in (DIRECT_MODE, "DIRECT"): @@ -169,19 +211,18 @@ class A3200Axis(PVPositioner): raise return status - def move(self, position, wait=True, timeout=None, moved_cb=None, **kwargs) -> MoveStatus: - """ Exposes the ophyd move command through BEC abstraction""" + """Exposes the ophyd move command through BEC abstraction""" return self.omove(position, wait=wait, timeout=timeout, moved_cb=moved_cb, **kwargs) def rock(self, distance, counts: int, velocity=None, wait=True) -> Status: - """ Repeated single axis zigzag scan I guess PSO should be configured for this""" + """Repeated single axis zigzag scan I guess PSO should be configured for this""" self._rock_dist.put(distance) self._rock_count.put(counts) if velocity is not None: self._rock_velo.put(velocity) - #if acceleration is not None: + # if acceleration is not None: # self._rock_accel.put(acceleration) self._rock.put(1) @@ -190,7 +231,6 @@ class A3200Axis(PVPositioner): status.wait() return status - # def is_omega_ok(self): # """Checks omega axis status""" # return 0 == self.self.omega.status.get() @@ -216,21 +256,11 @@ class A3200Axis(PVPositioner): # time.sleep(0.2) - - - - - - - - - - class A3200RasterScanner(Device): - """Positioner wrapper for motors on the Aerotech A3200 controller. As the - IOC does not provide a motor record, this class simply wraps axes into - a standard Ophyd positioner for the BEC. It also has some additional - functionality for diagnostics.""" + """Positioner wrapper for motors on the Aerotech A3200 controller. As the + IOC does not provide a motor record, this class simply wraps axes into + a standard Ophyd positioner for the BEC. It also has some additional + functionality for diagnostics.""" x_start = Component(EpicsSignal, "-GRD:GMX-START", kind=Kind.config) x_start = Component(EpicsSignal, "-GRD:GMX-END", kind=Kind.config) @@ -259,10 +289,11 @@ class A3200RasterScanner(Device): # Also needs the command interface _zcmd = Component(EpicsSignal, "-PSO:CMD", put_complete=True, kind=Kind.omitted) _start_command = Component( - EpicsSignal, "-PSO:START-TEST.PROC", put_complete=True, kind=Kind.omitted) + EpicsSignal, "-PSO:START-TEST.PROC", put_complete=True, kind=Kind.omitted + ) _stop_command = Component( - EpicsSignal, "-PSO:STOP-TEST.PROC", put_complete=True, kind=Kind.omitted) - + EpicsSignal, "-PSO:STOP-TEST.PROC", put_complete=True, kind=Kind.omitted + ) def raster_setup(self, positions, columns, angle, etime): """configures parameters for a raster scan @@ -334,7 +365,7 @@ class A3200RasterScanner(Device): def raster_get_ready(self): """ - WTF is this??? + WTF is this??? """ self.get_ready.set(1).wait() @@ -342,9 +373,10 @@ class A3200RasterScanner(Device): try: # Define wait until the busy flag goes down (excluding initial update) timestamp_ = 0 + def is_ready(*args, old_value, value, timestamp, **kwargs): nonlocal timestamp_ - result = False if (timestamp_== 0) else bool(value==ABR_READY) + result = False if (timestamp_ == 0) else bool(value == ABR_READY) print(f"Old {old_value}\tNew: {value}\tResult: {result}") timestamp_ = timestamp return result @@ -370,9 +402,10 @@ class A3200RasterScanner(Device): def wait_scan_done(self, timeout=60.0) -> SubscriptionStatus: # Define wait until the busy flag goes down (excluding initial update) timestamp_ = 0 + def is_ready(*args, old_value, value, timestamp, **kwargs): nonlocal timestamp_ - result = False if (timestamp_== 0) else bool(value==1) + result = False if (timestamp_ == 0) else bool(value == 1) print(f"Old {old_value}\tNew: {value}\tResult: {result}") timestamp_ = timestamp return result @@ -389,9 +422,10 @@ class A3200RasterScanner(Device): def raster_wait_for_row(self, timeout=60) -> SubscriptionStatus: # Define wait subscription timestamp_ = 0 + def is_ready(*args, old_value, value, timestamp, **kwargs): nonlocal timestamp_ - result = False if (timestamp_== 0) else bool(value==1) + result = False if (timestamp_ == 0) else bool(value == 1) print(f"Old {old_value}\tNew: {value}\tResult: {result}") timestamp_ = timestamp return result @@ -403,16 +437,17 @@ class A3200RasterScanner(Device): return status def complete(self, timeout=None) -> DeviceStatus: - """ Wait for the grid scanner to finish - + """Wait for the grid scanner to finish + TODO: Probably redundant with task status wait? """ if timeout is not None: # Define wait until the busy flag goes down (excluding initial update) timestamp_ = 0 + def is_ready(*args, old_value, value, timestamp, **kwargs): nonlocal timestamp_ - result = False if (timestamp_== 0) else bool(value==GRID_SCAN_DONE) + result = False if (timestamp_ == 0) else bool(value == GRID_SCAN_DONE) print(f"Old {old_value}\tNew: {value}\tResult: {result}") timestamp_ = timestamp return result @@ -425,11 +460,10 @@ class A3200RasterScanner(Device): return status - class A3200Oscillator(Device): - """No clue what this does, seems to be redundant with the task based grid scanners... - """ - ostart_pos = Component(EpicsSignal, "-OSC:START-POS", put_complete=True, kind=Kind.config) + """No clue what this does, seems to be redundant with the task based grid scanners...""" + + ostart_pos = Component(EpicsSignal, "-OSC:START-POS", put_complete=True, kind=Kind.config) orange = Component(EpicsSignal, "-OSC:RANGE", put_complete=True, kind=Kind.config) etime = Component(EpicsSignal, "-OSC:ETIME", put_complete=True, kind=Kind.config) get_ready = Component(EpicsSignal, "-OSC:READY.PROC", put_complete=True, kind=Kind.config) @@ -460,9 +494,10 @@ class A3200Oscillator(Device): RETURNS SubscriptionStatus : A waitable status subscribed on 'phase' updates. """ + # Define wait until the busy flag goes down (excluding initial update) def in_status(*args, old_value, value, _, **kwargs): - result = bool(value==target) + result = bool(value == target) # print(f"Old {old_value}\tNew: {value}\tResult: {result}") return result @@ -471,7 +506,6 @@ class A3200Oscillator(Device): return status - # Automatically start an axis if directly invoked if __name__ == "__main__": omega = A3200Axis(prefix="X06DA-ES-DF1:OMEGA", name="omega") diff --git a/pxiii_bec/devices/SmarGon.py b/pxiii_bec/devices/SmarGon.py index 489e613..15d2a7a 100644 --- a/pxiii_bec/devices/SmarGon.py +++ b/pxiii_bec/devices/SmarGon.py @@ -6,18 +6,20 @@ import requests try: from bec_lib import bec_logger + logger = bec_logger.logger except ModuleNotFoundError: import logging + logger = logging.getLogger("SmarGon") class SmarGonSignal(Signal): - """ SmarGonSignal (R/W) - + """SmarGonSignal (R/W) + Small helper class to read/write parameters from SmarGon. As there is no motion status readback from smargopolo, this should be substituted with - setting with 'settle_time'. + setting with 'settle_time'. """ def __init__(self, *args, write_addr="targetSCS", low_limit=None, high_limit=None, **kwargs): @@ -28,7 +30,7 @@ class SmarGonSignal(Signal): # self.get() def put(self, value, *args, timestamp=None, **kwargs): - """ Overriden put to add communication with smargopolo""" + """Overriden put to add communication with smargopolo""" # Validate new value self.check_value(value) @@ -44,15 +46,16 @@ class SmarGonSignal(Signal): self.value = r[self.addr.upper()] # Notify subscribers - self._run_subs(sub_type=self.SUB_VALUE, old_value=old_value, - value=value, timestamp=self._timestamp) + self._run_subs( + sub_type=self.SUB_VALUE, old_value=old_value, value=value, timestamp=self._timestamp + ) @property def limits(self): return self._limits def check_value(self, value, **kwargs): - """ Check if value falls within limits""" + """Check if value falls within limits""" lol = self.limits[0] if lol is not None: if value < lol: @@ -70,9 +73,9 @@ class SmarGonSignal(Signal): class SmarGonSignalRO(Signal): - """ Small helper class for read-only parameters PVs from SmarGon. + """Small helper class for read-only parameters PVs from SmarGon. - TODO: Add monitoring + TODO: Add monitoring """ def __init__(self, *args, read_addr="readbackSCS", **kwargs): @@ -111,8 +114,9 @@ class SmarGonAxis(Device): This class controls the SmarGon goniometer via the REST interface. """ + # Status attributes - sg_url = Component(Signal, kind=Kind.config, metadata={'write_access': False}) + sg_url = Component(Signal, kind=Kind.config, metadata={"write_access": False}) corr = Component(SmarGonSignalRO, read_addr="corr_type", kind=Kind.config) mode = Component(SmarGonSignalRO, read_addr="mode", kind=Kind.config) @@ -134,17 +138,25 @@ class SmarGonAxis(Device): parent=None, device_manager=None, sg_url: str = "http://x06da-smargopolo.psi.ch:3000", - low_limit = None, - high_limit = None, + low_limit=None, + high_limit=None, **kwargs, ) -> None: - self.__class__.__dict__['readback'].kwargs['read_addr'] = f"readback{prefix}" - self.__class__.__dict__['setpoint'].kwargs['write_addr'] = f"target{prefix}" - self.__class__.__dict__['setpoint'].kwargs['low_limit'] = low_limit - self.__class__.__dict__['setpoint'].kwargs['high_limit'] = high_limit + self.__class__.__dict__["readback"].kwargs["read_addr"] = f"readback{prefix}" + self.__class__.__dict__["setpoint"].kwargs["write_addr"] = f"target{prefix}" + self.__class__.__dict__["setpoint"].kwargs["low_limit"] = low_limit + self.__class__.__dict__["setpoint"].kwargs["high_limit"] = high_limit # super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, device_manager=device_manager, **kwargs) - super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, **kwargs) + super().__init__( + prefix=prefix, + name=name, + kind=kind, + read_attrs=read_attrs, + configuration_attrs=configuration_attrs, + parent=parent, + **kwargs, + ) self.sg_url._metadata["write_access"] = False self.sg_url.set(sg_url, force=True).wait() @@ -160,7 +172,9 @@ class SmarGonAxis(Device): cmd = f"{self.sg_url.get()}/{address}" r = requests.get(cmd, timeout=1) if not r.ok: - raise RuntimeError(f"[{self.name}] Error getting {address}; server returned {r.status_code} => {r.reason}") + raise RuntimeError( + f"[{self.name}] Error getting {address}; server returned {r.status_code} => {r.reason}" + ) return r.json() def _go_n_put(self, address, **kwargs): @@ -168,19 +182,22 @@ class SmarGonAxis(Device): cmd = f"{self.sg_url.get()}/{address}" r = requests.put(cmd, timeout=1) if not r.ok: - raise RuntimeError(f"[{self.name}] Error putting {address}; server returned {r.status_code} => {r.reason}") + raise RuntimeError( + f"[{self.name}] Error putting {address}; server returned {r.status_code} => {r.reason}" + ) return r.json() - - - - - if __name__ == "__main__": shx = SmarGonAxis(prefix="SCS", name="shx", sg_url="http://x06da-smargopolo.psi.ch:3000") shy = SmarGonAxis(prefix="SCS", name="shy", sg_url="http://x06da-smargopolo.psi.ch:3000") - shz = SmarGonAxis(prefix="SCS", name="shz", low_limit=10, high_limit=22, sg_url="http://x06da-smargopolo.psi.ch:3000") + shz = SmarGonAxis( + prefix="SCS", + name="shz", + low_limit=10, + high_limit=22, + sg_url="http://x06da-smargopolo.psi.ch:3000", + ) shx.wait_for_connection() shy.wait_for_connection() shz.wait_for_connection() diff --git a/pxiii_bec/scans/mx_measurements.py b/pxiii_bec/scans/mx_measurements.py index 86af521..17c0f40 100644 --- a/pxiii_bec/scans/mx_measurements.py +++ b/pxiii_bec/scans/mx_measurements.py @@ -3,6 +3,7 @@ Scan primitives for standard BEC scans at the PX beamlines at SLS. Theese scans define the event model and can be called from higher levels. """ + import numpy as np from bec_lib import bec_logger @@ -11,9 +12,9 @@ from bec_server.scan_server.scans import AsyncFlyScanBase, ScanArgType, ScanBase logger = bec_logger.logger - class AbrCmd: - """ Valid Aerotech ABR scan commands from the AeroBasic files""" + """Valid Aerotech ABR scan commands from the AeroBasic files""" + NONE = 0 RASTER_SCAN_SIMPLE = 1 MEASURE_STANDARD = 2 @@ -38,9 +39,8 @@ class AbrCmd: # SCAN_SASTT_V3 = 21 - class AerotechFlyscanBase(AsyncFlyScanBase): - """ Base class for MX flyscans + """Base class for MX flyscans Low-level base class for standard scans at the PX beamlines at SLS. Theese scans use the A3200 rotation stage and the actual experiment is performed @@ -50,26 +50,27 @@ class AerotechFlyscanBase(AsyncFlyScanBase): configurations in child classes or pass it as command line parameters. IMPORTANT: The AeroBasic scripts take care of the PSO configuration. - + Parameters: ----------- abr_complete : bool Wait for the launched ABR task to complete. """ + scan_type = "fly" scan_report_hint = "table" arg_input = {} arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None} # Aerotech stage config - abr_raster_reset =False + abr_raster_reset = False abr_complete = False abr_timeout = None pointID = 0 num_pos = 0 def __init__(self, *args, parameter: dict = None, **kwargs): - """ Just set num_pos=0 to avoid hanging and override defaults if explicitly set from + """Just set num_pos=0 to avoid hanging and override defaults if explicitly set from parameters. """ super().__init__(parameter=parameter, **kwargs) @@ -81,14 +82,14 @@ class AerotechFlyscanBase(AsyncFlyScanBase): self.abr_timeout = self.caller_kwargs.get("abr_timeout") def pre_scan(self): - """ Mostly just checking if ABR stage is ok...""" + """Mostly just checking if ABR stage is ok...""" # TODO: Move roughly to start position??? # ABR status checking - stat = yield from self.stubs.send_rpc_and_wait("abr","status.get") + stat = yield from self.stubs.send_rpc_and_wait("abr", "status.get") if stat not in (0, "OK"): raise RuntimeError("Aerotech ABR seems to be in error state {stat}, please reset") - task = yield from self.stubs.send_rpc_and_wait("abr","task1.get") + task = yield from self.stubs.send_rpc_and_wait("abr", "task1.get") # From what I got values are copied to local vars at the start of scan, # so only kickoff should be forbidden. if task not in (1, "OK"): @@ -106,8 +107,7 @@ class AerotechFlyscanBase(AsyncFlyScanBase): # return super().stage() def scan_core(self): - """ The actual scan logic comes here. - """ + """The actual scan logic comes here.""" # Kick off the run yield from self.stubs.send_rpc_and_wait("abr", "bluekickoff") logger.info("Measurement launched on the ABR stage...") @@ -127,9 +127,8 @@ class AerotechFlyscanBase(AsyncFlyScanBase): return super().cleanup() - class MeasureStandardWedge(AerotechFlyscanBase): - """ Standard wedge scan using the OMEGA motor + """Standard wedge scan using the OMEGA motor Measure an absolute continous line scan from `start` to `start` + `range` during `move_time` on the Omega axis with PSO output. @@ -148,22 +147,22 @@ class MeasureStandardWedge(AerotechFlyscanBase): range : (float, float) Scan range of the axis. move_time : (float) - Total travel time for the movement [s]. + Total travel time for the movement [s]. ready_rate : float, optional No clue what is this... (default=500) """ + scan_name = "standardscan" required_kwargs = ["start", "range", "move_time"] - class MeasureVerticalLine(AerotechFlyscanBase): - """ Vertical line scan using the GMY motor + """Vertical line scan using the GMY motor Simple relative continous line scan that records a single vertical line with PSO output. There's no actual stepping, it's only used for velocity calculation. - + The scan itself is executed by the scan service running on the Aerotech controller. Ophyd just configures, launches it and waits for completion. @@ -180,19 +179,19 @@ class MeasureVerticalLine(AerotechFlyscanBase): exp_time : float Eeffective exposure time per step [s] """ + scan_name = "vlinescan" required_kwargs = ["exp_time", "range", "steps"] - class MeasureRasterSimple(AerotechFlyscanBase): - """ Simple raster scan + """Simple raster scan Measure a simplified relative zigzag raster scan in the X-Y plane. The scan is relative assumes the goniometer is currently at the CENTER of the first cell (i.e. TOP-LEFT). Each line is executed as a single continous movement, i.e. there's no actual stepping in the X direction. - + The scan itself is executed by the scan service running on the Aerotech controller. Ophyd just configures, launches it and waits for completion. @@ -213,16 +212,15 @@ class MeasureRasterSimple(AerotechFlyscanBase): steps_y : int Number of scan steps in Y (slow). """ + scan_name = "rasterscan" required_kwargs = ["exp_time", "range_x", "range_y", "steps_x", "steps_y"] - - class MeasureScreening(AerotechFlyscanBase): - """ Sample screening scan - - Sample screening scan that scans intervals on the rotation axis taking + """Sample screening scan + + Sample screening scan that scans intervals on the rotation axis taking 1 image/interval. This makes sure that we hit diffraction peaks if there are any crystals. @@ -242,11 +240,12 @@ class MeasureScreening(AerotechFlyscanBase): steps : int Number of blurred intervals. exp_time : float - Exposure time per blurred interval [s]. + Exposure time per blurred interval [s]. oscrange : float Motion blurring of each interval [deg] delta : float Safety margin for sub-range movements (default: 0.5) [deg]. """ + scan_name = "screeningscan" required_kwargs = ["start", "range", "steps", "exp_time", "oscrange"]