Blacking and cleanup of unused code

This commit is contained in:
gac-x06da
2025-01-24 15:21:59 +01:00
parent 015bf2ee3b
commit 82d51649ee
4 changed files with 21 additions and 267 deletions

View File

@@ -256,256 +256,6 @@ class A3200Axis(PVPositioner):
# time.sleep(0.2)
class A3200RasterScanner(Device):
"""Positioner wrapper for motors on the Aerotech A3200 controller. As the
IOC does not provide a motor record, this class simply wraps axes into
a standard Ophyd positioner for the BEC. It also has some additional
functionality for diagnostics."""
x_start = Component(EpicsSignal, "-GRD:GMX-START", kind=Kind.config)
x_start = Component(EpicsSignal, "-GRD:GMX-END", kind=Kind.config)
omega = Component(EpicsSignal, "-OSC:#M-START", kind=Kind.config)
osc = Component(EpicsSignal, "-GRD:ANGLE", kind=Kind.config)
celltime = Component(EpicsSignal, "-GRD:CELL-TIME", kind=Kind.config)
y_start = Component(EpicsSignal, "-OSC:#STY-START", kind=Kind.config)
y_end = Component(EpicsSignal, "-OSC:#STY-END", kind=Kind.config)
z_start = Component(EpicsSignal, "-OSC:#STZ-START", kind=Kind.config)
z_end = Component(EpicsSignal, "-OSC:#STZ-END", kind=Kind.config)
columns = Component(EpicsSignal, "-GRD:COLUMNS", kind=Kind.config)
rows = Component(EpicsSignal, "-GRD:ROWS", kind=Kind.config)
delay = Component(EpicsSignal, "-GRD:RAST-DLY", kind=Kind.config)
osc_mode = Component(EpicsSignal, "-GRD:SET-MODE", kind=Kind.config)
velo = Component(EpicsSignal, "-GRD:#X-VEL", kind=Kind.config)
get_ready = Component(EpicsSignal, "-GRD:READY.PROC", kind=Kind.config)
grid_start = Component(EpicsSignal, "-GRD:START.PROC", kind=Kind.config)
grid_next = Component(EpicsSignal, "-GRD:NEXT-ROW", kind=Kind.config)
row_done = Component(EpicsSignal, "-GRD:ROW-DONE", kind=Kind.config)
scan_done = Component(EpicsSignal, "-GRD:SCAN-DONE", kind=Kind.config)
grid_done = Component(EpicsSignal, "-GRD:DONE", kind=Kind.config)
# Also needs the command interface
_zcmd = Component(EpicsSignal, "-PSO:CMD", put_complete=True, kind=Kind.omitted)
_start_command = Component(
EpicsSignal, "-PSO:START-TEST.PROC", put_complete=True, kind=Kind.omitted
)
_stop_command = Component(
EpicsSignal, "-PSO:STOP-TEST.PROC", put_complete=True, kind=Kind.omitted
)
def raster_setup(self, positions, columns, angle, etime):
"""configures parameters for a raster scan
positions : a list of xyz indicating the positions of each cell
columns: an integer with how many columns
angle : the oscillation angle for each row
etime : the exposure time per cell
"""
self.parent.axisModeDirect.set(1).wait()
if columns == 1:
raise RuntimeWarning("Fast scans are not available with vertical line scans.")
rows = len(positions) / columns
x1, y1, z1 = tuple(positions[0]) # first cell on first row
x2, y2, z2 = tuple(positions[1]) # second cell on first row
x4, y4, z4 = tuple(positions[columns - 1]) # last cell on first row
if rows > 1:
x3, y3, z3 = tuple(positions[columns]) # first cell on second row
ystep = y3 - y1
zstep = z3 - z1
gmy_step = math.sqrt(pow(y3 - y1, 2) + pow(z3 - z1, 2))
else:
ystep = 0.010
zstep = 0.010
gmy_step = 0.010
half_cell = abs(x2 - x1) / 2.0
start_x = x1 - half_cell
end_x = x4 + half_cell
self._raster_starting_x = start_x
self._raster_starting_y = y1
self._raster_starting_z = z1
self._raster_step_y = ystep
self._raster_step_z = zstep
self._raster_current_row = 0
self._raster_num_rows = rows
self.x_start.set(start_x).wait()
self.x_end.set(end_x).wait()
self.omega.set(self.position).wait()
self.osc.set(angle).wait()
self.celltime.set(etime).wait()
self.y_start.set(y1).wait()
self.z_start.set(z1).wait()
self.y_step.set(ystep).wait()
self.z_step.set(zstep).wait()
self.columns.set(columns).wait()
self.rows.set(rows).wait()
def raster_next_row_yz(self):
r = self._raster_current_row
self._raster_current_row = r + 1
y, z = (
r * self.step_y.value + self._raster_starting_y,
r * self.step_z.value + self._raster_starting_z,
)
if r < self._raster_num_rows:
return (y, z)
return (None, None)
def next_row(self):
"""start rastering a new row"""
self.grid_next.set(1).wait()
def raster_get_ready(self):
"""
WTF is this???
"""
self.get_ready.set(1).wait()
for _ in range(10):
try:
# Define wait until the busy flag goes down (excluding initial update)
timestamp_ = 0
def is_ready(*args, old_value, value, timestamp, **kwargs):
nonlocal timestamp_
result = False if (timestamp_ == 0) else bool(value == ABR_READY)
print(f"Old {old_value}\tNew: {value}\tResult: {result}")
timestamp_ = timestamp
return result
# Subscribe and wait for update
status = SubscriptionStatus(self.grid_done, is_ready, timeout=2, settle_time=0.5)
status.wait()
except TimeoutError as e:
print(str(e), end=" ")
print(" --- trying again.")
self.get_ready.put(1, wait=True)
def raster_scan_row(self):
"""start rastering a new row, either first or following rows"""
if ABR_READY == self._grid_done.get():
self.grid_start.set(1)
else:
self.grid_next.set(1)
def is_scan_done(self):
return 1 == self.scan_done.get()
def wait_scan_done(self, timeout=60.0) -> SubscriptionStatus:
# Define wait until the busy flag goes down (excluding initial update)
timestamp_ = 0
def is_ready(*args, old_value, value, timestamp, **kwargs):
nonlocal timestamp_
result = False if (timestamp_ == 0) else bool(value == 1)
print(f"Old {old_value}\tNew: {value}\tResult: {result}")
timestamp_ = timestamp
return result
# Subscribe and wait for update
status = SubscriptionStatus(self.scan_done, is_ready, timeout=timeout, settle_time=0.5)
status.wait()
return status
def raster_is_row_scanning(self):
return 1 == self.scan_done.get()
def raster_wait_for_row(self, timeout=60) -> SubscriptionStatus:
# Define wait subscription
timestamp_ = 0
def is_ready(*args, old_value, value, timestamp, **kwargs):
nonlocal timestamp_
result = False if (timestamp_ == 0) else bool(value == 1)
print(f"Old {old_value}\tNew: {value}\tResult: {result}")
timestamp_ = timestamp
return result
# Subscribe and wait for update
status = SubscriptionStatus(self.row_done, is_ready, timeout=timeout, settle_time=0.5)
status.wait()
return status
def complete(self, timeout=None) -> DeviceStatus:
"""Wait for the grid scanner to finish
TODO: Probably redundant with task status wait?
"""
if timeout is not None:
# Define wait until the busy flag goes down (excluding initial update)
timestamp_ = 0
def is_ready(*args, old_value, value, timestamp, **kwargs):
nonlocal timestamp_
result = False if (timestamp_ == 0) else bool(value == GRID_SCAN_DONE)
print(f"Old {old_value}\tNew: {value}\tResult: {result}")
timestamp_ = timestamp
return result
# Subscribe and wait for update
status = SubscriptionStatus(self.scan_done, is_ready, timeout=timeout, settle_time=0.5)
return status
status = DeviceStatus(self, settle_time=0.5)
status.set_finished()
return status
class A3200Oscillator(Device):
"""No clue what this does, seems to be redundant with the task based grid scanners..."""
ostart_pos = Component(EpicsSignal, "-OSC:START-POS", put_complete=True, kind=Kind.config)
orange = Component(EpicsSignal, "-OSC:RANGE", put_complete=True, kind=Kind.config)
etime = Component(EpicsSignal, "-OSC:ETIME", put_complete=True, kind=Kind.config)
get_ready = Component(EpicsSignal, "-OSC:READY.PROC", put_complete=True, kind=Kind.config)
taskStart = Component(EpicsSignal, "-OSC:START", put_complete=True, kind=Kind.config)
phase = Component(EpicsSignal, "-OSC:DONE", auto_monitor=True, kind=Kind.config)
@property
def is_done(self):
return ABR_DONE == self.phase.get()
@property
def is_ready(self):
return ABR_READY == self.phase.get()
@property
def is_busy(self):
return ABR_BUSY == self.phase.get()
def kickoff(self):
self.osc.taskStart.set(1).wait()
def wait_status(self, target, timeout=60.0) -> SubscriptionStatus:
"""Wait for the Aertotech IOC to reach the desired `status`.
PARAMETERS
`status` can be any the three ABR_DONE, ABR_READY or ABR_BUSY.
RETURNS
SubscriptionStatus : A waitable status subscribed on 'phase' updates.
"""
# Define wait until the busy flag goes down (excluding initial update)
def in_status(*args, old_value, value, _, **kwargs):
result = bool(value == target)
# print(f"Old {old_value}\tNew: {value}\tResult: {result}")
return result
# Subscribe and wait for update
status = SubscriptionStatus(self.phase, in_status, timeout=timeout, settle_time=0.5)
return status
# Automatically start an axis if directly invoked
if __name__ == "__main__":
omega = A3200Axis(prefix="X06DA-ES-DF1:OMEGA", name="omega")

View File

@@ -51,7 +51,9 @@ def scsput(**kwargs):
:return:
:rtype:
"""
xyz = {k.upper(): v for k, v in kwargs.items() if k.lower() in ("shx", "shy", "shz", "chi", "phi")}
xyz = {
k.upper(): v for k, v in kwargs.items() if k.lower() in ("shx", "shy", "shz", "chi", "phi")
}
thing = "&".join([f"{k.upper()}={float(v):.5f}" for k, v in xyz.items()])
cmd = f"{base}/targetSCS?{thing}"
if kwargs.get("verbose", False):
@@ -92,7 +94,9 @@ def scsrelput(**kwargs) -> None:
:return:
:rtype:
"""
xyz = {k.upper(): v for k, v in kwargs.items() if k.lower() in ("shx", "shy", "shz", "chi", "phi")}
xyz = {
k.upper(): v for k, v in kwargs.items() if k.lower() in ("shx", "shy", "shz", "chi", "phi")
}
thing = "&".join([f"{k.upper()}={float(v):.5f}" for k, v in xyz.items()])
cmd = f"{base}/targetSCS_rel?{thing}"
if kwargs.get("verbose", False):
@@ -194,13 +198,7 @@ class SmarGon(object):
while not all(in_place) and time() < tout:
rbv = self.readback_scs()
in_place = []
for k, v in {
"SHX": 0.0,
"SHY": 0.0,
"SHZ": 18.0,
"CHI": 0.0,
"PHI": 0.0,
}.items():
for k, v in {"SHX": 0.0, "SHY": 0.0, "SHZ": 18.0, "CHI": 0.0, "PHI": 0.0}.items():
in_place.append(abs(rbv[k] - v) < 0.01)
if time() > tout:
raise TimeoutError(f"timeout waiting for smargon to reach home position: {rbv}")
@@ -290,7 +288,9 @@ class SmarGon(object):
def wait(self, timeout=60.0):
"""waits up to `timeout` seconds for smargon to reach target"""
target = {
k.upper(): v for k, v in self.target_scs().items() if k.lower() in ("shx", "shy", "shz", "chi", "phi")
k.upper(): v
for k, v in self.target_scs().items()
if k.lower() in ("shx", "shy", "shz", "chi", "phi")
}
timeout = timeout + time()
@@ -323,7 +323,9 @@ class SmarGon(object):
bcsput(**value)
elif key == "target":
if not isinstance(value, dict):
raise Exception(f"expected a dict with target axis and values got something else: {value}")
raise Exception(
f"expected a dict with target axis and values got something else: {value}"
)
for k in value.keys():
if k.lower() not in "shx shy shz chi phi ox oy oz".split():
raise Exception(f'unknown axis in target "{k}"')

View File

@@ -1,2 +1,6 @@
from .mx_measurements import (MeasureStandardWedge, MeasureVerticalLine,
MeasureRasterSimple, MeasureScreening)
from .mx_measurements import (
MeasureStandardWedge,
MeasureVerticalLine,
MeasureRasterSimple,
MeasureScreening,
)

View File

@@ -4,10 +4,8 @@ Scan primitives for standard BEC scans at the PX beamlines at SLS.
Theese scans define the event model and can be called from higher levels.
"""
import numpy as np
from bec_lib import bec_logger
from bec_server.scan_server.scans import AsyncFlyScanBase, ScanArgType, ScanBase
from bec_server.scan_server.scans import AsyncFlyScanBase
logger = bec_logger.logger
@@ -20,7 +18,7 @@ class AbrCmd:
MEASURE_STANDARD = 2
VERTICAL_LINE_SCAN = 3
SCREENING = 4
# SUPER_FAST_OMEGA = 5 # Some Japanese wanted to measure samples in capilaries at high RPMs
# SUPER_FAST_OMEGA = 5 # Some Japanese measured samples in capilaries at high RPMs
# STILL_WEDGE = 6 # NOTE: Unused Step scan
# STILLS = 7 # NOTE: Unused Just send triggers to detector
# REPEAT_SINGLE_OSCILLATION = 8 # NOTE: Unused