First working version of MVP

This commit is contained in:
2025-07-23 13:20:38 +02:00
parent 0c38f8d062
commit 6d7adaa434
21 changed files with 133 additions and 50 deletions

0
README.md Normal file → Executable file
View File

52
common.py Normal file → Executable file
View File

@@ -1,5 +1,6 @@
import time
import os
import math
import pytest
import yaml
@@ -48,6 +49,7 @@ class Motor:
'status': 'MSTA',
'alarm_status': 'STAT',
'alarm_severity': 'SEVR',
'precision': 'MRES',
}
def __init__(self, controller, name):
@@ -56,7 +58,8 @@ class Motor:
self.port = config['controllers'][controller]['port']
self.busypoll = config['controllers'][controller]['busypoll']
self.idlepoll = config['controllers'][controller]['idlepoll']
self.pv = f'{config['pvprefix']}:{controller}:{name}'
pvprefix = config['pvprefix']
self.pv = f'{pvprefix}:{controller}:{name}'
def write(self, suffix, value):
write(self.pv + suffix, value)
@@ -101,10 +104,21 @@ class Motor:
finished moving. The motor status is polled regulary to see if an error
occurred during movement. In case this happens, an error is raised.
"""
self.write(self.fields['writepv'], target)
# Give the record some time to start
time.sleep(1)
# Is the motor already at its target?
if self.at_target(target):
return
# Is the motor already moving?
already_moving = self.read_field('donemoving') == 0
self.write_field('writepv', target)
if already_moving:
# Wait until the motor has stopped
self.wait_for_done()
self.wait_for_start()
self.wait_for_done()
def limits(self):
@@ -118,17 +132,29 @@ class Motor:
"""
if target is None:
target = self.read_field('writepv')
return (self.read_field('miss') == 0 and
abs(self.read_field('readpv') - target) < self.read_field('precision'))
return (self.read_field('miss') == 0 and
math.isclose(self.read_field('readpv'), target,
rel_tol=1e-9, abs_tol=self.read_field('precision')))
def wait_for_start(self, maxwaittime=None):
if maxwaittime is None:
maxwaittime = 2 * self.idlepoll
now = time.time()
while self.read_field('donemoving') != 0:
if now + maxwaittime < time.time():
raise TimeoutError(f'Motor did not start in {maxwaittime} seconds')
time.sleep(0.1)
def wait_for_done(self):
while not self.read_field('donemoving'):
if self.has_error():
raise MajorState('Record is in MAJOR state!')
time.sleep(1)
time.sleep(0.1)
def has_error(self):
self.read_field('alarm_severity') == 2
return self.read_field('alarm_severity') == 'MAJOR'
def is_homed(self):
# See https://epics.anl.gov/bcda/synApps/motor/motorRecord.html#Fields_status
@@ -150,13 +176,13 @@ class SinqMotor(Motor):
def write_field(self, fieldname, value):
if fieldname in self.suffixes:
self.write(self.suffixes[fieldname], value)
self.write('.' + self.fields[fieldname], value)
return self.write(self.suffixes[fieldname], value)
return self.write('.' + self.fields[fieldname], value)
def read_field(self, fieldname):
def read_field(self, fieldname, as_string=False):
if fieldname in self.suffixes:
return self.read(self.suffixes[fieldname])
return self.read('.' + self.fields[fieldname])
return self.read(self.suffixes[fieldname], as_string)
return self.read('.' + self.fields[fieldname], as_string)
class TurboPMAC(SinqMotor):

2
config.yaml Normal file → Executable file
View File

@@ -12,4 +12,4 @@ controllers:
ip: "172.28.101.66"
port: 1912
busypoll: 0.05
idlepoll: 1
idlepoll: 1

0
ioc/__init__.py Normal file → Executable file
View File

0
ioc/db/ready.db Normal file → Executable file
View File

View File

@@ -2,6 +2,6 @@ file $(SINQDBPATH)
{
pattern
{ AXIS, M, EGU, DIR, MRES, ENABLEMOVWATCHDOG, LIMITSOFFSET, CANSETSPEED, RDBD }
{ 1, "lin1", mm, Pos, 0.01, 1, 1.0, 1, 2e-3 }
{ 5, "rot1", degree, Pos, 0.001, 1, 1.0, 0, 1e-3 }
{ 1, "lin1", mm, Pos, 0.01, 1, 1.0, 1, 2e-3 }
{ 5, "rot1", degree, Pos, 0.5, 1, 1.0, 0, 1e-3 }
}

0
tests/__init__.py Normal file → Executable file
View File

0
tests/conftest.py Normal file → Executable file
View File

10
tests/move.py Normal file → Executable file
View File

@@ -6,7 +6,9 @@ def move_to_low_limit_switch(motor):
low_limit = motor.limits()[0]
motor.move_and_wait(low_limit)
assert motor.at_target()
assert motor.read_field('lowlimitswitch') == 1
# Field is currently not set in our drivers
#assert motor.read_field('lowlimitswitch') == 1
assert not motor.has_error()
def move_to_high_limit_switch(motor):
@@ -14,7 +16,9 @@ def move_to_high_limit_switch(motor):
low_limit = motor.limits()[1]
motor.move_and_wait(low_limit)
assert motor.at_target()
assert motor.read_field('highlimitswitch') == 1
# Field is currently not set in our drivers
#assert motor.read_field('highlimitswitch') == 1
assert not motor.has_error()
def move_while_move(motor, first_target, second_target):
@@ -28,6 +32,7 @@ def move_while_move(motor, first_target, second_target):
time.sleep(2)
motor.move_and_wait(second_target)
assert motor.at_target()
assert not motor.has_error()
def stop(motor, target):
@@ -40,3 +45,4 @@ def stop(motor, target):
motor.write_field('stop', 1)
motor.wait_for_done()
assert not motor.at_target(target)
assert not motor.has_error()

0
tests/sinqMotor/__init__.py Normal file → Executable file
View File

0
tests/sinqMotor/home.py Normal file → Executable file
View File

0
tests/sinqMotor/limits.py Normal file → Executable file
View File

0
tests/sinqMotor/turboPmac/__init__.py Normal file → Executable file
View File

0
tests/sinqMotor/turboPmac/lin1/__init__.py Normal file → Executable file
View File

View File

@@ -0,0 +1,9 @@
# This module defines fixtures which are shared for all tests of motor "rot1".
import pytest
from common import TurboPMAC, read_config
import time
@pytest.fixture(autouse=True)
def motor():
return TurboPMAC('turboPmac1', 'lin1')

View File

@@ -1,9 +0,0 @@
# Prepare
import pytest
from common import TurboPMAC
@pytest.fixture
def lin1():
return TurboPMAC()

View File

@@ -0,0 +1,25 @@
# Run a selection of common tests
from tests.move import *
from tests.sinqMotor.limits import *
from tests.sinqMotor.turboPmac.reset import reset
def test_move_to_low_limit_switch(motor):
reset(motor, -30)
move_to_low_limit_switch(motor)
def test_move_to_high_limit_switch(motor):
reset(motor, -30)
move_to_high_limit_switch(motor)
def test_move_while_move(motor):
reset(motor, -30)
move_while_move(motor, -60, -20)
def test_stop(motor):
reset(motor, -30)
stop(motor, -60)
#def test_reread_limits_from_hw(motor):
# reread_limits_from_hw(motor)

View File

@@ -0,0 +1,32 @@
import time
def reset(motor, initpos=0):
"""
Reset the motor for the next test. This means the following things:
1) Stopping the motor
2) Resetting all errors
3) Enabling the motor
4) Moving to zero
"""
# motor.write_field('stop', 1)
# Reset any errors
motor.write_field('reseterrorpv', 1)
# The reset may take some time -> Wait a bit
time.sleep(1)
if motor.read_field('enable_rbv') == 0:
motor.write_field('enable', 1)
# Wait until the motor is enabled.
now = time.time()
max_enable_time = 5
while motor.read_field('enable_rbv') == 0:
# Wait a maximum of max_enable_time seconds until enabling
if time.time() > now + max_enable_time:
pytest.fail(f'Motor {self.pv} could not be enabled in {max_enable_time} seconds')
time.sleep(0.1)
motor.move_and_wait(initpos)

0
tests/sinqMotor/turboPmac/rot1/__init__.py Normal file → Executable file
View File

15
tests/sinqMotor/turboPmac/rot1/conftest.py Normal file → Executable file
View File

@@ -2,21 +2,8 @@
import pytest
from common import TurboPMAC, read_config
import time
@pytest.fixture(autouse=True)
def motor():
return TurboPMAC('turboPmac1', 'rot1')
def reset(motor):
"""
Reset the motor for the next test. This means the following things:
1) Resetting all errors
2) Enabling the motor
3) Moving to zero
"""
motor.write_field('stop', 1)
motor.write_field('reseterrorpv', 1)
motor.write_field('enable', 1)
motor.move_and_wait(0)

25
tests/sinqMotor/turboPmac/rot1/test_common.py Normal file → Executable file
View File

@@ -2,17 +2,24 @@
from tests.move import *
from tests.sinqMotor.limits import *
from tests.sinqMotor.turboPmac.rot1.conftest import reset
from tests.sinqMotor.turboPmac.reset import reset
# def test_move_to_low_limit_switch(motor):
# reset(motor)
# move_to_low_limit_switch(motor)
def test_move_to_low_limit_switch(motor):
reset(motor)
move_to_low_limit_switch(motor)
def test_move_to_high_limit_switch(motor):
reset(motor)
move_to_high_limit_switch(motor)
# def test_move_to_high_limit_switch(motor):
# reset(motor)
# move_to_high_limit_switch(motor)
def test_move_while_move(motor):
reset(motor)
move_while_move(motor, 1000, -1000)
def test_reread_limits_from_hw(motor):
reread_limits_from_hw(motor)
def test_stop(motor):
reset(motor)
stop(motor, 3000)
#def test_reread_limits_from_hw(motor):
# reread_limits_from_hw(motor)