Files
motorDriverTests/src/classes.py
2025-12-05 15:49:52 +01:00

367 lines
12 KiB
Python
Executable File

import time
import os
import math
import logging
import sys
import yaml
from src.caproto import CaprotoWrapper
TIMEOUT = 5
def read_config():
root_dir = os.path.dirname(__file__)
with open(root_dir + "/../config.yaml", "r") as f:
return yaml.safe_load(f)
def setup_custom_logger(name):
"""
Formatting of the logger. Taken from:
https://stackoverflow.com/questions/28330317/print-timestamp-for-logging-in-python
"""
formatter = logging.Formatter(fmt='%(asctime)s %(levelname)-8s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
handler = logging.FileHandler('log.txt', mode='w')
handler.setFormatter(formatter)
screen_handler = logging.StreamHandler(stream=sys.stdout)
screen_handler.setFormatter(formatter)
logger = logging.getLogger(name)
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
logger.addHandler(screen_handler)
return logger
class MajorState(Exception):
pass
class StartTimeoutError(TimeoutError):
pass
class IocNotRunning(Exception):
pass
class Motor:
"""
This object represents a motor used for testing.
A few notes regarding EPICS interaction:
Although it would be feasible to simply use the `read` and `write` functions in `caproto.sync.client` to
access the motor PVs directly everytime, this approach is problematic because it opens and closes an UDP
socket everytime. In practice, this is known to be susceptible to timeout errors due to high volumes
of network traffic. Therefore, the NICOS `CaprotoWrapper` opens a socket once for each PV and keeps it
alive. The wrapper object is then attached to the motor via the field `_epics_wrapper` and used
to communicate with EPICS.
The PVs are derived from the member variable `fields`, which specifies a PV alias as a key
and the corresponding motor record field name (e.g. `'readpv': 'RBV'`). This field name is combined
with the motor PV prefix for the full field name. The motor PV prefix in turn is derived from the motor and
controller name given in the object constructor and the PV prefix defined in `config.yaml`.
For example, if the `pvprefix` in `config.yaml` is "DRVTESTS", the controller name is "turboPmac1" and the motor
name is "ax1", the full PV name for "readpv" is then:
`DRVTESTS:turboPmac1:ax1.RBV`
The full PV adress for each alias is created in the function `_get_pvs`. If additional PVs are needed, this
function therefore needs to be overwritten correspondingly in subclasses.
"""
__slots__ = ("_param_to_pv", "controller", "axis", "ip", "port",
"busypoll", "idlepoll", "pv", "default_position", 'logger',
"_epics_wrapper")
# Motor record fields
fields = {
'readpv': 'RBV',
'writepv': 'VAL',
'stop': 'STOP',
'donemoving': 'DMOV',
'moving': 'MOVN',
'miss': 'MISS',
'homeforward': 'HOMF',
'homereverse': 'HOMR',
'direction': 'DIR',
'speed': 'VELO',
'basespeed': 'VBAS',
'maxspeed': 'VMAX',
'offset': 'OFF',
'dialhighlimit': 'DHLM',
'diallowlimit': 'DLLM',
'highlimit': 'HLM',
'lowlimit': 'LLM',
'softlimit': 'LVIO',
'lowlimitswitch': 'LLS',
'highlimitswitch': 'HLS',
'resolution': 'MRES',
'enable': 'CNEN',
'set': 'SET',
'foff': 'FOFF',
'status': 'MSTA',
'alarm_status': 'STAT',
'alarm_severity': 'SEVR',
'precision': 'MRES',
}
def __init__(self, controller, axis, default_position=0):
self.controller = controller
self.axis = axis
config = read_config()
self.ip = config['controllers'][controller]['ip']
self.port = config['controllers'][controller]['port']
self.busypoll = config['controllers'][controller]['busypoll']
self.idlepoll = config['controllers'][controller]['idlepoll']
pvprefix = config['pvprefix']
self.pv = f'{pvprefix}:{controller}:{axis}'
self.default_position = default_position
self._epics_wrapper = CaprotoWrapper(TIMEOUT)
# This will store PV objects for each PV param and is populated by the init function
self._param_to_pv = {}
for (access_name, pvname) in self._get_pvs().items():
self._epics_wrapper.connect_pv(pvname)
self._param_to_pv[access_name] = pvname
# Set up for all "motor" devices.
self.logger = setup_custom_logger(f"motor.{controller}.{axis}")
def _get_pvs(self, pvs={}):
"""
:return: Dict of PV aliases and actual PV names.
"""
for (alias, pvname) in self.fields.items():
pvs[alias] = '.'.join((self.pv, pvname))
return pvs
def put_pv(self, pvparam, value, timeout=None):
self.logger.debug(
f"writing '{value}' to '{self._param_to_pv[pvparam]}'")
self._epics_wrapper.put_pv_value(
self._param_to_pv[pvparam], value, timeout=timeout)
if timeout is not None:
self.logger.debug(
f"finished blocking writing '{value}' to '{self._param_to_pv[pvparam]}'")
def get_pv(self, pvparam, as_string=False):
self.logger.debug(f"reading '{self._param_to_pv[pvparam]}'")
val = self._epics_wrapper.get_pv_value(self._param_to_pv[pvparam],
as_string=as_string)
self.logger.debug(f"read '{val}' from '{self._param_to_pv[pvparam]}'")
return val
def move(self, target):
# Set the new target
self.put_pv('writepv', target)
if self.at_target(target):
self.logger.info(f"already at target '{target}'")
else:
self.logger.info(f"moving to '{target}'")
def move_and_wait(self, target):
"""
Move the motor to the given target and return, once the motor has
finished moving. The motor status is polled regulary to see if an error
occurred during movement. In case this happens, an error is raised.
"""
# Set the new target
self.put_pv('writepv', target)
# Is the motor already moving?
if self.get_pv('donemoving') != 0:
self.logger.info(f"starting move to '{target}'")
self.wait_for_start()
if self.at_target(target):
self.logger.info(f"already at target '{target}'")
else:
self.logger.info(f"moving to '{target}'")
self.wait_for_done()
self.logger.info(
f"finished moving to '{target}', now at '{self.get_pv('readpv')}'")
def limits(self):
return (self.get_pv('lowlimit'), self.get_pv('highlimit'))
def at_target(self, target=None):
"""
Check if the motor arrived at its target by checking the MISS PV and
comparing the writepv value with the readpv value (taking precision)
into account
"""
if target is None:
target = self.get_pv('writepv')
return (self.get_pv('miss') == 0 and
math.isclose(self.get_pv('readpv'), target,
rel_tol=1e-9, abs_tol=self.get_pv('precision')))
def wait_for_start(self, maxwaittime=None):
if maxwaittime is None:
maxwaittime = 5 * self.idlepoll
now = time.time()
while self.get_pv('donemoving') != 0:
if now + maxwaittime < time.time():
raise StartTimeoutError(
f'Motor did not start in {maxwaittime} seconds')
if self.has_error():
error_msg = self.error_message()
if error_msg:
raise MajorState(
f'Error while waiting for start of movement: {error_msg}')
else:
raise MajorState(
'Unknown error while waiting for start of movement')
time.sleep(0.1)
def wait_for_done(self):
while not self.get_pv('donemoving'):
if self.has_error():
error_msg = self.error_message()
if error_msg:
raise MajorState(
f'Error while waiting for completion of movement: {error_msg}')
else:
raise MajorState(
'Unknown error while waiting for completion of movement')
time.sleep(0.1)
def has_error(self):
return self.get_pv('alarm_severity') == 'MAJOR'
def error_message(self):
return ''
def is_homed(self):
# See https://epics.anl.gov/bcda/synApps/motor/motorRecord.html#Fields_status
str = format(int(self.get_pv('status')), '016b')
return bool(str[15])
def stop(self):
self.logger.info(f"stopping")
self.put_pv('stop', 1)
def homeforward(self):
self.logger.info(f"homing forward")
self.put_pv('homeforward', 1)
def homereverse(self):
self.logger.info(f"homing reverse")
self.put_pv('homereverse', 1)
def set_highlimit(self, high_limit):
self.logger.info(f"setting high limit {high_limit}")
self.put_pv('dialhighlimit', high_limit)
def set_lowlimit(self, low_limit):
self.logger.info(f"setting low limit {low_limit}")
self.put_pv('diallowlimit', low_limit)
def set_speed(self, speed):
self.logger.info(f"setting speed {speed}")
self.put_pv('speed', speed)
class SinqMotor(Motor):
# PV suffixes used in SinqMotor drivers
suffixes = {
'enable': ':Enable',
'enable_rbv': ':EnableRBV',
'can_disable': ':CanDisable',
'connected_rbv': ':Connected',
'encoder_type': ':EncoderType',
'can_set_speed': ':CanSetSpeed',
'reseterrorpv': ':Reset',
'errormsgpv': '-MsgTxt',
}
def _get_pvs(self, pvs={}):
"""
:return: Dict of PV aliases and actual PV names.
"""
# Add the PVs of the base class first, because we overwrite the "enable" alias
pvs = Motor._get_pvs(self, pvs)
for (alias, pvname) in self.suffixes.items():
pvs[alias] = self.pv + pvname
return pvs
def error_message(self):
return self.get_pv('errormsgpv', as_string=True)
def enable_and_wait(self, timeout=20):
self.enable(1)
self.wait_enabled(timeout)
def disable_and_wait(self, timeout=20):
self.enable(0)
self.wait_disabled(timeout)
def enable(self, val):
if val == 0:
self.logger.info(f"disabling")
else:
self.logger.info(f"enabling")
self.put_pv('enable', 1)
def reset(self):
self.logger.info(f"resetting error")
self.put_pv('reseterrorpv', 1)
def wait_enabled(self, timeout=10):
"""
Wait until the motor is enabled or a timeout has been reached.
"""
return self._wait_enabled_disabled(True, timeout)
def wait_disabled(self, timeout=20):
"""
Wait until the motor is enabled or a timeout has been reached.
"""
return self._wait_enabled_disabled(False, timeout)
def _wait_enabled_disabled(self, wait_for_enabling, timeout):
def check(enable_rbv):
if wait_for_enabling:
return enable_rbv == 0
return enable_rbv != 0
now = time.time()
while check(self.get_pv('enable_rbv')):
if time.time() > now + timeout:
if wait_for_enabling:
raise MajorState(
f'could not be enabled in {timeout} seconds')
else:
raise MajorState(
f'could not be enabled in {timeout} seconds')
time.sleep(0.5)
if wait_for_enabling:
self.logger.info(f"enabled")
else:
self.logger.info(f"disabled")
class TurboPMAC(SinqMotor):
pass
class MasterMACS(SinqMotor):
pass
class EL734(SinqMotor):
pass