Added stresstest setup and fixed bug in move_and_wait

This commit is contained in:
Stefan Mathis
2025-07-24 16:59:14 +02:00
parent 1542cc288c
commit d268422fe5
7 changed files with 82 additions and 30 deletions

View File

@@ -5,7 +5,10 @@ import math
import pytest
import yaml
from caproto.sync.client import read, write
from caproto import CaprotoTimeoutError, ChannelType
# We use a very high timeout here in order to make sure the tests do not fail prematurely.
# The default 1 second has proven to be too strict.
CAPROTO_TIMEOUT = 5
def read_config():
@@ -62,13 +65,13 @@ class Motor:
self.pv = f'{pvprefix}:{controller}:{name}'
def write(self, suffix, value):
write(self.pv + suffix, value)
write(self.pv + suffix, value, timeout=CAPROTO_TIMEOUT)
def write_field(self, fieldname, value):
self.write('.' + self.fields[fieldname], value)
def read(self, suffix, as_string=False):
response = read(self.pv + suffix)
response = read(self.pv + suffix, timeout=CAPROTO_TIMEOUT)
return self._convert_value(response, as_string)
def _convert_value(self, response, as_string=False):
@@ -114,11 +117,8 @@ class Motor:
self.write_field('writepv', target)
if already_moving:
# Wait until the motor has stopped
self.wait_for_done()
self.wait_for_start()
if not already_moving:
self.wait_for_start()
self.wait_for_done()
def limits(self):
@@ -143,6 +143,7 @@ class Motor:
now = time.time()
while self.read_field('donemoving') != 0:
print(self.read_field('donemoving'))
if now + maxwaittime < time.time():
raise TimeoutError(
f'Motor did not start in {maxwaittime} seconds')

View File

@@ -7,7 +7,6 @@ from common import read_config
TIMEOUT_READ = 2
@pytest.fixture(autouse=True)
def check_ioc_running():
@@ -26,3 +25,23 @@ def check_ioc_running():
except TimeoutError:
# IOC startup failed in the given time -> Raise an error
raise Exception('Start the test IOC first ()')
def pytest_addoption(parser):
"""
This function adds the --stress flag to Pytest which can be given to
perform long-running stress tests.
"""
parser.addoption(
"--stresstest",
action="store_true",
default=False,
help="Run tests marked as stresstest"
)
def pytest_runtest_setup(item):
"""
This function provides a marker which can be used on test functions to
mark them as stress tests via `@pytest.mark.stresstest`.
"""
if 'stresstest' in item.keywords and not item.config.getoption("--stresstest"):
pytest.skip("Need --stress to run this test")

View File

@@ -21,7 +21,7 @@ def move_to_high_limit_switch(motor):
assert not motor.has_error()
def move_while_move(motor, first_target, second_target):
def move_while_move(motor, first_target, second_target, time_first_target):
"""
Start moving to a position which is sufficiently far away, then interrupt
this move command with another move command. This function assumes a
@@ -29,9 +29,9 @@ def move_while_move(motor, first_target, second_target):
first_target so it has enough time to issue the second move command.
"""
motor.write_field('writepv', first_target)
time.sleep(2)
time.sleep(time_first_target)
motor.move_and_wait(second_target)
assert motor.at_target()
assert motor.at_target(second_target)
assert not motor.has_error()
def stop_then_move(motor, target):

View File

@@ -1,9 +1,20 @@
# This module defines fixtures which are shared for all tests of motor "lin1".
import time
import pytest
from common import TurboPMAC
# Prepare the motor at the start of the test by resetting and homing it.
@pytest.fixture(scope="session", autouse=True)
def reset_and_home():
mot = TurboPMAC('turboPmac1', 'ax1')
mot.write_field('stop', 1)
mot.write_field('reseterrorpv', 1)
mot.write_field('enable', 1)
mot.write_field('homeforward', 1)
time.sleep(1)
mot.wait_for_done()
@pytest.fixture(autouse=True)
def motor():
return TurboPMAC('turboPmac1', 'lin1')
return TurboPMAC('turboPmac1', 'ax1')

View File

@@ -1,32 +1,43 @@
# Run a selection of common tests
import pytest
from tests.move import *
from tests.sinqMotor.limits import *
from tests.sinqMotor.turboPmac.reset import reset
def test_stop_reset_enable_move_sequence(motor):
def test_reset(motor):
reset(motor, 5)
reset(motor, 0)
reset(motor, 2)
@pytest.mark.stresstest
def test_reset_stress(motor):
for _ in range(50):
test_reset(motor)
def test_move_to_low_limit_switch(motor):
reset(motor, -30)
reset(motor, 0)
move_to_low_limit_switch(motor)
def test_move_to_high_limit_switch(motor):
reset(motor, -30)
reset(motor, 0)
move_to_high_limit_switch(motor)
def test_move_while_move(motor):
reset(motor, -30)
move_while_move(motor, -60, -20)
def test_move_while_move_same_direction(motor):
reset(motor, 0)
move_while_move(motor, -60, -20, 2)
def test_move_while_move_opposite_direction(motor):
reset(motor, 0)
move_while_move(motor, 20, -20, 2)
def test_stop(motor):
reset(motor, -30)
reset(motor, 0)
stop(motor, -60)
# def test_reread_limits_from_hw(motor):

View File

@@ -6,4 +6,4 @@ from common import TurboPMAC
@pytest.fixture(autouse=True)
def motor():
return TurboPMAC('turboPmac1', 'rot1')
return TurboPMAC('turboPmac1', 'ax5')

View File

@@ -1,37 +1,47 @@
# Run a selection of common tests
import pytest
from tests.move import *
from tests.sinqMotor.limits import *
from tests.sinqMotor.turboPmac.reset import reset
def test_stop_reset_enable_move_sequence(motor):
def test_reset(motor):
reset(motor, 5)
reset(motor, 0)
@pytest.mark.stresstest
def test_reset_stress(motor):
for _ in range(50):
test_reset(motor)
def test_move_to_low_limit_switch(motor):
reset(motor)
reset(motor, 0)
move_to_low_limit_switch(motor)
def test_move_to_high_limit_switch(motor):
reset(motor)
reset(motor, 0)
move_to_high_limit_switch(motor)
def test_move_while_move(motor):
reset(motor)
move_while_move(motor, 1000, -1000)
def test_move_while_move_same_direction(motor):
reset(motor, 0)
move_while_move(motor, -3000, -1000, 2)
def test_move_while_move_opposite_direction(motor):
reset(motor, 0)
move_while_move(motor, 1000, -1000, 2)
def test_stop(motor):
reset(motor)
reset(motor, 0)
stop(motor, 3000)
def test_stop_then_move(motor):
reset(motor)
reset(motor, 0)
stop_then_move(motor, 0)