From 82d51649ee5c8097b45e96e29683953a80d79705 Mon Sep 17 00:00:00 2001 From: gac-x06da Date: Fri, 24 Jan 2025 15:21:59 +0100 Subject: [PATCH] Blacking and cleanup of unused code --- pxiii_bec/devices/A3200utils.py | 250 ----------------------------- pxiii_bec/devices/SmarGon_orig.py | 24 +-- pxiii_bec/scans/__init__.py | 8 +- pxiii_bec/scans/mx_measurements.py | 6 +- 4 files changed, 21 insertions(+), 267 deletions(-) diff --git a/pxiii_bec/devices/A3200utils.py b/pxiii_bec/devices/A3200utils.py index d92027a..0de97a9 100644 --- a/pxiii_bec/devices/A3200utils.py +++ b/pxiii_bec/devices/A3200utils.py @@ -256,256 +256,6 @@ class A3200Axis(PVPositioner): # time.sleep(0.2) -class A3200RasterScanner(Device): - """Positioner wrapper for motors on the Aerotech A3200 controller. As the - IOC does not provide a motor record, this class simply wraps axes into - a standard Ophyd positioner for the BEC. It also has some additional - functionality for diagnostics.""" - - x_start = Component(EpicsSignal, "-GRD:GMX-START", kind=Kind.config) - x_start = Component(EpicsSignal, "-GRD:GMX-END", kind=Kind.config) - omega = Component(EpicsSignal, "-OSC:#M-START", kind=Kind.config) - osc = Component(EpicsSignal, "-GRD:ANGLE", kind=Kind.config) - celltime = Component(EpicsSignal, "-GRD:CELL-TIME", kind=Kind.config) - - y_start = Component(EpicsSignal, "-OSC:#STY-START", kind=Kind.config) - y_end = Component(EpicsSignal, "-OSC:#STY-END", kind=Kind.config) - z_start = Component(EpicsSignal, "-OSC:#STZ-START", kind=Kind.config) - z_end = Component(EpicsSignal, "-OSC:#STZ-END", kind=Kind.config) - - columns = Component(EpicsSignal, "-GRD:COLUMNS", kind=Kind.config) - rows = Component(EpicsSignal, "-GRD:ROWS", kind=Kind.config) - delay = Component(EpicsSignal, "-GRD:RAST-DLY", kind=Kind.config) - osc_mode = Component(EpicsSignal, "-GRD:SET-MODE", kind=Kind.config) - - velo = Component(EpicsSignal, "-GRD:#X-VEL", kind=Kind.config) - get_ready = Component(EpicsSignal, "-GRD:READY.PROC", kind=Kind.config) - grid_start = Component(EpicsSignal, "-GRD:START.PROC", kind=Kind.config) - grid_next = Component(EpicsSignal, "-GRD:NEXT-ROW", kind=Kind.config) - row_done = Component(EpicsSignal, "-GRD:ROW-DONE", kind=Kind.config) - scan_done = Component(EpicsSignal, "-GRD:SCAN-DONE", kind=Kind.config) - grid_done = Component(EpicsSignal, "-GRD:DONE", kind=Kind.config) - - # Also needs the command interface - _zcmd = Component(EpicsSignal, "-PSO:CMD", put_complete=True, kind=Kind.omitted) - _start_command = Component( - EpicsSignal, "-PSO:START-TEST.PROC", put_complete=True, kind=Kind.omitted - ) - _stop_command = Component( - EpicsSignal, "-PSO:STOP-TEST.PROC", put_complete=True, kind=Kind.omitted - ) - - def raster_setup(self, positions, columns, angle, etime): - """configures parameters for a raster scan - - positions : a list of xyz indicating the positions of each cell - columns: an integer with how many columns - angle : the oscillation angle for each row - etime : the exposure time per cell - """ - self.parent.axisModeDirect.set(1).wait() - if columns == 1: - raise RuntimeWarning("Fast scans are not available with vertical line scans.") - - rows = len(positions) / columns - x1, y1, z1 = tuple(positions[0]) # first cell on first row - x2, y2, z2 = tuple(positions[1]) # second cell on first row - x4, y4, z4 = tuple(positions[columns - 1]) # last cell on first row - - if rows > 1: - x3, y3, z3 = tuple(positions[columns]) # first cell on second row - ystep = y3 - y1 - zstep = z3 - z1 - gmy_step = math.sqrt(pow(y3 - y1, 2) + pow(z3 - z1, 2)) - else: - ystep = 0.010 - zstep = 0.010 - gmy_step = 0.010 - - half_cell = abs(x2 - x1) / 2.0 - start_x = x1 - half_cell - end_x = x4 + half_cell - - self._raster_starting_x = start_x - self._raster_starting_y = y1 - self._raster_starting_z = z1 - self._raster_step_y = ystep - self._raster_step_z = zstep - self._raster_current_row = 0 - self._raster_num_rows = rows - - self.x_start.set(start_x).wait() - self.x_end.set(end_x).wait() - self.omega.set(self.position).wait() - self.osc.set(angle).wait() - self.celltime.set(etime).wait() - self.y_start.set(y1).wait() - self.z_start.set(z1).wait() - self.y_step.set(ystep).wait() - self.z_step.set(zstep).wait() - self.columns.set(columns).wait() - self.rows.set(rows).wait() - - def raster_next_row_yz(self): - r = self._raster_current_row - self._raster_current_row = r + 1 - - y, z = ( - r * self.step_y.value + self._raster_starting_y, - r * self.step_z.value + self._raster_starting_z, - ) - - if r < self._raster_num_rows: - return (y, z) - return (None, None) - - def next_row(self): - """start rastering a new row""" - self.grid_next.set(1).wait() - - def raster_get_ready(self): - """ - WTF is this??? - """ - - self.get_ready.set(1).wait() - for _ in range(10): - try: - # Define wait until the busy flag goes down (excluding initial update) - timestamp_ = 0 - - def is_ready(*args, old_value, value, timestamp, **kwargs): - nonlocal timestamp_ - result = False if (timestamp_ == 0) else bool(value == ABR_READY) - print(f"Old {old_value}\tNew: {value}\tResult: {result}") - timestamp_ = timestamp - return result - - # Subscribe and wait for update - status = SubscriptionStatus(self.grid_done, is_ready, timeout=2, settle_time=0.5) - status.wait() - except TimeoutError as e: - print(str(e), end=" ") - print(" --- trying again.") - self.get_ready.put(1, wait=True) - - def raster_scan_row(self): - """start rastering a new row, either first or following rows""" - if ABR_READY == self._grid_done.get(): - self.grid_start.set(1) - else: - self.grid_next.set(1) - - def is_scan_done(self): - return 1 == self.scan_done.get() - - def wait_scan_done(self, timeout=60.0) -> SubscriptionStatus: - # Define wait until the busy flag goes down (excluding initial update) - timestamp_ = 0 - - def is_ready(*args, old_value, value, timestamp, **kwargs): - nonlocal timestamp_ - result = False if (timestamp_ == 0) else bool(value == 1) - print(f"Old {old_value}\tNew: {value}\tResult: {result}") - timestamp_ = timestamp - return result - - # Subscribe and wait for update - status = SubscriptionStatus(self.scan_done, is_ready, timeout=timeout, settle_time=0.5) - status.wait() - - return status - - def raster_is_row_scanning(self): - return 1 == self.scan_done.get() - - def raster_wait_for_row(self, timeout=60) -> SubscriptionStatus: - # Define wait subscription - timestamp_ = 0 - - def is_ready(*args, old_value, value, timestamp, **kwargs): - nonlocal timestamp_ - result = False if (timestamp_ == 0) else bool(value == 1) - print(f"Old {old_value}\tNew: {value}\tResult: {result}") - timestamp_ = timestamp - return result - - # Subscribe and wait for update - status = SubscriptionStatus(self.row_done, is_ready, timeout=timeout, settle_time=0.5) - status.wait() - - return status - - def complete(self, timeout=None) -> DeviceStatus: - """Wait for the grid scanner to finish - - TODO: Probably redundant with task status wait? - """ - if timeout is not None: - # Define wait until the busy flag goes down (excluding initial update) - timestamp_ = 0 - - def is_ready(*args, old_value, value, timestamp, **kwargs): - nonlocal timestamp_ - result = False if (timestamp_ == 0) else bool(value == GRID_SCAN_DONE) - print(f"Old {old_value}\tNew: {value}\tResult: {result}") - timestamp_ = timestamp - return result - - # Subscribe and wait for update - status = SubscriptionStatus(self.scan_done, is_ready, timeout=timeout, settle_time=0.5) - return status - status = DeviceStatus(self, settle_time=0.5) - status.set_finished() - return status - - -class A3200Oscillator(Device): - """No clue what this does, seems to be redundant with the task based grid scanners...""" - - ostart_pos = Component(EpicsSignal, "-OSC:START-POS", put_complete=True, kind=Kind.config) - orange = Component(EpicsSignal, "-OSC:RANGE", put_complete=True, kind=Kind.config) - etime = Component(EpicsSignal, "-OSC:ETIME", put_complete=True, kind=Kind.config) - get_ready = Component(EpicsSignal, "-OSC:READY.PROC", put_complete=True, kind=Kind.config) - taskStart = Component(EpicsSignal, "-OSC:START", put_complete=True, kind=Kind.config) - phase = Component(EpicsSignal, "-OSC:DONE", auto_monitor=True, kind=Kind.config) - - @property - def is_done(self): - return ABR_DONE == self.phase.get() - - @property - def is_ready(self): - return ABR_READY == self.phase.get() - - @property - def is_busy(self): - return ABR_BUSY == self.phase.get() - - def kickoff(self): - self.osc.taskStart.set(1).wait() - - def wait_status(self, target, timeout=60.0) -> SubscriptionStatus: - """Wait for the Aertotech IOC to reach the desired `status`. - - PARAMETERS - `status` can be any the three ABR_DONE, ABR_READY or ABR_BUSY. - - RETURNS - SubscriptionStatus : A waitable status subscribed on 'phase' updates. - """ - - # Define wait until the busy flag goes down (excluding initial update) - def in_status(*args, old_value, value, _, **kwargs): - result = bool(value == target) - # print(f"Old {old_value}\tNew: {value}\tResult: {result}") - return result - - # Subscribe and wait for update - status = SubscriptionStatus(self.phase, in_status, timeout=timeout, settle_time=0.5) - return status - - # Automatically start an axis if directly invoked if __name__ == "__main__": omega = A3200Axis(prefix="X06DA-ES-DF1:OMEGA", name="omega") diff --git a/pxiii_bec/devices/SmarGon_orig.py b/pxiii_bec/devices/SmarGon_orig.py index bde885f..926c653 100644 --- a/pxiii_bec/devices/SmarGon_orig.py +++ b/pxiii_bec/devices/SmarGon_orig.py @@ -51,7 +51,9 @@ def scsput(**kwargs): :return: :rtype: """ - xyz = {k.upper(): v for k, v in kwargs.items() if k.lower() in ("shx", "shy", "shz", "chi", "phi")} + xyz = { + k.upper(): v for k, v in kwargs.items() if k.lower() in ("shx", "shy", "shz", "chi", "phi") + } thing = "&".join([f"{k.upper()}={float(v):.5f}" for k, v in xyz.items()]) cmd = f"{base}/targetSCS?{thing}" if kwargs.get("verbose", False): @@ -92,7 +94,9 @@ def scsrelput(**kwargs) -> None: :return: :rtype: """ - xyz = {k.upper(): v for k, v in kwargs.items() if k.lower() in ("shx", "shy", "shz", "chi", "phi")} + xyz = { + k.upper(): v for k, v in kwargs.items() if k.lower() in ("shx", "shy", "shz", "chi", "phi") + } thing = "&".join([f"{k.upper()}={float(v):.5f}" for k, v in xyz.items()]) cmd = f"{base}/targetSCS_rel?{thing}" if kwargs.get("verbose", False): @@ -194,13 +198,7 @@ class SmarGon(object): while not all(in_place) and time() < tout: rbv = self.readback_scs() in_place = [] - for k, v in { - "SHX": 0.0, - "SHY": 0.0, - "SHZ": 18.0, - "CHI": 0.0, - "PHI": 0.0, - }.items(): + for k, v in {"SHX": 0.0, "SHY": 0.0, "SHZ": 18.0, "CHI": 0.0, "PHI": 0.0}.items(): in_place.append(abs(rbv[k] - v) < 0.01) if time() > tout: raise TimeoutError(f"timeout waiting for smargon to reach home position: {rbv}") @@ -290,7 +288,9 @@ class SmarGon(object): def wait(self, timeout=60.0): """waits up to `timeout` seconds for smargon to reach target""" target = { - k.upper(): v for k, v in self.target_scs().items() if k.lower() in ("shx", "shy", "shz", "chi", "phi") + k.upper(): v + for k, v in self.target_scs().items() + if k.lower() in ("shx", "shy", "shz", "chi", "phi") } timeout = timeout + time() @@ -323,7 +323,9 @@ class SmarGon(object): bcsput(**value) elif key == "target": if not isinstance(value, dict): - raise Exception(f"expected a dict with target axis and values got something else: {value}") + raise Exception( + f"expected a dict with target axis and values got something else: {value}" + ) for k in value.keys(): if k.lower() not in "shx shy shz chi phi ox oy oz".split(): raise Exception(f'unknown axis in target "{k}"') diff --git a/pxiii_bec/scans/__init__.py b/pxiii_bec/scans/__init__.py index b874c1d..f95b6ed 100644 --- a/pxiii_bec/scans/__init__.py +++ b/pxiii_bec/scans/__init__.py @@ -1,2 +1,6 @@ -from .mx_measurements import (MeasureStandardWedge, MeasureVerticalLine, - MeasureRasterSimple, MeasureScreening) +from .mx_measurements import ( + MeasureStandardWedge, + MeasureVerticalLine, + MeasureRasterSimple, + MeasureScreening, +) diff --git a/pxiii_bec/scans/mx_measurements.py b/pxiii_bec/scans/mx_measurements.py index 17c0f40..539ee62 100644 --- a/pxiii_bec/scans/mx_measurements.py +++ b/pxiii_bec/scans/mx_measurements.py @@ -4,10 +4,8 @@ Scan primitives for standard BEC scans at the PX beamlines at SLS. Theese scans define the event model and can be called from higher levels. """ -import numpy as np - from bec_lib import bec_logger -from bec_server.scan_server.scans import AsyncFlyScanBase, ScanArgType, ScanBase +from bec_server.scan_server.scans import AsyncFlyScanBase logger = bec_logger.logger @@ -20,7 +18,7 @@ class AbrCmd: MEASURE_STANDARD = 2 VERTICAL_LINE_SCAN = 3 SCREENING = 4 - # SUPER_FAST_OMEGA = 5 # Some Japanese wanted to measure samples in capilaries at high RPMs + # SUPER_FAST_OMEGA = 5 # Some Japanese measured samples in capilaries at high RPMs # STILL_WEDGE = 6 # NOTE: Unused Step scan # STILLS = 7 # NOTE: Unused Just send triggers to detector # REPEAT_SINGLE_OSCILLATION = 8 # NOTE: Unused