import time import logging import enum logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) from epics.ca import pend_event class DeltatauMotorStatus(enum.Flag): """names are taken from motorx_all.ui""" DIRECTION = enum.auto() MOTION_IS_COMPLETE = enum.auto() PLUS_LIMIT_SWITCH = enum.auto() HOME_SWITCH = enum.auto() UNUSED_1 = enum.auto() CLOSED_LOOP_POSITION = enum.auto() SLIP_STALL_DETECTED = enum.auto() AT_HOME_POSITION = enum.auto() ENCODER_IS_PRESENT = enum.auto() PROBLEM = enum.auto() MOVING = enum.auto() GAIN_SUPPORT = enum.auto() COMMUNICATION_ERROR = enum.auto() MINUS_LIMIT_SWITCH = enum.auto() MOTOR_HAS_BEEN_HOMED = enum.auto() AT_SWITCH = MINUS_LIMIT_SWITCH | PLUS_LIMIT_SWITCH class PositionsNotReached(Exception): pass def assert_tweaker_positions(targets, timeout=60.0): """ wait for each of the given motors to have specified positions within tolerance :param targets: [ (motor, target_position, tolerance), ... ] :return: None :raises: PositionsNotReached """ num_motors = len(targets) timeisup = timeout + time.time() while time.time() < timeisup: count = 0 summary = [] for i, m in enumerate(targets): motor, target, tolerance = m name = motor.short_name pend_event() cur = motor.get_position() done = motor.is_done() logger.debug("check {}[done={}]: {} == {}".format(name, done, cur, target)) summary.append("{}[done={}]: {} == {}".format(name, done, cur, target)) if done and tolerance >= abs(cur - target): count += 1 pend_event(0.1) if count == num_motors: break pend_event(0.1) if count != num_motors: raise PositionsNotReached("failed to reach target positions: {}".format("#".join(summary))) def assert_motor_positions(targets, timeout=60.0): """ wait for each of the given motors to have specified positions within tolerance :param targets: [ (motor, target_position, tolerance), ... ] :return: None :raises: PositionsNotReached """ num_motors = len(targets) timeisup = timeout + time.time() while time.time() < timeisup: count = 0 summary = [] for i, m in enumerate(targets): motor, target, tolerance = m name = motor._prefix pend_event() cur = motor.get_position(readback=True) done = motor.done_moving logger.debug("check {}[done={}]: {} == {}".format(name, done, cur, target)) summary.append("{}[done={}]: {} == {}".format(name, done, cur, target)) if done and tolerance >= abs(cur - target): count += 1 pend_event(0.1) if count == num_motors: break pend_event() if count != num_motors: raise PositionsNotReached("failed to reach target positions: {}".format("#".join(summary)))