First MVP of driver test setup

This commit is contained in:
2025-07-22 13:35:23 +02:00
commit 707207200a
24 changed files with 482 additions and 0 deletions
View File
+134
View File
@@ -0,0 +1,134 @@
import time
import os
import pytest
import yaml
from caproto.sync.client import read, write
def read_config():
root_dir = os.path.dirname(__file__)
with open(root_dir + "config.yaml", "r") as f:
return yaml.safe_load(f)
class MajorState(Exception):
pass
class Axis:
# Motor record fields
fields = {
'readpv': 'RBV',
'writepv': 'VAL',
'stop': 'STOP',
'donemoving': 'DMOV',
'moving': 'MOVN',
'miss': 'MISS',
'homeforward': 'HOMF',
'homereverse': 'HOMR',
'direction': 'DIR',
'speed': 'VELO',
'basespeed': 'VBAS',
'maxspeed': 'VMAX',
'offset': 'OFF',
'dialhighlimit': 'DHLM',
'diallowlimit': 'DLLM',
'highlimit': 'HLM',
'lowlimit': 'LLM',
'softlimit': 'LVIO',
'lowlimitswitch': 'LLS',
'highlimitswitch': 'HLS',
'resolution': 'MRES',
'enable': 'CNEN',
'set': 'SET',
'foff': 'FOFF',
'status': 'MSTA',
'alarm_status': 'STAT',
'alarm_severity': 'SEVR',
}
def __init__(self, ip, port, pv):
self.ip = ip
self.port = port
self.pv = pv
def write(self, suffix, value):
write(self.pv + suffix, value)
def write_field(self, fieldname, value):
self.write('.' + self.fields[fieldname], value)
def read(self, suffix):
return read(self.pv + suffix)
def read_field(self, fieldname):
return self.read('.' + self.fields[fieldname])
def move_and_wait(self, target):
"""
Move the motor to the given target and return, once the motor has
finished moving. The motor status is polled regulary to see if an error
occurred during movement. In case this happens, an error is raised.
"""
self.write(self.fields['writepv'], target)
# Give the record some time to start
time.sleep(1)
self.wait_for_done()
def limits(self):
return (self.read_field('lowlimit'), self.read_field('highlimit'))
def at_target(self, target=None):
"""
Check if the motor arrived at its target by checking the MISS PV and
comparing the writepv value with the readpv value (taking precision)
into account
"""
if target is None:
target = self.read_field('writepv')
return (self.read_field('miss') == 0 and
abs(self.read_field('readpv') - target) < self.read_field('precision'))
def wait_for_done(self):
while not self.read_field('donemoving'):
if self.has_error():
raise MajorState('Record is in MAJOR state!')
time.sleep(1)
def has_error(self):
self.read_field('alarm_severity') == 2
def is_homed(self):
# See https://epics.anl.gov/bcda/synApps/motor/motorRecord.html#Fields_status
str = format(self.read_field('status'), '016b')
return bool(str[15])
class SinqMotor(Axis):
# PV suffixes used in SinqMotor drivers
suffixes = {
'enable': ':Enable',
'enable_rbv': ':EnableRBV',
'can_disable': ':CanDisable',
'connected_rbv': ':Connected',
'encoder_type': ':EncoderType',
'reseterrorpv': ':Reset',
'errormsgpv': '-MsgTxt',
}
def write_field(self, fieldname, value):
if fieldname in self.suffixes:
self.write(self.suffixes[fieldname], value)
self.write('.' + self.fields[fieldname], value)
def read_field(self, fieldname):
if fieldname in self.suffixes:
return self.read(self.suffixes[fieldname])
return self.read('.' + self.fields[fieldname])
class TurboPMAC(Axis):
pass
+11
View File
@@ -0,0 +1,11 @@
pvprefix: DRVTESTS
versions:
turboPmac: 1.3
masterMacs: 1.2
controllers:
turboPmac1:
ip: "172.28.101.24"
port: 1025
masterMacs1:
ip: "172.28.101.66"
port: 1912
View File
+5
View File
@@ -0,0 +1,5 @@
# Record template which is used to check if the IOC is ready
record(ai, "$(PVPREFIX):IOCREADY") {
field(DESC, "Used to check if the test IOC is online")
field(VAL, 1)
}
+12
View File
@@ -0,0 +1,12 @@
# Configuration for the "modern" masterMacs-Axes
epicsEnvSet("NAME","masterMacs1")
epicsEnvSet("ASYN_PORT","p$(NAME)")
drvAsynIPPortConfigure("$(ASYN_PORT)","$(ASYN_PORT)","$(MASTERMACS1_IP):$(MASTERMACS1_PORT)")
masterMacsController("$(NAME)","$(ASYN_PORT)",8,0.05,0.05,0.3);
masterMacsAxis("$(NAME)",1);
# masterMacsAxis("$(NAME)",2);
epicsEnvSet("SINQDBPATH","$(masterMacs_DB)/sinqMotor.db")
dbLoadTemplate("$(IOCDIR)/motors/$(NAME).substitutions", "INSTR=$(PVPREFIX)$(NAME):,CONTROLLER=$(NAME)")
+6
View File
@@ -0,0 +1,6 @@
file $(SINQDBPATH)
{
pattern
{ AXIS, M, EGU, DIR, MRES, ENABLEMOVWATCHDOG, LIMITSOFFSET, CANSETSPEED }
{ 1, "lin1", mm, Pos, 0.001, 1, 1.0, 1 }
}
+44
View File
@@ -0,0 +1,44 @@
# Configuration for the Turbo PMAC motor controller
#
# Important functions:
#
# turboPmacController:
# Creates the controller object and specifies busy poll period, idle poll period and communication timeout.
# A typical call looks like this:
# turboPmacController("$(NAME)","$(ASYN_PORT)",8,0.01,0.01,0.05);
# with
# 8 = Total number of axes
# 0.05 = Busy poll period in seconds
# 1 = Idle poll period in seconds
# 0.05 = Communication timeout in seconds
#
# setMaxSubsequentTimeouts:
# Set the number of subsequent timeouts which may occur before the user is informed in NICOS.
#
# setThresholdComTimeout:
# Set the maximum number of timeouts which may happen in a given timespan before the user is informed in NICOS.
# A typical call looks like this:
# setThresholdComTimeout("$(NAME)", 3600, 60);
# with
# 3600 = Timespan in seconds
# 60 = Maximum number of timeout events which may occur before the user is informed
epicsEnvSet("NAME","turboPmac1")
epicsEnvSet("ASYN_PORT","p$(NAME)")
pmacAsynIPPortConfigure("$(ASYN_PORT)","$(TURBOPMAC1_IP):$(TURBOPMAC1_PORT)")
turboPmacController("$(NAME)","$(ASYN_PORT)",8,0.01,1,1);
turboPmacAxis("$(NAME)",1);
turboPmacAxis("$(NAME)",5);
# Set the number of subsequent timeouts
setMaxSubsequentTimeouts("$(NAME)", 20);
# Configure the timeout frequency watchdog:
setThresholdComTimeout("$(NAME)", 100, 1);
epicsEnvSet("SINQDBPATH","$(turboPmac_DB)/sinqMotor.db")
dbLoadTemplate("$(IOCDIR)/motors/$(NAME).substitutions", "INSTR=$(PVPREFIX)$(NAME):,CONTROLLER=$(NAME)")
epicsEnvSet("SINQDBPATH","$(turboPmac_DB)/turboPmac.db")
dbLoadTemplate("$(IOCDIR)/motors/$(NAME).substitutions", "INSTR=$(PVPREFIX)$(NAME):,CONTROLLER=$(NAME)")
+7
View File
@@ -0,0 +1,7 @@
file $(SINQDBPATH)
{
pattern
{ AXIS, M, EGU, DIR, MRES, ENABLEMOVWATCHDOG, LIMITSOFFSET, CANSETSPEED, RDBD }
{ 1, "lin1", mm, Pos, 0.01, 1, 1.0, 1, 2e-3 }
{ 5, "rot1", degree, Pos, 0.001, 1, 1.0, 0, 1e-3 }
}
Executable
+19
View File
@@ -0,0 +1,19 @@
#!/usr/local/bin/iocsh
# Get the IP config of the motors
< config.cmd
require turboPmac, $(TURBOPMAC_VERSION)
require masterMacs, $(MASTERMACS_VERSION)
################################################################################
# Motors
# Initialize the motors itself
< motors/turboPmac1.cmd
< motors/masterMacs1.cmd
# Create the test record which is used to detect if the IOC is running
dbLoadRecords("$(IOCDIR)/db/ready.db", "P=$(PVPREFIX)")
iocInit()
+40
View File
@@ -0,0 +1,40 @@
# This script is used to start the test IOC and is usually run as part of a
# test startup procedure. It autogenerates an IOC shell script from motors.yaml
# which contains the IP adresses and ports of the motor controllers.
import yaml
import os
import subprocess
def startioc():
root_dir = os.path.dirname(__file__) + '/../'
with open(root_dir + "config.yaml", "r") as f:
config = yaml.safe_load(f)
with open(root_dir + 'ioc/config.cmd', "w") as out:
# General configuration
out.write(
f'epicsEnvSet("PVPREFIX"," {config["pvprefix"]})\n')
out.write(
f'epicsEnvSet("IOCDIR", "{root_dir + "ioc"}" )\n')
# Motor configuration
out.write(
f'epicsEnvSet("TURBOPMAC_VERSION", "{config["versions"]["turboPmac"]}")\n')
out.write(
f'epicsEnvSet("MASTERMACS_VERSION", "{config["versions"]["masterMacs"]}")\n')
out.write(
f'epicsEnvSet("TURBOPMAC1_IP", "{config["controllers"]["turboPmac1"]["ip"]}")\n')
out.write(
f'epicsEnvSet("TURBOPMAC1_PORT", "{config["controllers"]["turboPmac1"]["port"]}")\n')
out.write(
f'epicsEnvSet("MASTERMACS1_IP", "{config["controllers"]["masterMacs1"]["ip"]}")\n')
out.write(
f'epicsEnvSet("MASTERMACS1_PORT", "{config["controllers"]["masterMacs1"]["port"]}")\n')
# Start the IOC itself
subprocess.run("st_cmd")
+12
View File
@@ -0,0 +1,12 @@
#!/bin/bash
# Creates a Python test environment for running the motor driver tests. The
# test environmenet is created in the current working directory. It is
# recommended to create it in the root of the motorDriverTests repository.
/usr/bin/python3.11 -m venv "testenv"
source testenv/bin/activate
pip install 'caproto'
pip install 'pytest'
pip install 'yaml'
View File
+36
View File
@@ -0,0 +1,36 @@
from caproto.sync.client import read, write
from ioc.startioc import startioc
import pytest
TIMEOUT_IOC_STARTUP = 30
TIMEOUT_READ = 3.0
@pytest.fixture(autouse=True)
def prepare_ioc():
"""
This function checks if the test IOC is already running and attempts to
start it, if that is not the case.
"""
try:
read('MOTORDRIVERTESTS:IOCREADY', timeout=TIMEOUT_READ)
# Reading the check recird was successfull -> We assume that the IOC
# is running
return
except TimeoutError:
# Received a timeout error -> Start the IOC
startioc()
# Check every few seconds if the IOC started successfully
now = time.time()
while now + TIMEOUT_IOC_STARTUP < time.time():
try:
read('MOTORDRIVERTESTS:IOCREADY', timeout=TIMEOUT_READ)
return
except TimeoutError:
time.sleep(0.5)
# IOC startup failed in the given time -> Raise an error
raise TimeoutError(
f'Starting the IOC within {TIMEOUT_IOC_STARTUP} seconds failed.')
+42
View File
@@ -0,0 +1,42 @@
import time
def move_to_low_limit_switch(motor):
low_limit = motor.limits()[0]
motor.move_and_wait(low_limit)
assert motor.at_target()
assert motor.read_field('lowlimitswitch') == 1
def move_to_high_limit_switch(motor):
low_limit = motor.limits()[1]
motor.move_and_wait(low_limit)
assert motor.at_target()
assert motor.read_field('highlimitswitch') == 1
def move_while_move(motor, first_target, second_target):
"""
Start moving to a position which is sufficiently far away, then interrupt
this move command with another move command. This function assumes a
sufficiently large distance between the current motor position and
first_target so it has enough time to issue the second move command.
"""
motor.write_field('writepv', first_target)
time.sleep(2)
motor.move_and_wait(second_target)
assert motor.at_target()
def stop(motor, target):
"""
Stop an motor while it is moving to a target. This function assumes a
sufficiently large distance between the current motor position and target.
"""
motor.write_field('writepv', target)
time.sleep(1)
motor.write_field('stop', 1)
motor.wait_for_done()
assert not motor.at_target(target)
View File
+41
View File
@@ -0,0 +1,41 @@
import time
def home(motor, forward):
encoder = motor.read_field('encoder_type')
if encoder == 'absolute':
is_absolute = True
elif encoder == 'incremental':
is_absolute = False
else:
raise ValueError(f'Unknown encoder type {encoder}')
# Start a homing run and observe the motor behaviour depending on the
# encoder type
if forward:
motor.write_field('homeforward')
else:
motor.write_field('homereverse')
# Give the record some time to react
time.sleep(0.5)
if is_absolute:
# Motor should not move at all
assert motor.read_field('moving') == 0
assert motor.read_field('donemoving') == 1
assert not motor.has_error()
else:
# Motor should start movement
assert motor.read_field('moving') == 1
assert motor.read_field('donemoving') == 0
assert not motor.has_error()
assert not motor.is_homed()
motor.wait_for_done()
assert motor.read_field('moving') == 0
assert motor.read_field('donemoving') == 1
assert not motor.has_error()
assert motor.is_homed()
+21
View File
@@ -0,0 +1,21 @@
import time
def reread_limits_from_hw(motor):
"""
sinqAxis motors usually read their limits from the hardware at each poll,
hence any values manually written to DHLM or DLLM should be overwritten
after the next poll at latest
"""
(high_limit, low_limit) = motor.limits()
motor.write_field('dialhighlimit', high_limit-+10)
motor.write_field('diallowlimit', low_limit-10)
# After two seconds, at least one poll has been done
time.sleep(2)
# Values should have been reread
assert motor.read_field('highlimit') == high_limit
assert motor.read_field('lowlimit') == low_limit
assert motor.read_field('dialhighlimit') == high_limit
assert motor.read_field('diallowlimit') == low_limit
+9
View File
@@ -0,0 +1,9 @@
# Prepare
import pytest
from common import TurboPMAC
@pytest.fixture
def lin1():
return TurboPMAC()
+25
View File
@@ -0,0 +1,25 @@
# This module defines fixtures which are shared for all tests of motor "rot1".
import pytest
from common import TurboPMAC, read_config
@pytest.fixture(autouse=True)
def motor():
config = read_config()
return TurboPMAC(config['controllers']['turboPmac1']['ip'],
config['controllers']['turboPmac1']['port'],
config['pvprefix'] + 'turboPmac1:rot1')
def reset(motor):
"""
Reset the motor for the next test. This means the following things:
1) Resetting all errors
2) Enabling the motor
3) Moving to zero
"""
motor.write_field('stop', 1)
motor.write_field('reseterrorpv', 1)
motor.write_field('enable', 1)
motor.move_and_wait(0)
@@ -0,0 +1,18 @@
# Run a selection of common tests
from tests.move import *
from tests.sinqAxis.limits import *
from tests.sinqAxis.turboPmac.rot1.conftest import reset
# def test_move_to_low_limit_switch(motor):
# reset(motor)
# move_to_low_limit_switch(motor)
# def test_move_to_high_limit_switch(motor):
# reset(motor)
# move_to_high_limit_switch(motor)
def test_reread_limits_from_hw(motor):
reread_limits_from_hw(motor)