87 lines
2.1 KiB
Python
87 lines
2.1 KiB
Python
import time
|
|
|
|
import logging
|
|
|
|
# Module-level logger, named after this module; used for the per-poll debug output below.
_log=logging.getLogger(__name__)
|
|
|
|
from epics.ca import pend_event
|
|
|
|
class PositionsNotReached(Exception):
    """Raised when motors have not reached their target positions before the timeout."""
|
|
|
|
|
|
def assert_tweaker_positions(targets, timeout=60.0):
    """
    wait for each of the given motors to have specified positions within tolerance

    All motors are polled in rounds until every one of them reports done and
    has a readback within tolerance of its target, or until `timeout` seconds
    have elapsed. The targets are always checked at least once, so a zero or
    negative timeout performs a single check instead of failing with an
    unbound local variable.

    :param targets: [
        (motor, target_position, tolerance),
        ...
    ]
    :param timeout: maximum time to wait, in seconds
    :return: None
    :raises: PositionsNotReached
    """
    num_motors = len(targets)
    timeisup = timeout + time.time()

    while True:
        count = 0
        summary = []
        not_sim = False
        for mot_tw, target, tolerance in targets:
            label = mot_tw._label
            # Motors whose class name starts with 'Sim' (presumably simulated
            # ones) never touch channel access, so pend_event is skipped.
            not_sim = not type(mot_tw).__name__.startswith('Sim')

            if not_sim:
                pend_event()
            cur = mot_tw.get_rbv()
            done = mot_tw.is_done()
            s = f"check {label} {cur:.5g} == {target:.5g} [done={done}]"
            _log.debug(s)
            summary.append(s)
            if done and tolerance >= abs(cur - target):
                count += 1
            if not_sim:
                pend_event(0.1)

        if count == num_motors:
            return

        # As before, the pause between rounds is keyed on the last motor polled.
        # NOTE(review): with only 'Sim' motors this loop spins without any delay.
        if not_sim:
            pend_event(0.1)
        if not time.time() < timeisup:
            break

    raise PositionsNotReached("failed to reach target positions: {}".format("#".join(summary)))
|
|
|
|
|
|
def assert_motor_positions(targets, timeout=60.0):
    """
    wait for each of the given motors to have specified positions within tolerance

    All motors are polled in rounds until every one of them reports
    `done_moving` and has a readback within tolerance of its target, or until
    `timeout` seconds have elapsed. The targets are always checked at least
    once, so a zero or negative timeout performs a single check instead of
    failing with an unbound local variable.

    :param targets: [
        (motor, target_position, tolerance),
        ...
    ]
    :param timeout: maximum time to wait, in seconds
    :return: None
    :raises: PositionsNotReached
    """
    num_motors = len(targets)
    timeisup = timeout + time.time()

    while True:
        count = 0
        summary = []
        for motor, target, tolerance in targets:
            name = motor._prefix
            pend_event()
            cur = motor.get_rbv(readback=True)
            done = motor.done_moving
            # Format once; the same text goes to the debug log and the error summary.
            status = "{}[done={}]: {} == {}".format(name, done, cur, target)
            _log.debug("check %s", status)
            summary.append(status)
            if done and tolerance >= abs(cur - target):
                count += 1
            pend_event(0.1)

        if count == num_motors:
            return

        pend_event()
        if not time.time() < timeisup:
            break

    raise PositionsNotReached("failed to reach target positions: {}".format("#".join(summary)))
|