From ed03816888e9c903f4f588b724d3e8e3f0645633 Mon Sep 17 00:00:00 2001 From: Stefan Mathis Date: Mon, 28 Jul 2025 16:43:53 +0200 Subject: [PATCH] Added homing test and switched to NICOS-like caproto backend implementaiton --- README.md | 24 +- example_custom_script.py | 6 +- ioc/{startioc.py => startioc} | 1 - maketestenv | 3 +- pytestpar.py => runtests | 32 +- setup/__init__.py | 0 setup/caproto.py | 308 ++++++++++++++++++ test_helpers.py => setup/classes.py | 117 ++++--- tests/conftest.py | 18 +- tests/home.py | 92 ++++++ tests/move.py | 20 +- tests/sinqMotor/home.py | 41 --- tests/sinqMotor/limits.py | 14 +- tests/sinqMotor/masterMacs/ax1/conftest.py | 2 +- tests/sinqMotor/masterMacs/ax1/test_common.py | 17 + tests/sinqMotor/masterMacs/reset.py | 6 +- tests/sinqMotor/speed.py | 44 +-- tests/sinqMotor/turboPmac/ax1/conftest.py | 8 +- tests/sinqMotor/turboPmac/ax1/test_common.py | 34 +- tests/sinqMotor/turboPmac/ax5/conftest.py | 2 +- tests/sinqMotor/turboPmac/ax5/test_common.py | 15 +- tests/sinqMotor/turboPmac/reset.py | 4 +- 22 files changed, 646 insertions(+), 162 deletions(-) rename ioc/{startioc.py => startioc} (99%) rename pytestpar.py => runtests (64%) create mode 100644 setup/__init__.py create mode 100644 setup/caproto.py rename test_helpers.py => setup/classes.py (63%) create mode 100755 tests/home.py delete mode 100755 tests/sinqMotor/home.py diff --git a/README.md b/README.md index ff13d2d..8097d10 100755 --- a/README.md +++ b/README.md @@ -34,7 +34,7 @@ identifier of the records created by the motor drivers. ## Starting the IOC -It is recommended to start the IOC via `ioc/startioc.py`: +It is recommended to start the IOC via `ioc/startioc`: ```console ioc/startioc.py @@ -81,15 +81,17 @@ pytest -s tests/sinqMotor/turboPmac/lin1/test_common.py -k 'test_something' Tests which don't run on the same motor can be parallelized in order to both save on runtime and to test controller and driver performance while handling multiple motors. To do so, -the wrapper script `pytestpar.py` is provided which starts a pytest process for each motor. +the wrapper script `runtests` is provided which starts a pytest process for each motor. The individual tests for each motor are run sequentially. -`pytestpar.py` accepts any arguments and forwards them to the pytest call. +`runtests` accepts any arguments and forwards them to the pytest call. If a folder is given as an argument, all tests which are not within this folder are ignored. -- `pytestpar` will run all tests parallelized per motor -- `pytestpar tests/sinqMotor/turboPmac` will run all Turbo PMAC tests parallelized per motor -- `pytestpar tests/sinqMotor/turboPmac/ax1` will only run the tests for motor `ax1`. These tests are run sequentially. +- `runtests` will run all tests parallelized per motor +- `runtests tests/sinqMotor/turboPmac` will run all Turbo PMAC tests parallelized per motor +- `runtests tests/sinqMotor/turboPmac/ax1` will only run the tests for motor `ax1`. These tests are run sequentially. + +In addition to the parallelization feature, `runtests` also starts the IOC if it is not running already. ## Running custom scripts @@ -100,4 +102,12 @@ the virtual environment and then simply executing it: bash source testenv/bin/activate ./example_custom_script.py -``` \ No newline at end of file +``` +## Developer notes + +### EPICS integration + +The EPICS integration is currently done via caproto. This module is taken from NICOS (https://github.com/mlz-ictrl/nicos/blob/master/nicos/devices/epics/pva/caproto.py) +and has been slightly modified (mainly removal of NICOS-specific decorators and the conversion of the EPICS to the NICOS status). The docstring of the `setup.classes.Motor` +class contains further information. + diff --git a/example_custom_script.py b/example_custom_script.py index 46ded70..3722083 100755 --- a/example_custom_script.py +++ b/example_custom_script.py @@ -6,7 +6,7 @@ import time -from test_helpers import TurboPMAC +from setup.classes import TurboPMAC motor = TurboPMAC('turboPmac1', 'lin1') @@ -17,10 +17,10 @@ motor.move_and_wait(-20) motor.move_and_wait(-30) # Disable the motor -motor.write_field('enable', 0) +motor.put_pv('enable', 0) # Wait a bit so the motor has been disabled time.sleep(2) # Reenable the motor -motor.write_field('enable', 1) +motor.put_pv('enable', 1) diff --git a/ioc/startioc.py b/ioc/startioc similarity index 99% rename from ioc/startioc.py rename to ioc/startioc index b4a7280..d37d03c 100755 --- a/ioc/startioc.py +++ b/ioc/startioc @@ -10,7 +10,6 @@ import subprocess import sys from pathlib import Path - def startioc(): root_dir = Path(__file__).resolve().parent.parent diff --git a/maketestenv b/maketestenv index c9c4e87..e0602eb 100755 --- a/maketestenv +++ b/maketestenv @@ -9,4 +9,5 @@ source testenv/bin/activate pip install 'caproto' pip install 'pytest' -pip install 'pyyaml' \ No newline at end of file +pip install 'pyyaml' +pip install 'numpy' \ No newline at end of file diff --git a/pytestpar.py b/runtests similarity index 64% rename from pytestpar.py rename to runtests index 8ff5285..8a5539d 100755 --- a/pytestpar.py +++ b/runtests @@ -1,6 +1,8 @@ #!/usr/bin/env python3 -# This script is a wrapper around pytest which starts a separate pytest process +# This script is a wrapper around pytest which performs the following tasks: +# 1) If the IOC in ioc/startioc.py is not running, start it +# 2) Run tests in parallel by starting a separate pytest process # for each separate motor given in FOLDER. This allows to run tests on different # motors in parallel, cutting down execution time and testing parallelism. # The script accepts any arguments and forwards them to the pytest call. @@ -15,10 +17,17 @@ import sys import subprocess import os from pathlib import Path +import time + +from tests.conftest import check_ioc_running +from setup.classes import IocNotRunning # Define the test folders you want to run in parallel FOLDERS = ["tests/sinqMotor/turboPmac/ax1", "tests/sinqMotor/turboPmac/ax5"] +# Time we excpect the IOC needs to start up +TIMEOUT_IOC_STARTUP = 10 + # Separate path-like arguments from other pytest args path_args = [] extra_args = [] @@ -38,6 +47,26 @@ else: if any(Path(arg).resolve().as_posix().startswith(Path(folder).resolve().as_posix()) for arg in path_args): enabled_folders.append(folder) +# Check if the IOC is running and try to start it, if it isn't +try: + check_ioc_running() +except IocNotRunning: + print('Starting IOC ...') + # Start the IOC as a separate process, wait a bit and try again + path = Path(__file__).resolve().parent / 'ioc' / 'startioc' + p = subprocess.Popen([path], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + now = time.time() + while now + TIMEOUT_IOC_STARTUP > time.time(): + time.sleep(0.5) + try: + check_ioc_running() + except IocNotRunning: + pass + check_ioc_running() + print(f'Started IOC successfully (process ID {p.pid})! It will automatically terminate after all tests are done.') + # Run each enabled folder's relevant tests in parallel processes = [] for folder in enabled_folders: @@ -58,5 +87,4 @@ for proc in processes: code = proc.wait() if code != 0: exit_code = code - sys.exit(exit_code) \ No newline at end of file diff --git a/setup/__init__.py b/setup/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/setup/caproto.py b/setup/caproto.py new file mode 100644 index 0000000..2f30f73 --- /dev/null +++ b/setup/caproto.py @@ -0,0 +1,308 @@ +# ***************************************************************************** +# NICOS, the Networked Instrument Control System of the MLZ +# Copyright (c) 2009-2025 by the NICOS contributors (see AUTHORS) +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Matt Clarke +# Stefan Mathis +# +# ***************************************************************************** +# +# This file has been taken from NICOS, the original file can be found in: +# https://github.com/mlz-ictrl/nicos/blob/master/nicos/devices/epics/pva/caproto.py + + +from functools import partial + +import numpy as np +from caproto import CaprotoTimeoutError, ChannelType +from caproto.sync.client import read, write +from caproto.threading.client import Context + +FTYPE_TO_TYPE = { + ChannelType.STRING: str, + ChannelType.INT: int, + ChannelType.FLOAT: float, + ChannelType.ENUM: int, + ChannelType.CHAR: bytes, + ChannelType.LONG: int, + ChannelType.DOUBLE: float, + + ChannelType.TIME_STRING: str, + ChannelType.TIME_INT: int, + ChannelType.TIME_FLOAT: float, + ChannelType.TIME_ENUM: int, + ChannelType.TIME_CHAR: bytes, + ChannelType.TIME_LONG: int, + ChannelType.TIME_DOUBLE: float, + + ChannelType.CTRL_STRING: str, + ChannelType.CTRL_INT: int, + ChannelType.CTRL_FLOAT: float, + ChannelType.CTRL_ENUM: int, + ChannelType.CTRL_CHAR: bytes, + ChannelType.CTRL_LONG: int, + ChannelType.CTRL_DOUBLE: float, +} + +STATUS_TO_MESSAGE = { + 0: 'NO_ALARM', + 1: 'READ', + 2: 'WRITE', + 3: 'HIHI', + 4: 'HIGH', + 5: 'LOLO', + 6: 'LOW', + 7: 'STATE', + 8: 'COS', + 9: 'COMM', + 10: 'TIMED', + 11: 'HWLIMIT', + 12: 'CALC', + 13: 'SCAN', + 14: 'LINK', + 15: 'SOFT', + 16: 'BAD_SUB', + 17: 'UDF', + 18: 'DISABLE', + 19: 'SIMM', + 20: 'READ_ACCESS', + 21: 'WRITE_ACCESS', +} + +# Same context can be shared across all devices. +_Context = Context() + + +def caget(name, timeout=3.0): + """ Returns the PV's current value in its raw form via CA. + + :param name: the PV name + :param timeout: the EPICS timeout + :return: the PV's raw value + """ + response = read(name, timeout=timeout) + return response.data[0] if len(response.data) == 1 else response.data + + +def caput(name, value, wait=False, timeout=3.0): + """ Sets a PV's value via CA. + + :param name: the PV name + :param value: the value to set + :param wait: whether to wait for completion + :param timeout: the EPICS timeout + """ + write(name, value, notify=wait, timeout=timeout) + + +class CaprotoWrapper: + """ Class that wraps the caproto module that provides EPICS Channel Access + (CA) support. + """ + + def __init__(self, timeout=3.0): + self._pvs = {} + self._choices = {} + self._callbacks = set() + self._timeout = timeout + + def connect_pv(self, pvname): + if pvname in self._pvs: + return + + value = self._create_pv(pvname) + # Do some prep work for enum types + if hasattr(value.metadata, 'enum_strings'): + self._choices[pvname] = self.get_value_choices(pvname) + + def _create_pv(self, pvname, connection_callback=None): + try: + pv, *_ = _Context.get_pvs( + pvname, + connection_state_callback=connection_callback, + timeout=self._timeout + ) + self._pvs[pvname] = pv + # Do a read to force a connection + return pv.read(timeout=self._timeout, data_type='control') + except CaprotoTimeoutError: + raise TimeoutError( + f'could not connect to PV {pvname}') from None + + def get_pv_value(self, pvname, as_string=False): + try: + response = self._pvs[pvname].read(timeout=self._timeout, + data_type='control') + return self._convert_value(pvname, response, as_string) + except CaprotoTimeoutError: + raise TimeoutError(f'getting {pvname} timed out') from None + + def _convert_value(self, pvname, response, as_string=False): + + # By default, the response data is always a list to cover all possible + # readback values from EPICS (lists, strings, chars, numbers). The last + # two cases need to be treated in a special way. They can be identified + # by the list length being 1. + if len(response.data) == 1: + value = response.data[0] + if pvname in self._choices: + return self._choices[pvname][value] if as_string else value + elif isinstance(value, bytes): + return value.decode() + + # If an empty string is returned, the data has a single entry + # (the NULL terminator) + if as_string: + return bytes(response.data).rstrip(b'\x00').decode( + encoding='utf-8', errors='ignore') + + return value + + # Strings are read with their NULL terminator, hence it needs to be + # stripped before decoding + if as_string: + return bytes(response.data).rstrip(b'\x00').decode( + encoding='utf-8', errors='ignore') + + if isinstance(response.data, np.ndarray): + val_type = FTYPE_TO_TYPE[self._pvs[pvname].channel.native_data_type] + if val_type == bytes or as_string: + return response.data.tobytes().decode() + + return response.data + + def put_pv_value(self, pvname, value, wait=False): + if pvname in self._choices: + value = self._choices[pvname].index(value) + try: + self._pvs[pvname].write(value, wait=wait, timeout=self._timeout) + except CaprotoTimeoutError: + raise TimeoutError(f'setting {pvname} timed out') from None + + def put_pv_value_blocking(self, pvname, value, block_timeout=60): + if pvname in self._choices: + value = self._choices[pvname].index(value) + try: + self._pvs[pvname].write(value, wait=True, timeout=block_timeout) + except CaprotoTimeoutError: + raise TimeoutError(f'setting {pvname} timed out') from None + + def get_pv_type(self, pvname): + data_type = self._pvs[pvname].channel.native_data_type + return FTYPE_TO_TYPE.get(data_type) + + def get_alarm_status(self, pvname): + values = self.get_control_values(pvname) + return self._extract_alarm_info(values) + + def get_units(self, pvname, default=''): + values = self.get_control_values(pvname) + return self._get_units(values, default) + + def _get_units(self, values, default): + if hasattr(values, 'units'): + return values.units.decode() + return default + + def get_limits(self, pvname, default_low=-1e308, default_high=1e308): + values = self.get_control_values(pvname) + if hasattr(values, 'lower_ctrl_limit'): + default_low = values.lower_ctrl_limit + default_high = values.upper_ctrl_limit + return default_low, default_high + + def get_control_values(self, pvname): + try: + result = self._pvs[pvname].read(timeout=self._timeout, + data_type='control') + return result.metadata + except CaprotoTimeoutError: + raise TimeoutError( + f'getting control values for {pvname} timed out') from None + + def get_value_choices(self, pvname): + # Only works for enum types like MBBI and MBBO + value = self.get_control_values(pvname) + return self._extract_choices(value) + + def _extract_choices(self, value): + if hasattr(value, 'enum_strings'): + return [x.decode() for x in value.enum_strings] + return [] + + def subscribe(self, pvname, pvparam, change_callback, + connection_callback=None, as_string=False): + """ + Create a monitor subscription to the specified PV. + + :param pvname: The PV name. + :param pvparam: The associated NICOS parameter + (e.g. readpv, writepv, etc.). + :param change_callback: The function to call when the value changes. + :param connection_callback: The function to call when the connection + status changes. + :param as_string: Whether to return the value as a string. + :return: the subscription object. + """ + conn_callback = self._create_connection_callback(pvname, pvparam, + connection_callback) + self._create_pv(pvname, conn_callback) + + value_callback = self._create_value_callback(pvname, pvparam, + change_callback, as_string) + sub = self._pvs[pvname].subscribe(data_type='control') + sub.add_callback(value_callback) + return sub + + def _create_value_callback(self, pvname, pvparam, change_callback, + as_string): + callback = partial(self._callback, pvname, pvparam, change_callback, + as_string) + self._store_callback(callback) + return callback + + def _create_connection_callback(self, pvname, pvparam, connection_callback): + callback = partial(self._conn_callback, pvname, pvparam, + connection_callback) + self._store_callback(callback) + return callback + + def _store_callback(self, callback): + # Must keep a reference to callbacks to avoid garbage collection! + self._callbacks.add(callback) + + def _callback(self, pvname, pvparam, change_callback, as_string, sub, + response): + value = self._convert_value(pvname, response, as_string) + units = self._get_units(response.metadata, '') + severity, message = self._extract_alarm_info(response) + change_callback(pvname, pvparam, value, units, severity, message) + + def _conn_callback(self, pvname, pvparam, connection_callback, pv, state): + connection_callback(pvname, pvparam, state == 'connected') + + def _extract_alarm_info(self, values): + # The EPICS 'severity' matches to the NICOS `status` and the message has + # a short description of the alarm details. + if hasattr(values, 'severity'): + message = STATUS_TO_MESSAGE[values.status] + return values.severity, '' if message == 'NO_ALARM' else message + return values.severity, 'alarm information unavailable' + + def close_subscription(self, subscription): + subscription.clear() diff --git a/test_helpers.py b/setup/classes.py similarity index 63% rename from test_helpers.py rename to setup/classes.py index ce89ae6..da0c8b8 100755 --- a/test_helpers.py +++ b/setup/classes.py @@ -4,16 +4,13 @@ import math import pytest import yaml -from caproto.sync.client import read, write - -# We use a very high timeout here in order to make sure the tests do not fail prematurely. -# The default 1 second has proven to be too strict. -CAPROTO_TIMEOUT = 5 +from setup.caproto import CaprotoWrapper +TIMEOUT = 5 def read_config(): root_dir = os.path.dirname(__file__) - with open(root_dir + "/config.yaml", "r") as f: + with open(root_dir + "/../config.yaml", "r") as f: return yaml.safe_load(f) @@ -23,7 +20,36 @@ class MajorState(Exception): class StartTimeoutError(TimeoutError): pass +class IocNotRunning(Exception): + pass + class Motor: + """ + This object represents a motor used for testing. + + A few notes regarding EPICS interaction: + + Although it would be feasible to simply use the `read` and `write` functions in `caproto.sync.client` to + access the motor PVs directly everytime, this approach is problematic because it opens and closes an UDP + socket everytime. In practice, this is known to be susceptible to timeout errors due to high volumes + of network traffic. Therefore, the NICOS `CaprotoWrapper` opens a socket once for each PV and keeps it + alive. The wrapper object is then attached to the motor via the field `_epics_wrapper` and used + to communicate with EPICS. + + The PVs are derived from the member variable `fields`, which specifies a PV alias as a key + and the corresponding motor record field name (e.g. `'readpv': 'RBV'`). This field name is combined + with the motor PV prefix for the full field name. The motor PV prefix in turn is derived from the motor and + controller name given in the object constructor and the PV prefix defined in `config.yaml`. + For example, if the `pvprefix` in `config.yaml` is "DRVTESTS", the controller name is "turboPmac1" and the motor + name is "ax1", the full PV name for "readpv" is then: + `DRVTESTS:turboPmac1:ax1.RBV` + + The full PV adress for each alias is created in the function `_get_pvs`. If additional PVs are needed, this + function therefore needs to be overwritten correspondingly in subclasses. + """ + + # This will store PV objects for each PV param and is populated by the init function + _param_to_pv = {} # Motor record fields fields = { @@ -66,16 +92,23 @@ class Motor: pvprefix = config['pvprefix'] self.pv = f'{pvprefix}:{controller}:{name}' self.default_position = default_position + self._epics_wrapper = CaprotoWrapper(TIMEOUT) - def write(self, suffix, value): - write(self.pv + suffix, value, timeout=CAPROTO_TIMEOUT) + for (access_name, pvname) in self._get_pvs().items(): + self._epics_wrapper.connect_pv(pvname) + self._param_to_pv[access_name] = pvname - def write_field(self, fieldname, value): - self.write('.' + self.fields[fieldname], value) + def _get_pvs(self, pvs ={}): + """ + :return: Dict of PV aliases and actual PV names. + """ + for (alias, pvname) in self.fields.items(): + pvs[alias] = '.'.join((self.pv, pvname)) + return pvs - def read(self, suffix, as_string=False): - response = read(self.pv + suffix, timeout=CAPROTO_TIMEOUT) - return self._convert_value(response, as_string) + def put_pv(self, pvparam, value, wait=False): + self._epics_wrapper.put_pv_value(self._param_to_pv[pvparam], value, + wait=wait) def _convert_value(self, response, as_string=False): @@ -101,8 +134,9 @@ class Motor: encoding='utf-8', errors='ignore') return response.data - def read_field(self, fieldname, as_string=False): - return self.read('.' + self.fields[fieldname], as_string) + def get_pv(self, pvparam, as_string=False): + return self._epics_wrapper.get_pv_value(self._param_to_pv[pvparam], + as_string=as_string) def move_and_wait(self, target): """ @@ -116,16 +150,16 @@ class Motor: return # Is the motor already moving? - already_moving = self.read_field('donemoving') == 0 + already_moving = self.get_pv('donemoving') == 0 - self.write_field('writepv', target) + self.put_pv('writepv', target) if not already_moving: self.wait_for_start() self.wait_for_done() def limits(self): - return (self.read_field('lowlimit'), self.read_field('highlimit')) + return (self.get_pv('lowlimit'), self.get_pv('highlimit')) def at_target(self, target=None): """ @@ -134,18 +168,18 @@ class Motor: into account """ if target is None: - target = self.read_field('writepv') + target = self.get_pv('writepv') - return (self.read_field('miss') == 0 and - math.isclose(self.read_field('readpv'), target, - rel_tol=1e-9, abs_tol=self.read_field('precision'))) + return (self.get_pv('miss') == 0 and + math.isclose(self.get_pv('readpv'), target, + rel_tol=1e-9, abs_tol=self.get_pv('precision'))) def wait_for_start(self, maxwaittime=None): if maxwaittime is None: maxwaittime = 5 * self.idlepoll now = time.time() - while self.read_field('donemoving') != 0: + while self.get_pv('donemoving') != 0: if now + maxwaittime < time.time(): raise StartTimeoutError( f'Motor did not start in {maxwaittime} seconds') @@ -158,7 +192,7 @@ class Motor: time.sleep(0.1) def wait_for_done(self): - while not self.read_field('donemoving'): + while not self.get_pv('donemoving'): if self.has_error(): error_msg = self.error_message() if error_msg: @@ -168,14 +202,14 @@ class Motor: time.sleep(0.1) def has_error(self): - return self.read_field('alarm_severity') == 'MAJOR' + return self.get_pv('alarm_severity') == 'MAJOR' def error_message(self): return '' def is_homed(self): # See https://epics.anl.gov/bcda/synApps/motor/motorRecord.html#Fields_status - str = format(self.read_field('status'), '016b') + str = format(int(self.get_pv('status')), '016b') return bool(str[15]) @@ -190,27 +224,28 @@ class SinqMotor(Motor): 'can_set_speed': ':CanSetSpeed', 'reseterrorpv': ':Reset', 'errormsgpv': '-MsgTxt', - } + } + + def _get_pvs(self, pvs ={} ): + """ + :return: Dict of PV aliases and actual PV names. + """ + + # Add the PVs of the base class first, because we overwrite the "enable" alias + pvs = Motor._get_pvs(self, pvs) + for (alias, pvname) in self.suffixes.items(): + pvs[alias] = self.pv + pvname + return pvs def error_message(self): - return self.read_field('errormsgpv', as_string=True) - - def write_field(self, fieldname, value): - if fieldname in self.suffixes: - return self.write(self.suffixes[fieldname], value) - return self.write('.' + self.fields[fieldname], value) - - def read_field(self, fieldname, as_string=False): - if fieldname in self.suffixes: - return self.read(self.suffixes[fieldname], as_string) - return self.read('.' + self.fields[fieldname], as_string) + return self.get_pv('errormsgpv', as_string=True) def enable_and_wait(self, timeout=10): - self.write_field('enable', 1) + self.put_pv('enable', 1) self.wait_enabled(timeout) def disable_and_wait(self, timeout=10): - self.write_field('enable', 0) + self.put_pv('enable', 0) self.wait_disabled(timeout) def wait_enabled(self, timeout=10): @@ -233,7 +268,7 @@ class SinqMotor(Motor): return enable_rbv != 0 now = time.time() - while check(self.read_field('enable_rbv')): + while check(self.get_pv('enable_rbv')): if time.time() > now + timeout: if wait_for_enabling: pytest.fail( diff --git a/tests/conftest.py b/tests/conftest.py index c3bbf8b..412f456 100755 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,15 +1,12 @@ import time +from pathlib import Path -from caproto.sync.client import read, write +from caproto.sync.client import read import pytest -from test_helpers import read_config +from setup.classes import read_config, IocNotRunning -TIMEOUT_READ = 2 - -@pytest.fixture(autouse=True) def check_ioc_running(): - config = read_config() pvprefix = config['pvprefix'] @@ -17,14 +14,19 @@ def check_ioc_running(): This function checks if the test IOC is already running. """ try: - read(f'{pvprefix}:IOCREADY', timeout=TIMEOUT_READ) + read(f'{pvprefix}:IOCREADY', timeout=2) # Reading the check recird was successfull -> We assume that the IOC # is running return except TimeoutError: # IOC startup failed in the given time -> Raise an error - raise Exception('Start the test IOC first ()') + path = Path.cwd() / 'startioc' + raise IocNotRunning(f'Start the test IOC first by running {path}') + +@pytest.fixture(autouse=True) +def check_ioc_running_before_test(): + check_ioc_running() def pytest_addoption(parser): """ diff --git a/tests/home.py b/tests/home.py new file mode 100755 index 0000000..252255c --- /dev/null +++ b/tests/home.py @@ -0,0 +1,92 @@ +import time +import threading + + +def home(motor, forward): + + encoder = motor.get_pv('encoder_type', as_string=True) + if encoder == 'absolute': + is_absolute = True + elif encoder == 'incremental': + is_absolute = False + else: + raise ValueError(f'Unknown encoder type {encoder}') + + # Start a homing run and observe the motor behaviour depending on the + # encoder type + if forward: + motor.put_pv('homeforward', 1) + else: + motor.put_pv('homereverse', 1) + + motor.wait_for_start() + + if is_absolute: + # EPICS initially assumes the motor is moving and sets it into moving state. + # However, after the first poll it should realize that the motor is in fact not moving. + time.sleep(2 * motor.busypoll) + + # Motor should not move at all + assert motor.get_pv('moving') == 0 + assert motor.get_pv('donemoving') == 1 + assert not motor.has_error() + else: + # The controller might take a bit of time to actually start the movement, + # therefore we wait a bit. + time.sleep(2 * motor.busypoll) + + # Motor should start movement + assert motor.get_pv('moving') == 1 + assert motor.get_pv('donemoving') == 0 + assert not motor.has_error() + # TODO: This should work + # assert not motor.is_homed() + + motor.wait_for_done() + + assert motor.get_pv('moving') == 0 + assert motor.get_pv('donemoving') == 1 + assert not motor.has_error() + assert motor.is_homed() + +def start_home_while_moving(motor, target, forward): + """ + Start a movement to target, wait two seconds and then home the motor. + """ + motor.put_pv('writepv', target) + time.sleep(2) + home(motor, forward) + assert not motor.at_target(target) + +def start_move_while_homing(motor, target, forward): + """ + Try to start a movement while homing (this should fail and the motor should continue homing instead) + """ + def try_start_move(): + time.sleep(5) + motor.put_pv('writepv', target) + + thread = threading.Thread(target = try_start_move) + thread.start() + home(motor, forward) + assert not motor.at_target(target) + + +def move_then_home(motor, target, forward): + """ + Complete a movement and then home the motor + """ + motor.move_and_wait(target) + assert motor.at_target(target) + assert not motor.has_error() + home(motor, forward) + +def home_then_move(motor, target, forward): + """ + Home the motor and then move to the target + """ + home(motor, forward) + motor.move_and_wait(target) + assert motor.at_target(target) + assert not motor.has_error() + diff --git a/tests/move.py b/tests/move.py index 761fada..da84050 100755 --- a/tests/move.py +++ b/tests/move.py @@ -2,7 +2,7 @@ import time import pytest -from test_helpers import StartTimeoutError +from setup.classes import StartTimeoutError def move_to_low_limit_switch(motor): @@ -10,7 +10,7 @@ def move_to_low_limit_switch(motor): motor.move_and_wait(low_limit) assert motor.at_target() # Field is currently not set in our drivers - #assert motor.read_field('lowlimitswitch') == 1 + #assert motor.get_pv('lowlimitswitch') == 1 assert not motor.has_error() @@ -20,7 +20,7 @@ def move_to_high_limit_switch(motor): motor.move_and_wait(low_limit) assert motor.at_target() # Field is currently not set in our drivers - #assert motor.read_field('highlimitswitch') == 1 + #assert motor.get_pv('highlimitswitch') == 1 assert not motor.has_error() @@ -31,14 +31,14 @@ def move_while_move(motor, first_target, second_target, time_first_target): sufficiently large distance between the current motor position and first_target so it has enough time to issue the second move command. """ - motor.write_field('writepv', first_target) + motor.put_pv('writepv', first_target) time.sleep(time_first_target) motor.move_and_wait(second_target) assert motor.at_target(second_target) assert not motor.has_error() def stop_then_move(motor, target): - motor.write_field('stop', 1) + motor.put_pv('stop', 1) motor.wait_for_done() motor.move_and_wait(target) @@ -48,16 +48,16 @@ def stop(motor, target): Stop an motor while it is moving to a target. This function assumes a sufficiently large distance between the current motor position and target. """ - motor.write_field('writepv', target) + motor.put_pv('writepv', target) time.sleep(1) - motor.write_field('stop', 1) + motor.put_pv('stop', 1) motor.wait_for_done() assert not motor.at_target(target) assert not motor.has_error() def invalid_target(motor, target): - current_pos = motor.read_field('readpv') + current_pos = motor.get_pv('readpv') try: motor.move_and_wait(target) pytest.fail('move_and_wait should throw a StartTimeoutError error') @@ -69,5 +69,5 @@ def targets_outside_limits(motor): """ Try to move the motor outside its limits """ - invalid_target(motor, motor.read_field('lowlimit') - 10) - invalid_target(motor, motor.read_field('highlimit') + 10) + invalid_target(motor, motor.get_pv('lowlimit') - 10) + invalid_target(motor, motor.get_pv('highlimit') + 10) diff --git a/tests/sinqMotor/home.py b/tests/sinqMotor/home.py deleted file mode 100755 index 3a3d1d7..0000000 --- a/tests/sinqMotor/home.py +++ /dev/null @@ -1,41 +0,0 @@ -import time - - -def home(motor, forward): - - encoder = motor.read_field('encoder_type') - if encoder == 'absolute': - is_absolute = True - elif encoder == 'incremental': - is_absolute = False - else: - raise ValueError(f'Unknown encoder type {encoder}') - - # Start a homing run and observe the motor behaviour depending on the - # encoder type - if forward: - motor.write_field('homeforward') - else: - motor.write_field('homereverse') - - # Give the record some time to react - time.sleep(0.5) - - if is_absolute: - # Motor should not move at all - assert motor.read_field('moving') == 0 - assert motor.read_field('donemoving') == 1 - assert not motor.has_error() - else: - # Motor should start movement - assert motor.read_field('moving') == 1 - assert motor.read_field('donemoving') == 0 - assert not motor.has_error() - assert not motor.is_homed() - - motor.wait_for_done() - - assert motor.read_field('moving') == 0 - assert motor.read_field('donemoving') == 1 - assert not motor.has_error() - assert motor.is_homed() diff --git a/tests/sinqMotor/limits.py b/tests/sinqMotor/limits.py index e36cc26..0accad5 100755 --- a/tests/sinqMotor/limits.py +++ b/tests/sinqMotor/limits.py @@ -1,7 +1,7 @@ import time import math -from test_helpers import read_config +from setup.classes import read_config def reread_limits_from_hw(motor): @@ -12,18 +12,18 @@ def reread_limits_from_hw(motor): """ (high_limit, low_limit) = motor.limits() - motor.write_field('dialhighlimit', high_limit+10) - motor.write_field('diallowlimit', low_limit-10) + motor.put_pv('dialhighlimit', high_limit+10) + motor.put_pv('diallowlimit', low_limit-10) # Wait two idle poll periods time.sleep(2 * motor.idlepoll) # Values should have been reread - assert math.isclose(motor.read_field('highlimit'), + assert math.isclose(motor.get_pv('highlimit'), high_limit, rel_tol=1e-9, abs_tol=0.001) - assert math.isclose(motor.read_field('lowlimit'), + assert math.isclose(motor.get_pv('lowlimit'), low_limit, rel_tol=1e-9, abs_tol=0.001) - assert math.isclose(motor.read_field('dialhighlimit'), + assert math.isclose(motor.get_pv('dialhighlimit'), high_limit, rel_tol=1e-9, abs_tol=0.001) - assert math.isclose(motor.read_field('diallowlimit'), + assert math.isclose(motor.get_pv('diallowlimit'), low_limit, rel_tol=1e-9, abs_tol=0.001) diff --git a/tests/sinqMotor/masterMacs/ax1/conftest.py b/tests/sinqMotor/masterMacs/ax1/conftest.py index a7f000b..3516536 100755 --- a/tests/sinqMotor/masterMacs/ax1/conftest.py +++ b/tests/sinqMotor/masterMacs/ax1/conftest.py @@ -1,7 +1,7 @@ # This module defines fixtures which are shared for all tests of motor "lin1". import pytest -from test_helpers import MasterMACS +from setup.classes import MasterMACS @pytest.fixture(autouse=True) diff --git a/tests/sinqMotor/masterMacs/ax1/test_common.py b/tests/sinqMotor/masterMacs/ax1/test_common.py index 72ef491..05fd885 100755 --- a/tests/sinqMotor/masterMacs/ax1/test_common.py +++ b/tests/sinqMotor/masterMacs/ax1/test_common.py @@ -2,6 +2,7 @@ from tests.move import * from tests.sinqMotor.limits import * +from tests.home import * from tests.sinqMotor.masterMacs.reset import reset @@ -29,5 +30,21 @@ def test_stop(motor): reset(motor, 1) stop(motor, 18) + +def test_home(motor): + reset(motor, 1) + home(motor, True) + + +def test_start_home_while_moving(motor): + reset(motor, 1) + start_home_while_moving(motor, 10, True) + + +def test_start_home_while_moving(motor): + reset(motor, 1) + start_home_while_moving(motor, 10, True) + + # def test_reread_limits_from_hw(motor): # reread_limits_from_hw(motor) diff --git a/tests/sinqMotor/masterMacs/reset.py b/tests/sinqMotor/masterMacs/reset.py index 1215299..d50d52f 100755 --- a/tests/sinqMotor/masterMacs/reset.py +++ b/tests/sinqMotor/masterMacs/reset.py @@ -7,9 +7,9 @@ def reset(motor, target): 4) Moving to zero """ - motor.write_field('stop', 1) - motor.write_field('reseterrorpv', 1) - motor.write_field('enable', 1) + motor.put_pv('stop', 1) + motor.put_pv('reseterrorpv', 1) + motor.put_pv('enable', 1) motor.move_and_wait(target) assert motor.at_target(target) assert not motor.has_error() diff --git a/tests/sinqMotor/speed.py b/tests/sinqMotor/speed.py index aae2041..2b29e26 100644 --- a/tests/sinqMotor/speed.py +++ b/tests/sinqMotor/speed.py @@ -3,54 +3,54 @@ def speed_fields_valid(motor): Check if the motor speed fields are valid: VMAX >= VELO >= VBAS """ - vbas = motor.read_field('basespeed') - velo = motor.read_field('speed') - vmax = motor.read_field('maxspeed') + vbas = motor.get_pv('basespeed') + velo = motor.get_pv('speed') + vmax = motor.get_pv('maxspeed') assert vmax >= velo >= vbas assert vbas >= 0 assert velo >= 0 assert vmax >= 0 # If the motor is not a speed variable drive, VMAX == VELO == VBAS should hold true - if motor.read_field('can_set_speed') == 0: + if motor.get_pv('can_set_speed') == 0: assert vmax == velo == vbas def set_speed(motor, new_speed): - old_speed = motor.read_field('speed') + old_speed = motor.get_pv('speed') try: - motor.write_field('speed', new_speed) - if motor.read_field('can_set_speed') != 0: - if new_speed > motor.read_field('maxspeed'): - assert motor.read_field('speed') == motor.read_field('maxspeed') - elif new_speed < motor.read_field('basespeed'): - assert motor.read_field('speed') == motor.read_field('basespeed') + motor.put_pv('speed', new_speed) + if motor.get_pv('can_set_speed') != 0: + if new_speed > motor.get_pv('maxspeed'): + assert motor.get_pv('speed') == motor.get_pv('maxspeed') + elif new_speed < motor.get_pv('basespeed'): + assert motor.get_pv('speed') == motor.get_pv('basespeed') else: - assert motor.read_field('speed') == new_speed + assert motor.get_pv('speed') == new_speed else: - assert motor.read_field('speed') == old_speed + assert motor.get_pv('speed') == old_speed finally: - motor.write_field('speed', old_speed) + motor.put_pv('speed', old_speed) def set_minspeed_maxspeed_meanspeed(motor): - set_speed(motor, motor.read_field('maxspeed')) - set_speed(motor, motor.read_field('basespeed')) - set_speed(motor, 0.5 * (motor.read_field('basespeed') + motor.read_field('maxspeed'))) + set_speed(motor, motor.get_pv('maxspeed')) + set_speed(motor, motor.get_pv('basespeed')) + set_speed(motor, 0.5 * (motor.get_pv('basespeed') + motor.get_pv('maxspeed'))) def set_invalid_speed_above_min_below_max(motor): - set_speed(motor, motor.read_field('maxspeed') + 1) - set_speed(motor, motor.read_field('basespeed') - 1) + set_speed(motor, motor.get_pv('maxspeed') + 1) + set_speed(motor, motor.get_pv('basespeed') - 1) def set_speed_and_move(motor, new_speed, target): - old_speed = motor.read_field('speed') + old_speed = motor.get_pv('speed') try: - motor.write_field('speed', new_speed) + motor.put_pv('speed', new_speed) motor.move_and_wait(target) assert motor.at_target(target) finally: - motor.write_field('speed', old_speed) + motor.put_pv('speed', old_speed) diff --git a/tests/sinqMotor/turboPmac/ax1/conftest.py b/tests/sinqMotor/turboPmac/ax1/conftest.py index c0d400f..0e68945 100755 --- a/tests/sinqMotor/turboPmac/ax1/conftest.py +++ b/tests/sinqMotor/turboPmac/ax1/conftest.py @@ -2,17 +2,17 @@ import time import pytest -from test_helpers import TurboPMAC +from setup.classes import TurboPMAC # Prepare the motor at the start of the test by resetting and homing it. @pytest.fixture(scope="session", autouse=True) def reset_and_home(): mot = TurboPMAC('turboPmac1', 'ax1') - mot.write_field('stop', 1) - mot.write_field('reseterrorpv', 1) + mot.put_pv('stop', 1) + mot.put_pv('reseterrorpv', 1) mot.wait_disabled() mot.enable_and_wait() - mot.write_field('homeforward', 1) + mot.put_pv('homeforward', 1) time.sleep(1) mot.wait_for_done() diff --git a/tests/sinqMotor/turboPmac/ax1/test_common.py b/tests/sinqMotor/turboPmac/ax1/test_common.py index 6729df1..609c104 100755 --- a/tests/sinqMotor/turboPmac/ax1/test_common.py +++ b/tests/sinqMotor/turboPmac/ax1/test_common.py @@ -3,6 +3,7 @@ import pytest from tests.move import * +from tests.home import * from tests.sinqMotor.limits import * from tests.sinqMotor.speed import * from tests.sinqMotor.turboPmac.reset import reset @@ -71,21 +72,46 @@ def test_set_invalid_speed_above_min_below_max(motor): def test_set_speed_and_move(motor): reset(motor) - set_speed_and_move(motor, motor.read_field('maxspeed'), 10) - set_speed_and_move(motor, 0.5*motor.read_field('maxspeed'), 0) + set_speed_and_move(motor, motor.get_pv('maxspeed'), 10) + set_speed_and_move(motor, 0.5*motor.get_pv('maxspeed'), 0) @pytest.mark.stresstest def test_set_speed_and_move_stress(motor): reset(motor) for _ in range(50): - set_speed_and_move(motor, motor.read_field('maxspeed'), 10) - set_speed_and_move(motor, 0.5*motor.read_field('maxspeed'), 0) + set_speed_and_move(motor, motor.get_pv('maxspeed'), 10) + set_speed_and_move(motor, 0.5*motor.get_pv('maxspeed'), 0) def test_targets_outside_limits(motor): reset(motor) targets_outside_limits(motor) + +def test_home(motor): + reset(motor, 1) + home(motor, True) + + +def test_start_home_while_moving(motor): + reset(motor, 1) + start_home_while_moving(motor, 30, True) + + +def test_start_home_while_moving(motor): + reset(motor, 1) + start_home_while_moving(motor, 30, True) + + +def test_move_then_home(motor): + reset(motor, 1) + move_then_home(motor, 30, True) + + +def test_home_then_move(motor): + reset(motor, 1) + home_then_move(motor, 30, True) + # def test_reread_limits_from_hw(motor): # reread_limits_from_hw(motor) diff --git a/tests/sinqMotor/turboPmac/ax5/conftest.py b/tests/sinqMotor/turboPmac/ax5/conftest.py index 9017958..0fe3b83 100755 --- a/tests/sinqMotor/turboPmac/ax5/conftest.py +++ b/tests/sinqMotor/turboPmac/ax5/conftest.py @@ -1,7 +1,7 @@ # This module defines fixtures which are shared for all tests of motor "rot1". import pytest -from test_helpers import TurboPMAC +from setup.classes import TurboPMAC @pytest.fixture(autouse=True) diff --git a/tests/sinqMotor/turboPmac/ax5/test_common.py b/tests/sinqMotor/turboPmac/ax5/test_common.py index 016d2e8..bfe2ba4 100755 --- a/tests/sinqMotor/turboPmac/ax5/test_common.py +++ b/tests/sinqMotor/turboPmac/ax5/test_common.py @@ -4,6 +4,7 @@ import pytest from tests.move import * from tests.sinqMotor.limits import * from tests.sinqMotor.speed import * +from tests.home import * from tests.sinqMotor.turboPmac.reset import reset @@ -70,19 +71,25 @@ def test_set_invalid_speed_above_min_below_max(motor): def test_set_speed_and_move(motor): reset(motor) - set_speed_and_move(motor, motor.read_field('maxspeed'), 10) - set_speed_and_move(motor, 0.5*motor.read_field('maxspeed'), 0) + set_speed_and_move(motor, motor.get_pv('maxspeed'), 10) + set_speed_and_move(motor, 0.5*motor.get_pv('maxspeed'), 0) @pytest.mark.stresstest def test_set_speed_and_move_stress(motor): reset(motor) for _ in range(50): - set_speed_and_move(motor, motor.read_field('maxspeed'), 10) - set_speed_and_move(motor, 0.5*motor.read_field('maxspeed'), 0) + set_speed_and_move(motor, motor.get_pv('maxspeed'), 10) + set_speed_and_move(motor, 0.5*motor.get_pv('maxspeed'), 0) def test_targets_outside_limits(motor): reset(motor) targets_outside_limits(motor) + +def test_home(motor): + reset(motor, 1) + home(motor, True) + + # def test_reread_limits_from_hw(motor): # reread_limits_from_hw(motor) diff --git a/tests/sinqMotor/turboPmac/reset.py b/tests/sinqMotor/turboPmac/reset.py index 1dc5fc5..a4cb7e1 100755 --- a/tests/sinqMotor/turboPmac/reset.py +++ b/tests/sinqMotor/turboPmac/reset.py @@ -16,9 +16,9 @@ def reset(motor, target=None): if target is None: target = motor.default_position - motor.write_field('stop', 1) + motor.put_pv('stop', 1) motor.wait_for_done() - motor.write_field('reseterrorpv', 1) + motor.put_pv('reseterrorpv', 1) motor.wait_disabled() motor.enable_and_wait() motor.move_and_wait(target)