Files
motorDriverTests/common.py

223 lines
6.9 KiB
Python
Executable File

import time
import os
import math
import pytest
import yaml
from caproto.sync.client import read, write
from caproto import CaprotoTimeoutError, ChannelType
def read_config():
root_dir = os.path.dirname(__file__)
with open(root_dir + "/config.yaml", "r") as f:
return yaml.safe_load(f)
class MajorState(Exception):
pass
class Motor:
# Motor record fields
fields = {
'readpv': 'RBV',
'writepv': 'VAL',
'stop': 'STOP',
'donemoving': 'DMOV',
'moving': 'MOVN',
'miss': 'MISS',
'homeforward': 'HOMF',
'homereverse': 'HOMR',
'direction': 'DIR',
'speed': 'VELO',
'basespeed': 'VBAS',
'maxspeed': 'VMAX',
'offset': 'OFF',
'dialhighlimit': 'DHLM',
'diallowlimit': 'DLLM',
'highlimit': 'HLM',
'lowlimit': 'LLM',
'softlimit': 'LVIO',
'lowlimitswitch': 'LLS',
'highlimitswitch': 'HLS',
'resolution': 'MRES',
'enable': 'CNEN',
'set': 'SET',
'foff': 'FOFF',
'status': 'MSTA',
'alarm_status': 'STAT',
'alarm_severity': 'SEVR',
'precision': 'MRES',
}
def __init__(self, controller, name):
config = read_config()
self.ip = config['controllers'][controller]['ip']
self.port = config['controllers'][controller]['port']
self.busypoll = config['controllers'][controller]['busypoll']
self.idlepoll = config['controllers'][controller]['idlepoll']
pvprefix = config['pvprefix']
self.pv = f'{pvprefix}:{controller}:{name}'
def write(self, suffix, value):
write(self.pv + suffix, value)
def write_field(self, fieldname, value):
self.write('.' + self.fields[fieldname], value)
def read(self, suffix, as_string=False):
response = read(self.pv + suffix)
return self._convert_value(response, as_string)
def _convert_value(self, response, as_string=False):
# By default, the response data is always a list to cover all possible
# readback values from EPICS (lists, strings, chars, numbers). The last
# two cases need to be treated in a special way. They can be identified
# by the list length being 1.
if len(response.data) == 1:
value = response.data[0]
if isinstance(value, bytes):
return value.decode()
# If an empty string is returned, the data has a single entry
# (the NULL terminator)
if as_string:
return bytes(response.data).rstrip(b'\x00').decode(
encoding='utf-8', errors='ignore')
return value
# Strings are read with their NULL terminator, hence it needs to be
# stripped before decoding
if as_string:
return bytes(response.data).rstrip(b'\x00').decode(
encoding='utf-8', errors='ignore')
return response.data
def read_field(self, fieldname, as_string=False):
return self.read('.' + self.fields[fieldname], as_string)
def move_and_wait(self, target):
"""
Move the motor to the given target and return, once the motor has
finished moving. The motor status is polled regulary to see if an error
occurred during movement. In case this happens, an error is raised.
"""
# Is the motor already at its target?
if self.at_target(target):
return
# Is the motor already moving?
already_moving = self.read_field('donemoving') == 0
self.write_field('writepv', target)
if already_moving:
# Wait until the motor has stopped
self.wait_for_done()
self.wait_for_start()
self.wait_for_done()
def limits(self):
return (self.read_field('lowlimit'), self.read_field('highlimit'))
def at_target(self, target=None):
"""
Check if the motor arrived at its target by checking the MISS PV and
comparing the writepv value with the readpv value (taking precision)
into account
"""
if target is None:
target = self.read_field('writepv')
return (self.read_field('miss') == 0 and
math.isclose(self.read_field('readpv'), target,
rel_tol=1e-9, abs_tol=self.read_field('precision')))
def wait_for_start(self, maxwaittime=None):
if maxwaittime is None:
maxwaittime = 2 * self.idlepoll
now = time.time()
while self.read_field('donemoving') != 0:
if now + maxwaittime < time.time():
raise TimeoutError(
f'Motor did not start in {maxwaittime} seconds')
time.sleep(0.1)
def wait_for_done(self):
while not self.read_field('donemoving'):
time.sleep(0.1)
def has_error(self):
return self.read_field('alarm_severity') == 'MAJOR'
def is_homed(self):
# See https://epics.anl.gov/bcda/synApps/motor/motorRecord.html#Fields_status
str = format(self.read_field('status'), '016b')
return bool(str[15])
class SinqMotor(Motor):
# PV suffixes used in SinqMotor drivers
suffixes = {
'enable': ':Enable',
'enable_rbv': ':EnableRBV',
'can_disable': ':CanDisable',
'connected_rbv': ':Connected',
'encoder_type': ':EncoderType',
'reseterrorpv': ':Reset',
'errormsgpv': '-MsgTxt',
}
def write_field(self, fieldname, value):
if fieldname in self.suffixes:
return self.write(self.suffixes[fieldname], value)
return self.write('.' + self.fields[fieldname], value)
def read_field(self, fieldname, as_string=False):
if fieldname in self.suffixes:
return self.read(self.suffixes[fieldname], as_string)
return self.read('.' + self.fields[fieldname], as_string)
def wait_enabled(self, timeout=10):
"""
Wait until the motor is enabled or a timeout has been reached.
"""
return self._wait_enabled_disabled(timeout, True)
def wait_disabled(self, timeout=10):
"""
Wait until the motor is enabled or a timeout has been reached.
"""
return self._wait_enabled_disabled(timeout, False)
def _wait_enabled_disabled(self, timeout, wait_for_enabling):
def check(enable_rbv):
if wait_for_enabling:
return enable_rbv == 0
return enable_rbv != 0
now = time.time()
while check(self.read_field('enable_rbv')):
if time.time() > now + timeout:
if wait_for_enabling:
pytest.fail(
f'Motor {self.pv} could not be enabled in {timeout} seconds')
else:
pytest.fail(
f'Motor {self.pv} could not be disabled in {timeout} seconds')
time.sleep(0.1)
class TurboPMAC(SinqMotor):
pass
class MasterMACS(SinqMotor):
pass