Files
SwissMX/app_utils.py

87 lines
2.1 KiB
Python

import time
import logging
_log=logging.getLogger(__name__)
from epics.ca import pend_event
class PositionsNotReached(Exception):
pass
def assert_tweaker_positions(targets, timeout=60.0):
"""
wait for each of the given motors to have specified positions within tolerance
:param targets: [
(motor, target_position, tolerance),
...
]
:return: None
:raises: PositionsNotReached
"""
num_motors=len(targets)
timeisup=timeout+time.time()
while time.time()<timeisup:
count=0
summary=[]
for i, m in enumerate(targets):
mot_tw, target, tolerance=m
name=mot_tw._motor._short_name
notSim=not type(mot_tw).__name__.startswith('Sim')
if notSim: pend_event()
cur=mot_tw.get_rbv()
done=mot_tw.is_done()
s=f"check {name} {cur:.5g} == {target:.5g} [done={done}]"
_log.debug(s)
summary.append(s)
if done and tolerance>=abs(cur-target):
count+=1
if notSim: pend_event(0.1)
if count==num_motors:
break
if notSim: pend_event(0.1)
if count!=num_motors:
raise PositionsNotReached("failed to reach target positions: {}".format("#".join(summary)))
def assert_motor_positions(targets, timeout=60.0):
"""
wait for each of the given motors to have specified positions within tolerance
:param targets: [
(motor, target_position, tolerance),
...
]
:return: None
:raises: PositionsNotReached
"""
num_motors=len(targets)
timeisup=timeout+time.time()
while time.time()<timeisup:
count=0
summary=[]
for i, m in enumerate(targets):
motor, target, tolerance=m
name=motor._prefix
pend_event()
cur=motor.get_rbv(readback=True)
done=motor.done_moving
_log.debug("check {}[done={}]: {} == {}".format(name, done, cur, target))
summary.append("{}[done={}]: {} == {}".format(name, done, cur, target))
if done and tolerance>=abs(cur-target):
count+=1
pend_event(0.1)
if count==num_motors:
break
pend_event()
if count!=num_motors:
raise PositionsNotReached("failed to reach target positions: {}".format("#".join(summary)))