Files
motorDriverTests/common.py
2025-07-22 14:13:01 +02:00

135 lines
3.7 KiB
Python

import time
import os
import pytest
import yaml
from caproto.sync.client import read, write
def read_config():
root_dir = os.path.dirname(__file__)
with open(root_dir + "/config.yaml", "r") as f:
return yaml.safe_load(f)
class MajorState(Exception):
pass
class Axis:
# Motor record fields
fields = {
'readpv': 'RBV',
'writepv': 'VAL',
'stop': 'STOP',
'donemoving': 'DMOV',
'moving': 'MOVN',
'miss': 'MISS',
'homeforward': 'HOMF',
'homereverse': 'HOMR',
'direction': 'DIR',
'speed': 'VELO',
'basespeed': 'VBAS',
'maxspeed': 'VMAX',
'offset': 'OFF',
'dialhighlimit': 'DHLM',
'diallowlimit': 'DLLM',
'highlimit': 'HLM',
'lowlimit': 'LLM',
'softlimit': 'LVIO',
'lowlimitswitch': 'LLS',
'highlimitswitch': 'HLS',
'resolution': 'MRES',
'enable': 'CNEN',
'set': 'SET',
'foff': 'FOFF',
'status': 'MSTA',
'alarm_status': 'STAT',
'alarm_severity': 'SEVR',
}
def __init__(self, ip, port, pv):
self.ip = ip
self.port = port
self.pv = pv
def write(self, suffix, value):
write(self.pv + suffix, value)
def write_field(self, fieldname, value):
self.write('.' + self.fields[fieldname], value)
def read(self, suffix):
return read(self.pv + suffix)
def read_field(self, fieldname):
return self.read('.' + self.fields[fieldname])
def move_and_wait(self, target):
"""
Move the motor to the given target and return, once the motor has
finished moving. The motor status is polled regulary to see if an error
occurred during movement. In case this happens, an error is raised.
"""
self.write(self.fields['writepv'], target)
# Give the record some time to start
time.sleep(1)
self.wait_for_done()
def limits(self):
return (self.read_field('lowlimit'), self.read_field('highlimit'))
def at_target(self, target=None):
"""
Check if the motor arrived at its target by checking the MISS PV and
comparing the writepv value with the readpv value (taking precision)
into account
"""
if target is None:
target = self.read_field('writepv')
return (self.read_field('miss') == 0 and
abs(self.read_field('readpv') - target) < self.read_field('precision'))
def wait_for_done(self):
while not self.read_field('donemoving'):
if self.has_error():
raise MajorState('Record is in MAJOR state!')
time.sleep(1)
def has_error(self):
self.read_field('alarm_severity') == 2
def is_homed(self):
# See https://epics.anl.gov/bcda/synApps/motor/motorRecord.html#Fields_status
str = format(self.read_field('status'), '016b')
return bool(str[15])
class SinqMotor(Axis):
# PV suffixes used in SinqMotor drivers
suffixes = {
'enable': ':Enable',
'enable_rbv': ':EnableRBV',
'can_disable': ':CanDisable',
'connected_rbv': ':Connected',
'encoder_type': ':EncoderType',
'reseterrorpv': ':Reset',
'errormsgpv': '-MsgTxt',
}
def write_field(self, fieldname, value):
if fieldname in self.suffixes:
self.write(self.suffixes[fieldname], value)
self.write('.' + self.fields[fieldname], value)
def read_field(self, fieldname):
if fieldname in self.suffixes:
return self.read(self.suffixes[fieldname])
return self.read('.' + self.fields[fieldname])
class TurboPMAC(Axis):
pass