Added velocity tests and changed to checked enabling / disabling. Also added test parallelization

This commit is contained in:
Stefan Mathis
2025-07-25 16:40:47 +02:00
parent d268422fe5
commit f1eb2b27bb
16 changed files with 316 additions and 31 deletions

View File

@@ -45,6 +45,8 @@ in turn imports `ioc/config.cmd` to get the current configuration.
## Running the tests
### General
Running tests requires the following three steps:
- Starting the IOC via `ioc/startioc.py` (see [Configuration](#starting-the-ioc))
- Creating (if not done previously) and activating a suitable virtual environment:
@@ -75,6 +77,20 @@ To show it, use the `-s` flag:
pytest -s tests/sinqMotor/turboPmac/lin1/test_common.py -k 'test_something'
```
### Parallelizing tests over motors
Tests which don't run on the same motor can be parallelized in order to both save on runtime
and to test controller and driver performance while handling multiple motors. To do so,
the wrapper script `pytestpar.py` is provided which starts a pytest process for each motor.
The individual tests for each motor are run sequentially.
`pytestpar.py` accepts any arguments and forwards them to the pytest call.
If a folder is given as an argument, all tests which are not within this folder are ignored.
- `pytestpar` will run all tests parallelized per motor
- `pytestpar tests/sinqMotor/turboPmac` will run all Turbo PMAC tests parallelized per motor
- `pytestpar tests/sinqMotor/turboPmac/ax1` will only run the tests for motor `ax1`. These tests are run sequentially.
## Running custom scripts
The test framework can also be used to run custom scripts. The file

View File

@@ -1,7 +1,7 @@
pvprefix: DRVTESTS
versions:
turboPmac: "mathis_s"
masterMacs: "mathis_s"
masterMacs: "1.1"
controllers:
turboPmac1:
ip: "172.28.101.24"

View File

@@ -6,7 +6,7 @@
import time
from common import TurboPMAC
from test_helpers import TurboPMAC
motor = TurboPMAC('turboPmac1', 'lin1')

3
pytest.ini Normal file
View File

@@ -0,0 +1,3 @@
[pytest]
markers =
stresstest: mark test as a stress test that only runs with --run-stresstests

62
pytestpar.py Executable file
View File

@@ -0,0 +1,62 @@
#!/usr/bin/env python3
# This script is a wrapper around pytest which starts a separate pytest process
# for each separate motor given in FOLDER. This allows to run tests on different
# motors in parallel, cutting down execution time and testing parallelism.
# The script accepts any arguments and forwards them to the pytest call.
# If a folder is given as an argument, all tests which are not within this
# folder are ignored:
# - `pytestpar` will run all tests parallelized per motor
# - `pytestpar tests/sinqMotor/turboPmac` will run all Turbo PMAC tests parallelized per motor
# - `pytestpar tests/sinqMotor/turboPmac/ax1` will only run the tests for motor `ax1`.
# These tests are run sequentially.
import sys
import subprocess
import os
from pathlib import Path
# Define the test folders you want to run in parallel
FOLDERS = ["tests/sinqMotor/turboPmac/ax1", "tests/sinqMotor/turboPmac/ax5"]
# Separate path-like arguments from other pytest args
path_args = []
extra_args = []
for arg in sys.argv[1:]:
if '/' in arg or os.path.exists(arg):
path_args.append(arg)
else:
extra_args.append(arg)
# Filter folders to run based on path_args
enabled_folders = []
if not path_args:
enabled_folders = FOLDERS
else:
for folder in FOLDERS:
if any(Path(arg).resolve().as_posix().startswith(Path(folder).resolve().as_posix()) for arg in path_args):
enabled_folders.append(folder)
# Run each enabled folder's relevant tests in parallel
processes = []
for folder in enabled_folders:
folder_path_args = (
[arg for arg in path_args if Path(arg).resolve().as_posix().startswith(Path(folder).resolve().as_posix())]
if path_args else [folder]
)
command = ["pytest"] + folder_path_args + extra_args
print(f"Running: {' '.join(command)}")
proc = subprocess.Popen(command)
processes.append(proc)
# Wait for all processes and collect exit codes
exit_code = 0
for proc in processes:
code = proc.wait()
if code != 0:
exit_code = code
sys.exit(exit_code)

View File

@@ -20,6 +20,8 @@ def read_config():
class MajorState(Exception):
pass
class StartTimeoutError(TimeoutError):
pass
class Motor:
@@ -55,7 +57,7 @@ class Motor:
'precision': 'MRES',
}
def __init__(self, controller, name):
def __init__(self, controller, name, default_position=0):
config = read_config()
self.ip = config['controllers'][controller]['ip']
self.port = config['controllers'][controller]['port']
@@ -63,6 +65,7 @@ class Motor:
self.idlepoll = config['controllers'][controller]['idlepoll']
pvprefix = config['pvprefix']
self.pv = f'{pvprefix}:{controller}:{name}'
self.default_position = default_position
def write(self, suffix, value):
write(self.pv + suffix, value, timeout=CAPROTO_TIMEOUT)
@@ -139,22 +142,36 @@ class Motor:
def wait_for_start(self, maxwaittime=None):
if maxwaittime is None:
maxwaittime = 2 * self.idlepoll
maxwaittime = 5 * self.idlepoll
now = time.time()
while self.read_field('donemoving') != 0:
print(self.read_field('donemoving'))
if now + maxwaittime < time.time():
raise TimeoutError(
raise StartTimeoutError(
f'Motor did not start in {maxwaittime} seconds')
if self.has_error():
error_msg = self.error_message()
if error_msg:
raise MajorState(f'Error while waiting for start of movement: {error_msg}')
else:
raise MajorState('Unknown error while waiting for start of movement')
time.sleep(0.1)
def wait_for_done(self):
while not self.read_field('donemoving'):
if self.has_error():
error_msg = self.error_message()
if error_msg:
raise MajorState(f'Error while waiting for completion of movement: {error_msg}')
else:
raise MajorState('Unknown error while waiting for completion of movement')
time.sleep(0.1)
def has_error(self):
return self.read_field('alarm_severity') == 'MAJOR'
def error_message(self):
return ''
def is_homed(self):
# See https://epics.anl.gov/bcda/synApps/motor/motorRecord.html#Fields_status
@@ -170,10 +187,14 @@ class SinqMotor(Motor):
'can_disable': ':CanDisable',
'connected_rbv': ':Connected',
'encoder_type': ':EncoderType',
'can_set_speed': ':CanSetSpeed',
'reseterrorpv': ':Reset',
'errormsgpv': '-MsgTxt',
}
def error_message(self):
return self.read_field('errormsgpv', as_string=True)
def write_field(self, fieldname, value):
if fieldname in self.suffixes:
return self.write(self.suffixes[fieldname], value)
@@ -184,19 +205,27 @@ class SinqMotor(Motor):
return self.read(self.suffixes[fieldname], as_string)
return self.read('.' + self.fields[fieldname], as_string)
def enable_and_wait(self, timeout=10):
self.write_field('enable', 1)
self.wait_enabled(timeout)
def disable_and_wait(self, timeout=10):
self.write_field('enable', 0)
self.wait_disabled(timeout)
def wait_enabled(self, timeout=10):
"""
Wait until the motor is enabled or a timeout has been reached.
"""
return self._wait_enabled_disabled(timeout, True)
return self._wait_enabled_disabled(True, timeout)
def wait_disabled(self, timeout=10):
"""
Wait until the motor is enabled or a timeout has been reached.
"""
return self._wait_enabled_disabled(timeout, False)
return self._wait_enabled_disabled(False, timeout)
def _wait_enabled_disabled(self, timeout, wait_for_enabling):
def _wait_enabled_disabled(self, wait_for_enabling, timeout):
def check(enable_rbv):
if wait_for_enabling:

View File

@@ -3,7 +3,7 @@ import time
from caproto.sync.client import read, write
import pytest
from common import read_config
from test_helpers import read_config
TIMEOUT_READ = 2

View File

@@ -1,5 +1,8 @@
import time
import pytest
from test_helpers import StartTimeoutError
def move_to_low_limit_switch(motor):
@@ -36,7 +39,8 @@ def move_while_move(motor, first_target, second_target, time_first_target):
def stop_then_move(motor, target):
motor.write_field('stop', 1)
motor.write_field('writepv', target)
motor.wait_for_done()
motor.move_and_wait(target)
def stop(motor, target):
@@ -50,3 +54,20 @@ def stop(motor, target):
motor.wait_for_done()
assert not motor.at_target(target)
assert not motor.has_error()
def invalid_target(motor, target):
current_pos = motor.read_field('readpv')
try:
motor.move_and_wait(target)
pytest.fail('move_and_wait should throw a StartTimeoutError error')
except StartTimeoutError:
assert motor.at_target(current_pos)
def targets_outside_limits(motor):
"""
Try to move the motor outside its limits
"""
invalid_target(motor, motor.read_field('lowlimit') - 10)
invalid_target(motor, motor.read_field('highlimit') + 10)

View File

@@ -1,7 +1,7 @@
import time
import math
from common import read_config
from test_helpers import read_config
def reread_limits_from_hw(motor):

View File

@@ -1,7 +1,7 @@
# This module defines fixtures which are shared for all tests of motor "lin1".
import pytest
from common import MasterMACS
from test_helpers import MasterMACS
@pytest.fixture(autouse=True)

56
tests/sinqMotor/speed.py Normal file
View File

@@ -0,0 +1,56 @@
def speed_fields_valid(motor):
"""
Check if the motor speed fields are valid:
VMAX >= VELO >= VBAS
"""
vbas = motor.read_field('basespeed')
velo = motor.read_field('speed')
vmax = motor.read_field('maxspeed')
assert vmax >= velo >= vbas
assert vbas >= 0
assert velo >= 0
assert vmax >= 0
# If the motor is not a speed variable drive, VMAX == VELO == VBAS should hold true
if motor.read_field('can_set_speed') == 0:
assert vmax == velo == vbas
def set_speed(motor, new_speed):
old_speed = motor.read_field('speed')
try:
motor.write_field('speed', new_speed)
if motor.read_field('can_set_speed') != 0:
if new_speed > motor.read_field('maxspeed'):
assert motor.read_field('speed') == motor.read_field('maxspeed')
elif new_speed < motor.read_field('basespeed'):
assert motor.read_field('speed') == motor.read_field('basespeed')
else:
assert motor.read_field('speed') == new_speed
else:
assert motor.read_field('speed') == old_speed
finally:
motor.write_field('speed', old_speed)
def set_minspeed_maxspeed_meanspeed(motor):
set_speed(motor, motor.read_field('maxspeed'))
set_speed(motor, motor.read_field('basespeed'))
set_speed(motor, 0.5 * (motor.read_field('basespeed') + motor.read_field('maxspeed')))
def set_invalid_speed_above_min_below_max(motor):
set_speed(motor, motor.read_field('maxspeed') + 1)
set_speed(motor, motor.read_field('basespeed') - 1)
def set_speed_and_move(motor, new_speed, target):
old_speed = motor.read_field('speed')
try:
motor.write_field('speed', new_speed)
motor.move_and_wait(target)
assert motor.at_target(target)
finally:
motor.write_field('speed', old_speed)

View File

@@ -2,7 +2,7 @@
import time
import pytest
from common import TurboPMAC
from test_helpers import TurboPMAC
# Prepare the motor at the start of the test by resetting and homing it.
@pytest.fixture(scope="session", autouse=True)
@@ -10,7 +10,8 @@ def reset_and_home():
mot = TurboPMAC('turboPmac1', 'ax1')
mot.write_field('stop', 1)
mot.write_field('reseterrorpv', 1)
mot.write_field('enable', 1)
mot.wait_disabled()
mot.enable_and_wait()
mot.write_field('homeforward', 1)
time.sleep(1)
mot.wait_for_done()

View File

@@ -4,6 +4,7 @@ import pytest
from tests.move import *
from tests.sinqMotor.limits import *
from tests.sinqMotor.speed import *
from tests.sinqMotor.turboPmac.reset import reset
@@ -17,28 +18,74 @@ def test_reset_stress(motor):
test_reset(motor)
def test_move_to_low_limit_switch(motor):
reset(motor, 0)
reset(motor)
move_to_low_limit_switch(motor)
def test_move_to_high_limit_switch(motor):
reset(motor, 0)
reset(motor)
move_to_high_limit_switch(motor)
def test_move_while_move_same_direction(motor):
reset(motor, 0)
reset(motor)
move_while_move(motor, -60, -20, 2)
def test_move_while_move_opposite_direction(motor):
reset(motor, 0)
reset(motor)
move_while_move(motor, 20, -20, 2)
def test_stop(motor):
reset(motor, 0)
reset(motor)
stop(motor, -60)
def test_stop_then_move(motor):
reset(motor)
stop_then_move(motor, 20)
@pytest.mark.stresstest
def test_stop_then_move_stress(motor):
reset(motor)
for _ in range(50):
stop_then_move(motor, 10)
stop_then_move(motor, 0)
def test_speed_fields_valid(motor):
reset(motor)
speed_fields_valid(motor)
def test_set_minspeed_maxspeed_meanspeed(motor):
reset(motor)
set_minspeed_maxspeed_meanspeed(motor)
def test_set_invalid_speed_above_min_below_max(motor):
reset(motor)
set_invalid_speed_above_min_below_max(motor)
def test_set_speed_and_move(motor):
reset(motor)
set_speed_and_move(motor, motor.read_field('maxspeed'), 10)
set_speed_and_move(motor, 0.5*motor.read_field('maxspeed'), 0)
@pytest.mark.stresstest
def test_set_speed_and_move_stress(motor):
reset(motor)
for _ in range(50):
set_speed_and_move(motor, motor.read_field('maxspeed'), 10)
set_speed_and_move(motor, 0.5*motor.read_field('maxspeed'), 0)
def test_targets_outside_limits(motor):
reset(motor)
targets_outside_limits(motor)
# def test_reread_limits_from_hw(motor):
# reread_limits_from_hw(motor)

View File

@@ -1,7 +1,7 @@
# This module defines fixtures which are shared for all tests of motor "rot1".
import pytest
from common import TurboPMAC
from test_helpers import TurboPMAC
@pytest.fixture(autouse=True)

View File

@@ -3,12 +3,13 @@ import pytest
from tests.move import *
from tests.sinqMotor.limits import *
from tests.sinqMotor.speed import *
from tests.sinqMotor.turboPmac.reset import reset
def test_reset(motor):
reset(motor, 5)
reset(motor, 0)
reset(motor)
@pytest.mark.stresstest
def test_reset_stress(motor):
@@ -16,34 +17,72 @@ def test_reset_stress(motor):
test_reset(motor)
def test_move_to_low_limit_switch(motor):
reset(motor, 0)
reset(motor)
move_to_low_limit_switch(motor)
def test_move_to_high_limit_switch(motor):
reset(motor, 0)
reset(motor)
move_to_high_limit_switch(motor)
def test_move_while_move_same_direction(motor):
reset(motor, 0)
reset(motor)
move_while_move(motor, -3000, -1000, 2)
def test_move_while_move_opposite_direction(motor):
reset(motor, 0)
reset(motor)
move_while_move(motor, 1000, -1000, 2)
def test_stop(motor):
reset(motor, 0)
reset(motor)
stop(motor, 3000)
def test_stop_then_move(motor):
reset(motor, 0)
stop_then_move(motor, 0)
reset(motor)
stop_then_move(motor, 100)
@pytest.mark.stresstest
def test_stop_then_move_stress(motor):
reset(motor)
for _ in range(50):
stop_then_move(motor, 100)
stop_then_move(motor, 0)
def test_speed_fields_valid(motor):
reset(motor)
speed_fields_valid(motor)
def test_set_minspeed_maxspeed_meanspeed(motor):
reset(motor)
set_minspeed_maxspeed_meanspeed(motor)
def test_set_invalid_speed_above_min_below_max(motor):
reset(motor)
set_invalid_speed_above_min_below_max(motor)
def test_set_speed_and_move(motor):
reset(motor)
set_speed_and_move(motor, motor.read_field('maxspeed'), 10)
set_speed_and_move(motor, 0.5*motor.read_field('maxspeed'), 0)
@pytest.mark.stresstest
def test_set_speed_and_move_stress(motor):
reset(motor)
for _ in range(50):
set_speed_and_move(motor, motor.read_field('maxspeed'), 10)
set_speed_and_move(motor, 0.5*motor.read_field('maxspeed'), 0)
def test_targets_outside_limits(motor):
reset(motor)
targets_outside_limits(motor)
# def test_reread_limits_from_hw(motor):
# reread_limits_from_hw(motor)

View File

@@ -1,4 +1,6 @@
def reset(motor, target):
import time
def reset(motor, target=None):
"""
Reset the motor for the next test. This means the following things:
1) Stopping the motor
@@ -7,9 +9,18 @@ def reset(motor, target):
4) Moving to zero
"""
# Wait a bit before starting each test so the IOC has some time
# to recover from any previous failed tests.
time.sleep(0.2)
if target is None:
target = motor.default_position
motor.write_field('stop', 1)
motor.wait_for_done()
motor.write_field('reseterrorpv', 1)
motor.write_field('enable', 1)
motor.wait_disabled()
motor.enable_and_wait()
motor.move_and_wait(target)
assert motor.at_target(target)
assert not motor.has_error()