105 lines
3.0 KiB
Python
105 lines
3.0 KiB
Python
import time
|
|
|
|
import logging
|
|
import enum
|
|
|
|
logger = logging.getLogger(__name__)
|
|
logger.setLevel(logging.DEBUG)
|
|
|
|
from epics.ca import pend_event
|
|
|
|
class DeltatauMotorStatus(enum.Flag):
|
|
"""names are taken from motorx_all.ui"""
|
|
DIRECTION = enum.auto()
|
|
MOTION_IS_COMPLETE = enum.auto()
|
|
PLUS_LIMIT_SWITCH = enum.auto()
|
|
HOME_SWITCH = enum.auto()
|
|
UNUSED_1 = enum.auto()
|
|
CLOSED_LOOP_POSITION = enum.auto()
|
|
SLIP_STALL_DETECTED = enum.auto()
|
|
AT_HOME_POSITION = enum.auto()
|
|
ENCODER_IS_PRESENT = enum.auto()
|
|
PROBLEM = enum.auto()
|
|
MOVING = enum.auto()
|
|
GAIN_SUPPORT = enum.auto()
|
|
COMMUNICATION_ERROR = enum.auto()
|
|
MINUS_LIMIT_SWITCH = enum.auto()
|
|
MOTOR_HAS_BEEN_HOMED = enum.auto()
|
|
|
|
AT_SWITCH = MINUS_LIMIT_SWITCH | PLUS_LIMIT_SWITCH
|
|
|
|
class PositionsNotReached(Exception):
|
|
pass
|
|
|
|
def assert_tweaker_positions(targets, timeout=60.0):
|
|
"""
|
|
wait for each of the given motors to have specified positions within tolerance
|
|
:param targets: [
|
|
(motor, target_position, tolerance),
|
|
...
|
|
]
|
|
:return: None
|
|
:raises: PositionsNotReached
|
|
"""
|
|
num_motors = len(targets)
|
|
|
|
timeisup = timeout + time.time()
|
|
|
|
while time.time() < timeisup:
|
|
count = 0
|
|
summary = []
|
|
for i, m in enumerate(targets):
|
|
motor, target, tolerance = m
|
|
name = motor.short_name
|
|
pend_event()
|
|
cur = motor.get_position()
|
|
done = motor.is_done()
|
|
logger.debug("check {}[done={}]: {} == {}".format(name, done, cur, target))
|
|
summary.append("{}[done={}]: {} == {}".format(name, done, cur, target))
|
|
if done and tolerance >= abs(cur - target):
|
|
count += 1
|
|
pend_event(0.1)
|
|
|
|
if count == num_motors:
|
|
break
|
|
pend_event(0.1)
|
|
|
|
if count != num_motors:
|
|
raise PositionsNotReached("failed to reach target positions: {}".format("#".join(summary)))
|
|
|
|
def assert_motor_positions(targets, timeout=60.0):
|
|
"""
|
|
wait for each of the given motors to have specified positions within tolerance
|
|
:param targets: [
|
|
(motor, target_position, tolerance),
|
|
...
|
|
]
|
|
:return: None
|
|
:raises: PositionsNotReached
|
|
"""
|
|
num_motors = len(targets)
|
|
|
|
timeisup = timeout + time.time()
|
|
|
|
while time.time() < timeisup:
|
|
count = 0
|
|
summary = []
|
|
for i, m in enumerate(targets):
|
|
motor, target, tolerance = m
|
|
name = motor._prefix
|
|
pend_event()
|
|
cur = motor.get_position(readback=True)
|
|
done = motor.done_moving
|
|
logger.debug("check {}[done={}]: {} == {}".format(name, done, cur, target))
|
|
summary.append("{}[done={}]: {} == {}".format(name, done, cur, target))
|
|
if done and tolerance >= abs(cur - target):
|
|
count += 1
|
|
pend_event(0.1)
|
|
|
|
if count == num_motors:
|
|
break
|
|
pend_event()
|
|
|
|
if count != num_motors:
|
|
raise PositionsNotReached("failed to reach target positions: {}".format("#".join(summary)))
|