import time import os import math import pytest import yaml from setup.caproto import CaprotoWrapper TIMEOUT = 5 def read_config(): root_dir = os.path.dirname(__file__) with open(root_dir + "/../config.yaml", "r") as f: return yaml.safe_load(f) class MajorState(Exception): pass class StartTimeoutError(TimeoutError): pass class IocNotRunning(Exception): pass class Motor: """ This object represents a motor used for testing. A few notes regarding EPICS interaction: Although it would be feasible to simply use the `read` and `write` functions in `caproto.sync.client` to access the motor PVs directly everytime, this approach is problematic because it opens and closes an UDP socket everytime. In practice, this is known to be susceptible to timeout errors due to high volumes of network traffic. Therefore, the NICOS `CaprotoWrapper` opens a socket once for each PV and keeps it alive. The wrapper object is then attached to the motor via the field `_epics_wrapper` and used to communicate with EPICS. The PVs are derived from the member variable `fields`, which specifies a PV alias as a key and the corresponding motor record field name (e.g. `'readpv': 'RBV'`). This field name is combined with the motor PV prefix for the full field name. The motor PV prefix in turn is derived from the motor and controller name given in the object constructor and the PV prefix defined in `config.yaml`. For example, if the `pvprefix` in `config.yaml` is "DRVTESTS", the controller name is "turboPmac1" and the motor name is "ax1", the full PV name for "readpv" is then: `DRVTESTS:turboPmac1:ax1.RBV` The full PV adress for each alias is created in the function `_get_pvs`. If additional PVs are needed, this function therefore needs to be overwritten correspondingly in subclasses. """ # This will store PV objects for each PV param and is populated by the init function _param_to_pv = {} # Motor record fields fields = { 'readpv': 'RBV', 'writepv': 'VAL', 'stop': 'STOP', 'donemoving': 'DMOV', 'moving': 'MOVN', 'miss': 'MISS', 'homeforward': 'HOMF', 'homereverse': 'HOMR', 'direction': 'DIR', 'speed': 'VELO', 'basespeed': 'VBAS', 'maxspeed': 'VMAX', 'offset': 'OFF', 'dialhighlimit': 'DHLM', 'diallowlimit': 'DLLM', 'highlimit': 'HLM', 'lowlimit': 'LLM', 'softlimit': 'LVIO', 'lowlimitswitch': 'LLS', 'highlimitswitch': 'HLS', 'resolution': 'MRES', 'enable': 'CNEN', 'set': 'SET', 'foff': 'FOFF', 'status': 'MSTA', 'alarm_status': 'STAT', 'alarm_severity': 'SEVR', 'precision': 'MRES', } def __init__(self, controller, name, default_position=0): config = read_config() self.ip = config['controllers'][controller]['ip'] self.port = config['controllers'][controller]['port'] self.busypoll = config['controllers'][controller]['busypoll'] self.idlepoll = config['controllers'][controller]['idlepoll'] pvprefix = config['pvprefix'] self.pv = f'{pvprefix}:{controller}:{name}' self.default_position = default_position self._epics_wrapper = CaprotoWrapper(TIMEOUT) for (access_name, pvname) in self._get_pvs().items(): self._epics_wrapper.connect_pv(pvname) self._param_to_pv[access_name] = pvname def _get_pvs(self, pvs ={}): """ :return: Dict of PV aliases and actual PV names. """ for (alias, pvname) in self.fields.items(): pvs[alias] = '.'.join((self.pv, pvname)) return pvs def put_pv(self, pvparam, value, wait=False): self._epics_wrapper.put_pv_value(self._param_to_pv[pvparam], value, wait=wait) def _convert_value(self, response, as_string=False): # By default, the response data is always a list to cover all possible # readback values from EPICS (lists, strings, chars, numbers). The last # two cases need to be treated in a special way. They can be identified # by the list length being 1. if len(response.data) == 1: value = response.data[0] if isinstance(value, bytes): return value.decode() # If an empty string is returned, the data has a single entry # (the NULL terminator) if as_string: return bytes(response.data).rstrip(b'\x00').decode( encoding='utf-8', errors='ignore') return value # Strings are read with their NULL terminator, hence it needs to be # stripped before decoding if as_string: return bytes(response.data).rstrip(b'\x00').decode( encoding='utf-8', errors='ignore') return response.data def get_pv(self, pvparam, as_string=False): return self._epics_wrapper.get_pv_value(self._param_to_pv[pvparam], as_string=as_string) def move_and_wait(self, target): """ Move the motor to the given target and return, once the motor has finished moving. The motor status is polled regulary to see if an error occurred during movement. In case this happens, an error is raised. """ # Is the motor already at its target? if self.at_target(target): return # Is the motor already moving? already_moving = self.get_pv('donemoving') == 0 self.put_pv('writepv', target) if not already_moving: self.wait_for_start() self.wait_for_done() def limits(self): return (self.get_pv('lowlimit'), self.get_pv('highlimit')) def at_target(self, target=None): """ Check if the motor arrived at its target by checking the MISS PV and comparing the writepv value with the readpv value (taking precision) into account """ if target is None: target = self.get_pv('writepv') return (self.get_pv('miss') == 0 and math.isclose(self.get_pv('readpv'), target, rel_tol=1e-9, abs_tol=self.get_pv('precision'))) def wait_for_start(self, maxwaittime=None): if maxwaittime is None: maxwaittime = 5 * self.idlepoll now = time.time() while self.get_pv('donemoving') != 0: if now + maxwaittime < time.time(): raise StartTimeoutError( f'Motor did not start in {maxwaittime} seconds') if self.has_error(): error_msg = self.error_message() if error_msg: raise MajorState(f'Error while waiting for start of movement: {error_msg}') else: raise MajorState('Unknown error while waiting for start of movement') time.sleep(0.1) def wait_for_done(self): while not self.get_pv('donemoving'): if self.has_error(): error_msg = self.error_message() if error_msg: raise MajorState(f'Error while waiting for completion of movement: {error_msg}') else: raise MajorState('Unknown error while waiting for completion of movement') time.sleep(0.1) def has_error(self): return self.get_pv('alarm_severity') == 'MAJOR' def error_message(self): return '' def is_homed(self): # See https://epics.anl.gov/bcda/synApps/motor/motorRecord.html#Fields_status str = format(int(self.get_pv('status')), '016b') return bool(str[15]) class SinqMotor(Motor): # PV suffixes used in SinqMotor drivers suffixes = { 'enable': ':Enable', 'enable_rbv': ':EnableRBV', 'can_disable': ':CanDisable', 'connected_rbv': ':Connected', 'encoder_type': ':EncoderType', 'can_set_speed': ':CanSetSpeed', 'reseterrorpv': ':Reset', 'errormsgpv': '-MsgTxt', } def _get_pvs(self, pvs ={} ): """ :return: Dict of PV aliases and actual PV names. """ # Add the PVs of the base class first, because we overwrite the "enable" alias pvs = Motor._get_pvs(self, pvs) for (alias, pvname) in self.suffixes.items(): pvs[alias] = self.pv + pvname return pvs def error_message(self): return self.get_pv('errormsgpv', as_string=True) def enable_and_wait(self, timeout=20): self.put_pv('enable', 1) self.wait_enabled(timeout) def disable_and_wait(self, timeout=20): self.put_pv('enable', 0) self.wait_disabled(timeout) def wait_enabled(self, timeout=10): """ Wait until the motor is enabled or a timeout has been reached. """ return self._wait_enabled_disabled(True, timeout) def wait_disabled(self, timeout=20): """ Wait until the motor is enabled or a timeout has been reached. """ return self._wait_enabled_disabled(False, timeout) def _wait_enabled_disabled(self, wait_for_enabling, timeout): def check(enable_rbv): if wait_for_enabling: return enable_rbv == 0 return enable_rbv != 0 now = time.time() while check(self.get_pv('enable_rbv')): if time.time() > now + timeout: if wait_for_enabling: pytest.fail( f'Motor {self.pv} could not be enabled in {timeout} seconds') else: pytest.fail( f'Motor {self.pv} could not be disabled in {timeout} seconds') time.sleep(0.5) class TurboPMAC(SinqMotor): pass class MasterMACS(SinqMotor): pass