Added homing test and switched to NICOS-like caproto backend implementaiton

This commit is contained in:
Stefan Mathis
2025-07-28 16:43:53 +02:00
parent f1eb2b27bb
commit ed03816888
22 changed files with 646 additions and 162 deletions

View File

@@ -34,7 +34,7 @@ identifier of the records created by the motor drivers.
## Starting the IOC
It is recommended to start the IOC via `ioc/startioc.py`:
It is recommended to start the IOC via `ioc/startioc`:
```console
ioc/startioc.py
@@ -81,15 +81,17 @@ pytest -s tests/sinqMotor/turboPmac/lin1/test_common.py -k 'test_something'
Tests which don't run on the same motor can be parallelized in order to both save on runtime
and to test controller and driver performance while handling multiple motors. To do so,
the wrapper script `pytestpar.py` is provided which starts a pytest process for each motor.
the wrapper script `runtests` is provided which starts a pytest process for each motor.
The individual tests for each motor are run sequentially.
`pytestpar.py` accepts any arguments and forwards them to the pytest call.
`runtests` accepts any arguments and forwards them to the pytest call.
If a folder is given as an argument, all tests which are not within this folder are ignored.
- `pytestpar` will run all tests parallelized per motor
- `pytestpar tests/sinqMotor/turboPmac` will run all Turbo PMAC tests parallelized per motor
- `pytestpar tests/sinqMotor/turboPmac/ax1` will only run the tests for motor `ax1`. These tests are run sequentially.
- `runtests` will run all tests parallelized per motor
- `runtests tests/sinqMotor/turboPmac` will run all Turbo PMAC tests parallelized per motor
- `runtests tests/sinqMotor/turboPmac/ax1` will only run the tests for motor `ax1`. These tests are run sequentially.
In addition to the parallelization feature, `runtests` also starts the IOC if it is not running already.
## Running custom scripts
@@ -100,4 +102,12 @@ the virtual environment and then simply executing it:
bash
source testenv/bin/activate
./example_custom_script.py
```
```
## Developer notes
### EPICS integration
The EPICS integration is currently done via caproto. This module is taken from NICOS (https://github.com/mlz-ictrl/nicos/blob/master/nicos/devices/epics/pva/caproto.py)
and has been slightly modified (mainly removal of NICOS-specific decorators and the conversion of the EPICS to the NICOS status). The docstring of the `setup.classes.Motor`
class contains further information.

View File

@@ -6,7 +6,7 @@
import time
from test_helpers import TurboPMAC
from setup.classes import TurboPMAC
motor = TurboPMAC('turboPmac1', 'lin1')
@@ -17,10 +17,10 @@ motor.move_and_wait(-20)
motor.move_and_wait(-30)
# Disable the motor
motor.write_field('enable', 0)
motor.put_pv('enable', 0)
# Wait a bit so the motor has been disabled
time.sleep(2)
# Reenable the motor
motor.write_field('enable', 1)
motor.put_pv('enable', 1)

View File

@@ -10,7 +10,6 @@ import subprocess
import sys
from pathlib import Path
def startioc():
root_dir = Path(__file__).resolve().parent.parent

View File

@@ -9,4 +9,5 @@ source testenv/bin/activate
pip install 'caproto'
pip install 'pytest'
pip install 'pyyaml'
pip install 'pyyaml'
pip install 'numpy'

View File

@@ -1,6 +1,8 @@
#!/usr/bin/env python3
# This script is a wrapper around pytest which starts a separate pytest process
# This script is a wrapper around pytest which performs the following tasks:
# 1) If the IOC in ioc/startioc.py is not running, start it
# 2) Run tests in parallel by starting a separate pytest process
# for each separate motor given in FOLDER. This allows to run tests on different
# motors in parallel, cutting down execution time and testing parallelism.
# The script accepts any arguments and forwards them to the pytest call.
@@ -15,10 +17,17 @@ import sys
import subprocess
import os
from pathlib import Path
import time
from tests.conftest import check_ioc_running
from setup.classes import IocNotRunning
# Define the test folders you want to run in parallel
FOLDERS = ["tests/sinqMotor/turboPmac/ax1", "tests/sinqMotor/turboPmac/ax5"]
# Time we excpect the IOC needs to start up
TIMEOUT_IOC_STARTUP = 10
# Separate path-like arguments from other pytest args
path_args = []
extra_args = []
@@ -38,6 +47,26 @@ else:
if any(Path(arg).resolve().as_posix().startswith(Path(folder).resolve().as_posix()) for arg in path_args):
enabled_folders.append(folder)
# Check if the IOC is running and try to start it, if it isn't
try:
check_ioc_running()
except IocNotRunning:
print('Starting IOC ...')
# Start the IOC as a separate process, wait a bit and try again
path = Path(__file__).resolve().parent / 'ioc' / 'startioc'
p = subprocess.Popen([path],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
now = time.time()
while now + TIMEOUT_IOC_STARTUP > time.time():
time.sleep(0.5)
try:
check_ioc_running()
except IocNotRunning:
pass
check_ioc_running()
print(f'Started IOC successfully (process ID {p.pid})! It will automatically terminate after all tests are done.')
# Run each enabled folder's relevant tests in parallel
processes = []
for folder in enabled_folders:
@@ -58,5 +87,4 @@ for proc in processes:
code = proc.wait()
if code != 0:
exit_code = code
sys.exit(exit_code)

0
setup/__init__.py Normal file
View File

308
setup/caproto.py Normal file
View File

@@ -0,0 +1,308 @@
# *****************************************************************************
# NICOS, the Networked Instrument Control System of the MLZ
# Copyright (c) 2009-2025 by the NICOS contributors (see AUTHORS)
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Matt Clarke <matt.clarke@ess.eu>
# Stefan Mathis <stefan.mathis@psi.ch>
#
# *****************************************************************************
#
# This file has been taken from NICOS, the original file can be found in:
# https://github.com/mlz-ictrl/nicos/blob/master/nicos/devices/epics/pva/caproto.py
from functools import partial
import numpy as np
from caproto import CaprotoTimeoutError, ChannelType
from caproto.sync.client import read, write
from caproto.threading.client import Context
FTYPE_TO_TYPE = {
ChannelType.STRING: str,
ChannelType.INT: int,
ChannelType.FLOAT: float,
ChannelType.ENUM: int,
ChannelType.CHAR: bytes,
ChannelType.LONG: int,
ChannelType.DOUBLE: float,
ChannelType.TIME_STRING: str,
ChannelType.TIME_INT: int,
ChannelType.TIME_FLOAT: float,
ChannelType.TIME_ENUM: int,
ChannelType.TIME_CHAR: bytes,
ChannelType.TIME_LONG: int,
ChannelType.TIME_DOUBLE: float,
ChannelType.CTRL_STRING: str,
ChannelType.CTRL_INT: int,
ChannelType.CTRL_FLOAT: float,
ChannelType.CTRL_ENUM: int,
ChannelType.CTRL_CHAR: bytes,
ChannelType.CTRL_LONG: int,
ChannelType.CTRL_DOUBLE: float,
}
STATUS_TO_MESSAGE = {
0: 'NO_ALARM',
1: 'READ',
2: 'WRITE',
3: 'HIHI',
4: 'HIGH',
5: 'LOLO',
6: 'LOW',
7: 'STATE',
8: 'COS',
9: 'COMM',
10: 'TIMED',
11: 'HWLIMIT',
12: 'CALC',
13: 'SCAN',
14: 'LINK',
15: 'SOFT',
16: 'BAD_SUB',
17: 'UDF',
18: 'DISABLE',
19: 'SIMM',
20: 'READ_ACCESS',
21: 'WRITE_ACCESS',
}
# Same context can be shared across all devices.
_Context = Context()
def caget(name, timeout=3.0):
""" Returns the PV's current value in its raw form via CA.
:param name: the PV name
:param timeout: the EPICS timeout
:return: the PV's raw value
"""
response = read(name, timeout=timeout)
return response.data[0] if len(response.data) == 1 else response.data
def caput(name, value, wait=False, timeout=3.0):
""" Sets a PV's value via CA.
:param name: the PV name
:param value: the value to set
:param wait: whether to wait for completion
:param timeout: the EPICS timeout
"""
write(name, value, notify=wait, timeout=timeout)
class CaprotoWrapper:
""" Class that wraps the caproto module that provides EPICS Channel Access
(CA) support.
"""
def __init__(self, timeout=3.0):
self._pvs = {}
self._choices = {}
self._callbacks = set()
self._timeout = timeout
def connect_pv(self, pvname):
if pvname in self._pvs:
return
value = self._create_pv(pvname)
# Do some prep work for enum types
if hasattr(value.metadata, 'enum_strings'):
self._choices[pvname] = self.get_value_choices(pvname)
def _create_pv(self, pvname, connection_callback=None):
try:
pv, *_ = _Context.get_pvs(
pvname,
connection_state_callback=connection_callback,
timeout=self._timeout
)
self._pvs[pvname] = pv
# Do a read to force a connection
return pv.read(timeout=self._timeout, data_type='control')
except CaprotoTimeoutError:
raise TimeoutError(
f'could not connect to PV {pvname}') from None
def get_pv_value(self, pvname, as_string=False):
try:
response = self._pvs[pvname].read(timeout=self._timeout,
data_type='control')
return self._convert_value(pvname, response, as_string)
except CaprotoTimeoutError:
raise TimeoutError(f'getting {pvname} timed out') from None
def _convert_value(self, pvname, response, as_string=False):
# By default, the response data is always a list to cover all possible
# readback values from EPICS (lists, strings, chars, numbers). The last
# two cases need to be treated in a special way. They can be identified
# by the list length being 1.
if len(response.data) == 1:
value = response.data[0]
if pvname in self._choices:
return self._choices[pvname][value] if as_string else value
elif isinstance(value, bytes):
return value.decode()
# If an empty string is returned, the data has a single entry
# (the NULL terminator)
if as_string:
return bytes(response.data).rstrip(b'\x00').decode(
encoding='utf-8', errors='ignore')
return value
# Strings are read with their NULL terminator, hence it needs to be
# stripped before decoding
if as_string:
return bytes(response.data).rstrip(b'\x00').decode(
encoding='utf-8', errors='ignore')
if isinstance(response.data, np.ndarray):
val_type = FTYPE_TO_TYPE[self._pvs[pvname].channel.native_data_type]
if val_type == bytes or as_string:
return response.data.tobytes().decode()
return response.data
def put_pv_value(self, pvname, value, wait=False):
if pvname in self._choices:
value = self._choices[pvname].index(value)
try:
self._pvs[pvname].write(value, wait=wait, timeout=self._timeout)
except CaprotoTimeoutError:
raise TimeoutError(f'setting {pvname} timed out') from None
def put_pv_value_blocking(self, pvname, value, block_timeout=60):
if pvname in self._choices:
value = self._choices[pvname].index(value)
try:
self._pvs[pvname].write(value, wait=True, timeout=block_timeout)
except CaprotoTimeoutError:
raise TimeoutError(f'setting {pvname} timed out') from None
def get_pv_type(self, pvname):
data_type = self._pvs[pvname].channel.native_data_type
return FTYPE_TO_TYPE.get(data_type)
def get_alarm_status(self, pvname):
values = self.get_control_values(pvname)
return self._extract_alarm_info(values)
def get_units(self, pvname, default=''):
values = self.get_control_values(pvname)
return self._get_units(values, default)
def _get_units(self, values, default):
if hasattr(values, 'units'):
return values.units.decode()
return default
def get_limits(self, pvname, default_low=-1e308, default_high=1e308):
values = self.get_control_values(pvname)
if hasattr(values, 'lower_ctrl_limit'):
default_low = values.lower_ctrl_limit
default_high = values.upper_ctrl_limit
return default_low, default_high
def get_control_values(self, pvname):
try:
result = self._pvs[pvname].read(timeout=self._timeout,
data_type='control')
return result.metadata
except CaprotoTimeoutError:
raise TimeoutError(
f'getting control values for {pvname} timed out') from None
def get_value_choices(self, pvname):
# Only works for enum types like MBBI and MBBO
value = self.get_control_values(pvname)
return self._extract_choices(value)
def _extract_choices(self, value):
if hasattr(value, 'enum_strings'):
return [x.decode() for x in value.enum_strings]
return []
def subscribe(self, pvname, pvparam, change_callback,
connection_callback=None, as_string=False):
"""
Create a monitor subscription to the specified PV.
:param pvname: The PV name.
:param pvparam: The associated NICOS parameter
(e.g. readpv, writepv, etc.).
:param change_callback: The function to call when the value changes.
:param connection_callback: The function to call when the connection
status changes.
:param as_string: Whether to return the value as a string.
:return: the subscription object.
"""
conn_callback = self._create_connection_callback(pvname, pvparam,
connection_callback)
self._create_pv(pvname, conn_callback)
value_callback = self._create_value_callback(pvname, pvparam,
change_callback, as_string)
sub = self._pvs[pvname].subscribe(data_type='control')
sub.add_callback(value_callback)
return sub
def _create_value_callback(self, pvname, pvparam, change_callback,
as_string):
callback = partial(self._callback, pvname, pvparam, change_callback,
as_string)
self._store_callback(callback)
return callback
def _create_connection_callback(self, pvname, pvparam, connection_callback):
callback = partial(self._conn_callback, pvname, pvparam,
connection_callback)
self._store_callback(callback)
return callback
def _store_callback(self, callback):
# Must keep a reference to callbacks to avoid garbage collection!
self._callbacks.add(callback)
def _callback(self, pvname, pvparam, change_callback, as_string, sub,
response):
value = self._convert_value(pvname, response, as_string)
units = self._get_units(response.metadata, '')
severity, message = self._extract_alarm_info(response)
change_callback(pvname, pvparam, value, units, severity, message)
def _conn_callback(self, pvname, pvparam, connection_callback, pv, state):
connection_callback(pvname, pvparam, state == 'connected')
def _extract_alarm_info(self, values):
# The EPICS 'severity' matches to the NICOS `status` and the message has
# a short description of the alarm details.
if hasattr(values, 'severity'):
message = STATUS_TO_MESSAGE[values.status]
return values.severity, '' if message == 'NO_ALARM' else message
return values.severity, 'alarm information unavailable'
def close_subscription(self, subscription):
subscription.clear()

View File

@@ -4,16 +4,13 @@ import math
import pytest
import yaml
from caproto.sync.client import read, write
# We use a very high timeout here in order to make sure the tests do not fail prematurely.
# The default 1 second has proven to be too strict.
CAPROTO_TIMEOUT = 5
from setup.caproto import CaprotoWrapper
TIMEOUT = 5
def read_config():
root_dir = os.path.dirname(__file__)
with open(root_dir + "/config.yaml", "r") as f:
with open(root_dir + "/../config.yaml", "r") as f:
return yaml.safe_load(f)
@@ -23,7 +20,36 @@ class MajorState(Exception):
class StartTimeoutError(TimeoutError):
pass
class IocNotRunning(Exception):
pass
class Motor:
"""
This object represents a motor used for testing.
A few notes regarding EPICS interaction:
Although it would be feasible to simply use the `read` and `write` functions in `caproto.sync.client` to
access the motor PVs directly everytime, this approach is problematic because it opens and closes an UDP
socket everytime. In practice, this is known to be susceptible to timeout errors due to high volumes
of network traffic. Therefore, the NICOS `CaprotoWrapper` opens a socket once for each PV and keeps it
alive. The wrapper object is then attached to the motor via the field `_epics_wrapper` and used
to communicate with EPICS.
The PVs are derived from the member variable `fields`, which specifies a PV alias as a key
and the corresponding motor record field name (e.g. `'readpv': 'RBV'`). This field name is combined
with the motor PV prefix for the full field name. The motor PV prefix in turn is derived from the motor and
controller name given in the object constructor and the PV prefix defined in `config.yaml`.
For example, if the `pvprefix` in `config.yaml` is "DRVTESTS", the controller name is "turboPmac1" and the motor
name is "ax1", the full PV name for "readpv" is then:
`DRVTESTS:turboPmac1:ax1.RBV`
The full PV adress for each alias is created in the function `_get_pvs`. If additional PVs are needed, this
function therefore needs to be overwritten correspondingly in subclasses.
"""
# This will store PV objects for each PV param and is populated by the init function
_param_to_pv = {}
# Motor record fields
fields = {
@@ -66,16 +92,23 @@ class Motor:
pvprefix = config['pvprefix']
self.pv = f'{pvprefix}:{controller}:{name}'
self.default_position = default_position
self._epics_wrapper = CaprotoWrapper(TIMEOUT)
def write(self, suffix, value):
write(self.pv + suffix, value, timeout=CAPROTO_TIMEOUT)
for (access_name, pvname) in self._get_pvs().items():
self._epics_wrapper.connect_pv(pvname)
self._param_to_pv[access_name] = pvname
def write_field(self, fieldname, value):
self.write('.' + self.fields[fieldname], value)
def _get_pvs(self, pvs ={}):
"""
:return: Dict of PV aliases and actual PV names.
"""
for (alias, pvname) in self.fields.items():
pvs[alias] = '.'.join((self.pv, pvname))
return pvs
def read(self, suffix, as_string=False):
response = read(self.pv + suffix, timeout=CAPROTO_TIMEOUT)
return self._convert_value(response, as_string)
def put_pv(self, pvparam, value, wait=False):
self._epics_wrapper.put_pv_value(self._param_to_pv[pvparam], value,
wait=wait)
def _convert_value(self, response, as_string=False):
@@ -101,8 +134,9 @@ class Motor:
encoding='utf-8', errors='ignore')
return response.data
def read_field(self, fieldname, as_string=False):
return self.read('.' + self.fields[fieldname], as_string)
def get_pv(self, pvparam, as_string=False):
return self._epics_wrapper.get_pv_value(self._param_to_pv[pvparam],
as_string=as_string)
def move_and_wait(self, target):
"""
@@ -116,16 +150,16 @@ class Motor:
return
# Is the motor already moving?
already_moving = self.read_field('donemoving') == 0
already_moving = self.get_pv('donemoving') == 0
self.write_field('writepv', target)
self.put_pv('writepv', target)
if not already_moving:
self.wait_for_start()
self.wait_for_done()
def limits(self):
return (self.read_field('lowlimit'), self.read_field('highlimit'))
return (self.get_pv('lowlimit'), self.get_pv('highlimit'))
def at_target(self, target=None):
"""
@@ -134,18 +168,18 @@ class Motor:
into account
"""
if target is None:
target = self.read_field('writepv')
target = self.get_pv('writepv')
return (self.read_field('miss') == 0 and
math.isclose(self.read_field('readpv'), target,
rel_tol=1e-9, abs_tol=self.read_field('precision')))
return (self.get_pv('miss') == 0 and
math.isclose(self.get_pv('readpv'), target,
rel_tol=1e-9, abs_tol=self.get_pv('precision')))
def wait_for_start(self, maxwaittime=None):
if maxwaittime is None:
maxwaittime = 5 * self.idlepoll
now = time.time()
while self.read_field('donemoving') != 0:
while self.get_pv('donemoving') != 0:
if now + maxwaittime < time.time():
raise StartTimeoutError(
f'Motor did not start in {maxwaittime} seconds')
@@ -158,7 +192,7 @@ class Motor:
time.sleep(0.1)
def wait_for_done(self):
while not self.read_field('donemoving'):
while not self.get_pv('donemoving'):
if self.has_error():
error_msg = self.error_message()
if error_msg:
@@ -168,14 +202,14 @@ class Motor:
time.sleep(0.1)
def has_error(self):
return self.read_field('alarm_severity') == 'MAJOR'
return self.get_pv('alarm_severity') == 'MAJOR'
def error_message(self):
return ''
def is_homed(self):
# See https://epics.anl.gov/bcda/synApps/motor/motorRecord.html#Fields_status
str = format(self.read_field('status'), '016b')
str = format(int(self.get_pv('status')), '016b')
return bool(str[15])
@@ -190,27 +224,28 @@ class SinqMotor(Motor):
'can_set_speed': ':CanSetSpeed',
'reseterrorpv': ':Reset',
'errormsgpv': '-MsgTxt',
}
}
def _get_pvs(self, pvs ={} ):
"""
:return: Dict of PV aliases and actual PV names.
"""
# Add the PVs of the base class first, because we overwrite the "enable" alias
pvs = Motor._get_pvs(self, pvs)
for (alias, pvname) in self.suffixes.items():
pvs[alias] = self.pv + pvname
return pvs
def error_message(self):
return self.read_field('errormsgpv', as_string=True)
def write_field(self, fieldname, value):
if fieldname in self.suffixes:
return self.write(self.suffixes[fieldname], value)
return self.write('.' + self.fields[fieldname], value)
def read_field(self, fieldname, as_string=False):
if fieldname in self.suffixes:
return self.read(self.suffixes[fieldname], as_string)
return self.read('.' + self.fields[fieldname], as_string)
return self.get_pv('errormsgpv', as_string=True)
def enable_and_wait(self, timeout=10):
self.write_field('enable', 1)
self.put_pv('enable', 1)
self.wait_enabled(timeout)
def disable_and_wait(self, timeout=10):
self.write_field('enable', 0)
self.put_pv('enable', 0)
self.wait_disabled(timeout)
def wait_enabled(self, timeout=10):
@@ -233,7 +268,7 @@ class SinqMotor(Motor):
return enable_rbv != 0
now = time.time()
while check(self.read_field('enable_rbv')):
while check(self.get_pv('enable_rbv')):
if time.time() > now + timeout:
if wait_for_enabling:
pytest.fail(

View File

@@ -1,15 +1,12 @@
import time
from pathlib import Path
from caproto.sync.client import read, write
from caproto.sync.client import read
import pytest
from test_helpers import read_config
from setup.classes import read_config, IocNotRunning
TIMEOUT_READ = 2
@pytest.fixture(autouse=True)
def check_ioc_running():
config = read_config()
pvprefix = config['pvprefix']
@@ -17,14 +14,19 @@ def check_ioc_running():
This function checks if the test IOC is already running.
"""
try:
read(f'{pvprefix}:IOCREADY', timeout=TIMEOUT_READ)
read(f'{pvprefix}:IOCREADY', timeout=2)
# Reading the check recird was successfull -> We assume that the IOC
# is running
return
except TimeoutError:
# IOC startup failed in the given time -> Raise an error
raise Exception('Start the test IOC first ()')
path = Path.cwd() / 'startioc'
raise IocNotRunning(f'Start the test IOC first by running {path}')
@pytest.fixture(autouse=True)
def check_ioc_running_before_test():
check_ioc_running()
def pytest_addoption(parser):
"""

92
tests/home.py Executable file
View File

@@ -0,0 +1,92 @@
import time
import threading
def home(motor, forward):
encoder = motor.get_pv('encoder_type', as_string=True)
if encoder == 'absolute':
is_absolute = True
elif encoder == 'incremental':
is_absolute = False
else:
raise ValueError(f'Unknown encoder type {encoder}')
# Start a homing run and observe the motor behaviour depending on the
# encoder type
if forward:
motor.put_pv('homeforward', 1)
else:
motor.put_pv('homereverse', 1)
motor.wait_for_start()
if is_absolute:
# EPICS initially assumes the motor is moving and sets it into moving state.
# However, after the first poll it should realize that the motor is in fact not moving.
time.sleep(2 * motor.busypoll)
# Motor should not move at all
assert motor.get_pv('moving') == 0
assert motor.get_pv('donemoving') == 1
assert not motor.has_error()
else:
# The controller might take a bit of time to actually start the movement,
# therefore we wait a bit.
time.sleep(2 * motor.busypoll)
# Motor should start movement
assert motor.get_pv('moving') == 1
assert motor.get_pv('donemoving') == 0
assert not motor.has_error()
# TODO: This should work
# assert not motor.is_homed()
motor.wait_for_done()
assert motor.get_pv('moving') == 0
assert motor.get_pv('donemoving') == 1
assert not motor.has_error()
assert motor.is_homed()
def start_home_while_moving(motor, target, forward):
"""
Start a movement to target, wait two seconds and then home the motor.
"""
motor.put_pv('writepv', target)
time.sleep(2)
home(motor, forward)
assert not motor.at_target(target)
def start_move_while_homing(motor, target, forward):
"""
Try to start a movement while homing (this should fail and the motor should continue homing instead)
"""
def try_start_move():
time.sleep(5)
motor.put_pv('writepv', target)
thread = threading.Thread(target = try_start_move)
thread.start()
home(motor, forward)
assert not motor.at_target(target)
def move_then_home(motor, target, forward):
"""
Complete a movement and then home the motor
"""
motor.move_and_wait(target)
assert motor.at_target(target)
assert not motor.has_error()
home(motor, forward)
def home_then_move(motor, target, forward):
"""
Home the motor and then move to the target
"""
home(motor, forward)
motor.move_and_wait(target)
assert motor.at_target(target)
assert not motor.has_error()

View File

@@ -2,7 +2,7 @@ import time
import pytest
from test_helpers import StartTimeoutError
from setup.classes import StartTimeoutError
def move_to_low_limit_switch(motor):
@@ -10,7 +10,7 @@ def move_to_low_limit_switch(motor):
motor.move_and_wait(low_limit)
assert motor.at_target()
# Field is currently not set in our drivers
#assert motor.read_field('lowlimitswitch') == 1
#assert motor.get_pv('lowlimitswitch') == 1
assert not motor.has_error()
@@ -20,7 +20,7 @@ def move_to_high_limit_switch(motor):
motor.move_and_wait(low_limit)
assert motor.at_target()
# Field is currently not set in our drivers
#assert motor.read_field('highlimitswitch') == 1
#assert motor.get_pv('highlimitswitch') == 1
assert not motor.has_error()
@@ -31,14 +31,14 @@ def move_while_move(motor, first_target, second_target, time_first_target):
sufficiently large distance between the current motor position and
first_target so it has enough time to issue the second move command.
"""
motor.write_field('writepv', first_target)
motor.put_pv('writepv', first_target)
time.sleep(time_first_target)
motor.move_and_wait(second_target)
assert motor.at_target(second_target)
assert not motor.has_error()
def stop_then_move(motor, target):
motor.write_field('stop', 1)
motor.put_pv('stop', 1)
motor.wait_for_done()
motor.move_and_wait(target)
@@ -48,16 +48,16 @@ def stop(motor, target):
Stop an motor while it is moving to a target. This function assumes a
sufficiently large distance between the current motor position and target.
"""
motor.write_field('writepv', target)
motor.put_pv('writepv', target)
time.sleep(1)
motor.write_field('stop', 1)
motor.put_pv('stop', 1)
motor.wait_for_done()
assert not motor.at_target(target)
assert not motor.has_error()
def invalid_target(motor, target):
current_pos = motor.read_field('readpv')
current_pos = motor.get_pv('readpv')
try:
motor.move_and_wait(target)
pytest.fail('move_and_wait should throw a StartTimeoutError error')
@@ -69,5 +69,5 @@ def targets_outside_limits(motor):
"""
Try to move the motor outside its limits
"""
invalid_target(motor, motor.read_field('lowlimit') - 10)
invalid_target(motor, motor.read_field('highlimit') + 10)
invalid_target(motor, motor.get_pv('lowlimit') - 10)
invalid_target(motor, motor.get_pv('highlimit') + 10)

View File

@@ -1,41 +0,0 @@
import time
def home(motor, forward):
encoder = motor.read_field('encoder_type')
if encoder == 'absolute':
is_absolute = True
elif encoder == 'incremental':
is_absolute = False
else:
raise ValueError(f'Unknown encoder type {encoder}')
# Start a homing run and observe the motor behaviour depending on the
# encoder type
if forward:
motor.write_field('homeforward')
else:
motor.write_field('homereverse')
# Give the record some time to react
time.sleep(0.5)
if is_absolute:
# Motor should not move at all
assert motor.read_field('moving') == 0
assert motor.read_field('donemoving') == 1
assert not motor.has_error()
else:
# Motor should start movement
assert motor.read_field('moving') == 1
assert motor.read_field('donemoving') == 0
assert not motor.has_error()
assert not motor.is_homed()
motor.wait_for_done()
assert motor.read_field('moving') == 0
assert motor.read_field('donemoving') == 1
assert not motor.has_error()
assert motor.is_homed()

View File

@@ -1,7 +1,7 @@
import time
import math
from test_helpers import read_config
from setup.classes import read_config
def reread_limits_from_hw(motor):
@@ -12,18 +12,18 @@ def reread_limits_from_hw(motor):
"""
(high_limit, low_limit) = motor.limits()
motor.write_field('dialhighlimit', high_limit+10)
motor.write_field('diallowlimit', low_limit-10)
motor.put_pv('dialhighlimit', high_limit+10)
motor.put_pv('diallowlimit', low_limit-10)
# Wait two idle poll periods
time.sleep(2 * motor.idlepoll)
# Values should have been reread
assert math.isclose(motor.read_field('highlimit'),
assert math.isclose(motor.get_pv('highlimit'),
high_limit, rel_tol=1e-9, abs_tol=0.001)
assert math.isclose(motor.read_field('lowlimit'),
assert math.isclose(motor.get_pv('lowlimit'),
low_limit, rel_tol=1e-9, abs_tol=0.001)
assert math.isclose(motor.read_field('dialhighlimit'),
assert math.isclose(motor.get_pv('dialhighlimit'),
high_limit, rel_tol=1e-9, abs_tol=0.001)
assert math.isclose(motor.read_field('diallowlimit'),
assert math.isclose(motor.get_pv('diallowlimit'),
low_limit, rel_tol=1e-9, abs_tol=0.001)

View File

@@ -1,7 +1,7 @@
# This module defines fixtures which are shared for all tests of motor "lin1".
import pytest
from test_helpers import MasterMACS
from setup.classes import MasterMACS
@pytest.fixture(autouse=True)

View File

@@ -2,6 +2,7 @@
from tests.move import *
from tests.sinqMotor.limits import *
from tests.home import *
from tests.sinqMotor.masterMacs.reset import reset
@@ -29,5 +30,21 @@ def test_stop(motor):
reset(motor, 1)
stop(motor, 18)
def test_home(motor):
reset(motor, 1)
home(motor, True)
def test_start_home_while_moving(motor):
reset(motor, 1)
start_home_while_moving(motor, 10, True)
def test_start_home_while_moving(motor):
reset(motor, 1)
start_home_while_moving(motor, 10, True)
# def test_reread_limits_from_hw(motor):
# reread_limits_from_hw(motor)

View File

@@ -7,9 +7,9 @@ def reset(motor, target):
4) Moving to zero
"""
motor.write_field('stop', 1)
motor.write_field('reseterrorpv', 1)
motor.write_field('enable', 1)
motor.put_pv('stop', 1)
motor.put_pv('reseterrorpv', 1)
motor.put_pv('enable', 1)
motor.move_and_wait(target)
assert motor.at_target(target)
assert not motor.has_error()

View File

@@ -3,54 +3,54 @@ def speed_fields_valid(motor):
Check if the motor speed fields are valid:
VMAX >= VELO >= VBAS
"""
vbas = motor.read_field('basespeed')
velo = motor.read_field('speed')
vmax = motor.read_field('maxspeed')
vbas = motor.get_pv('basespeed')
velo = motor.get_pv('speed')
vmax = motor.get_pv('maxspeed')
assert vmax >= velo >= vbas
assert vbas >= 0
assert velo >= 0
assert vmax >= 0
# If the motor is not a speed variable drive, VMAX == VELO == VBAS should hold true
if motor.read_field('can_set_speed') == 0:
if motor.get_pv('can_set_speed') == 0:
assert vmax == velo == vbas
def set_speed(motor, new_speed):
old_speed = motor.read_field('speed')
old_speed = motor.get_pv('speed')
try:
motor.write_field('speed', new_speed)
if motor.read_field('can_set_speed') != 0:
if new_speed > motor.read_field('maxspeed'):
assert motor.read_field('speed') == motor.read_field('maxspeed')
elif new_speed < motor.read_field('basespeed'):
assert motor.read_field('speed') == motor.read_field('basespeed')
motor.put_pv('speed', new_speed)
if motor.get_pv('can_set_speed') != 0:
if new_speed > motor.get_pv('maxspeed'):
assert motor.get_pv('speed') == motor.get_pv('maxspeed')
elif new_speed < motor.get_pv('basespeed'):
assert motor.get_pv('speed') == motor.get_pv('basespeed')
else:
assert motor.read_field('speed') == new_speed
assert motor.get_pv('speed') == new_speed
else:
assert motor.read_field('speed') == old_speed
assert motor.get_pv('speed') == old_speed
finally:
motor.write_field('speed', old_speed)
motor.put_pv('speed', old_speed)
def set_minspeed_maxspeed_meanspeed(motor):
set_speed(motor, motor.read_field('maxspeed'))
set_speed(motor, motor.read_field('basespeed'))
set_speed(motor, 0.5 * (motor.read_field('basespeed') + motor.read_field('maxspeed')))
set_speed(motor, motor.get_pv('maxspeed'))
set_speed(motor, motor.get_pv('basespeed'))
set_speed(motor, 0.5 * (motor.get_pv('basespeed') + motor.get_pv('maxspeed')))
def set_invalid_speed_above_min_below_max(motor):
set_speed(motor, motor.read_field('maxspeed') + 1)
set_speed(motor, motor.read_field('basespeed') - 1)
set_speed(motor, motor.get_pv('maxspeed') + 1)
set_speed(motor, motor.get_pv('basespeed') - 1)
def set_speed_and_move(motor, new_speed, target):
old_speed = motor.read_field('speed')
old_speed = motor.get_pv('speed')
try:
motor.write_field('speed', new_speed)
motor.put_pv('speed', new_speed)
motor.move_and_wait(target)
assert motor.at_target(target)
finally:
motor.write_field('speed', old_speed)
motor.put_pv('speed', old_speed)

View File

@@ -2,17 +2,17 @@
import time
import pytest
from test_helpers import TurboPMAC
from setup.classes import TurboPMAC
# Prepare the motor at the start of the test by resetting and homing it.
@pytest.fixture(scope="session", autouse=True)
def reset_and_home():
mot = TurboPMAC('turboPmac1', 'ax1')
mot.write_field('stop', 1)
mot.write_field('reseterrorpv', 1)
mot.put_pv('stop', 1)
mot.put_pv('reseterrorpv', 1)
mot.wait_disabled()
mot.enable_and_wait()
mot.write_field('homeforward', 1)
mot.put_pv('homeforward', 1)
time.sleep(1)
mot.wait_for_done()

View File

@@ -3,6 +3,7 @@
import pytest
from tests.move import *
from tests.home import *
from tests.sinqMotor.limits import *
from tests.sinqMotor.speed import *
from tests.sinqMotor.turboPmac.reset import reset
@@ -71,21 +72,46 @@ def test_set_invalid_speed_above_min_below_max(motor):
def test_set_speed_and_move(motor):
reset(motor)
set_speed_and_move(motor, motor.read_field('maxspeed'), 10)
set_speed_and_move(motor, 0.5*motor.read_field('maxspeed'), 0)
set_speed_and_move(motor, motor.get_pv('maxspeed'), 10)
set_speed_and_move(motor, 0.5*motor.get_pv('maxspeed'), 0)
@pytest.mark.stresstest
def test_set_speed_and_move_stress(motor):
reset(motor)
for _ in range(50):
set_speed_and_move(motor, motor.read_field('maxspeed'), 10)
set_speed_and_move(motor, 0.5*motor.read_field('maxspeed'), 0)
set_speed_and_move(motor, motor.get_pv('maxspeed'), 10)
set_speed_and_move(motor, 0.5*motor.get_pv('maxspeed'), 0)
def test_targets_outside_limits(motor):
reset(motor)
targets_outside_limits(motor)
def test_home(motor):
reset(motor, 1)
home(motor, True)
def test_start_home_while_moving(motor):
reset(motor, 1)
start_home_while_moving(motor, 30, True)
def test_start_home_while_moving(motor):
reset(motor, 1)
start_home_while_moving(motor, 30, True)
def test_move_then_home(motor):
reset(motor, 1)
move_then_home(motor, 30, True)
def test_home_then_move(motor):
reset(motor, 1)
home_then_move(motor, 30, True)
# def test_reread_limits_from_hw(motor):
# reread_limits_from_hw(motor)

View File

@@ -1,7 +1,7 @@
# This module defines fixtures which are shared for all tests of motor "rot1".
import pytest
from test_helpers import TurboPMAC
from setup.classes import TurboPMAC
@pytest.fixture(autouse=True)

View File

@@ -4,6 +4,7 @@ import pytest
from tests.move import *
from tests.sinqMotor.limits import *
from tests.sinqMotor.speed import *
from tests.home import *
from tests.sinqMotor.turboPmac.reset import reset
@@ -70,19 +71,25 @@ def test_set_invalid_speed_above_min_below_max(motor):
def test_set_speed_and_move(motor):
reset(motor)
set_speed_and_move(motor, motor.read_field('maxspeed'), 10)
set_speed_and_move(motor, 0.5*motor.read_field('maxspeed'), 0)
set_speed_and_move(motor, motor.get_pv('maxspeed'), 10)
set_speed_and_move(motor, 0.5*motor.get_pv('maxspeed'), 0)
@pytest.mark.stresstest
def test_set_speed_and_move_stress(motor):
reset(motor)
for _ in range(50):
set_speed_and_move(motor, motor.read_field('maxspeed'), 10)
set_speed_and_move(motor, 0.5*motor.read_field('maxspeed'), 0)
set_speed_and_move(motor, motor.get_pv('maxspeed'), 10)
set_speed_and_move(motor, 0.5*motor.get_pv('maxspeed'), 0)
def test_targets_outside_limits(motor):
reset(motor)
targets_outside_limits(motor)
def test_home(motor):
reset(motor, 1)
home(motor, True)
# def test_reread_limits_from_hw(motor):
# reread_limits_from_hw(motor)

View File

@@ -16,9 +16,9 @@ def reset(motor, target=None):
if target is None:
target = motor.default_position
motor.write_field('stop', 1)
motor.put_pv('stop', 1)
motor.wait_for_done()
motor.write_field('reseterrorpv', 1)
motor.put_pv('reseterrorpv', 1)
motor.wait_disabled()
motor.enable_and_wait()
motor.move_and_wait(target)