WIP status of logging

This commit is contained in:
2025-12-05 15:44:46 +01:00
parent 3cad793e78
commit 8e5a5bb8d1
23 changed files with 384 additions and 265 deletions

View File

@@ -67,6 +67,8 @@ in turn imports `ioc/config.cmd` to get the current configuration.
## Running the tests
TODO: Add doc for stresstest and log flags
### General
Running tests requires the following three steps:

View File

@@ -1,26 +0,0 @@
#!/usr/bin/env python3
# This script instructs the "lin1" motor (first axis of the sinqtest Turbo PMAC
# controller) to drive to -20 mm. Once it arrives, it then drives to -30 mm.
# Afterwards, the motor is disabled and then reenabled
import time
from setup.classes import TurboPMAC
motor = TurboPMAC('turboPmac1', 'lin1')
# Drive to position -20 mm
motor.move_and_wait(-20)
# Drive to position -30 mm
motor.move_and_wait(-30)
# Disable the motor
motor.put_pv('enable', 0)
# Wait a bit so the motor has been disabled
time.sleep(2)
# Reenable the motor
motor.put_pv('enable', 1)

View File

@@ -18,17 +18,16 @@
#
# Module authors:
# Matt Clarke <matt.clarke@ess.eu>
# Stefan Mathis <stefan.mathis@psi.ch>
#
# *****************************************************************************
#
# *****************************************************************************
#
# This file has been taken from NICOS, the original file can be found in:
# https://github.com/mlz-ictrl/nicos/blob/master/nicos/devices/epics/pva/caproto.py
from functools import partial
import numpy as np
from caproto import CaprotoTimeoutError, ChannelType
from caproto.sync.client import read, write
from caproto.threading.client import Context
@@ -88,28 +87,6 @@ STATUS_TO_MESSAGE = {
_Context = Context()
def caget(name, timeout=3.0):
""" Returns the PV's current value in its raw form via CA.
:param name: the PV name
:param timeout: the EPICS timeout
:return: the PV's raw value
"""
response = read(name, timeout=timeout)
return response.data[0] if len(response.data) == 1 else response.data
def caput(name, value, wait=False, timeout=3.0):
""" Sets a PV's value via CA.
:param name: the PV name
:param value: the value to set
:param wait: whether to wait for completion
:param timeout: the EPICS timeout
"""
write(name, value, notify=wait, timeout=timeout)
class CaprotoWrapper:
""" Class that wraps the caproto module that provides EPICS Channel Access
(CA) support.
@@ -146,6 +123,12 @@ class CaprotoWrapper:
def get_pv_value(self, pvname, as_string=False):
try:
# response is a ReadResponse objects with the fields:
# - data: a scalar or an array depending on the record type
# - metada: Control metdata such as limits, units, etc.
# - status: Record status
# - severity: Error severity
# - timestamp: Read timestamp
response = self._pvs[pvname].read(timeout=self._timeout,
data_type='control')
return self._convert_value(pvname, response, as_string)
@@ -153,58 +136,67 @@ class CaprotoWrapper:
raise TimeoutError(f'getting {pvname} timed out') from None
def _convert_value(self, pvname, response, as_string=False):
"""
Convert the data returned from reading a PV into its "natural" type.
Caproto always returns the read values wrapped in an array
(a numpy.ndarray) if Numpy is installed, which always holds true in
NICOS. Furthermore, strings are returned as (an array of) raw bytes /
hex values.
Therefore, a variety of conversions are performed to make sure the
output type matches the expectations:
- If the return type is a scalar (array length is 1) and the PV is an
enum (pvname in self._choices), then the string representation from
self._choices is returned if as_string == True (if as_string == False,
the scalar itself is returned).
- If as_string == True, the array is treated as a C-string and converted
into a Python string using utf-8 decoding, stripping any NULL
terminators in the process.
- If the value is a scalar, it is is returned as such. Otherwise, the
array is returned.
"""
scalar = None
# By default, the response data is always a list to cover all possible
# readback values from EPICS (lists, strings, chars, numbers). The last
# two cases need to be treated in a special way. They can be identified
# by the list length being 1.
if len(response.data) == 1:
value = response.data[0]
# Unwrap the scalar
scalar = response.data[0]
# Treat the enum case: If the pvname is an enum, return the enum
# string alias if as_string == True, otherwise return the raw value.
if pvname in self._choices:
return self._choices[pvname][value] if as_string else value
elif isinstance(value, bytes):
return value.decode()
return self._choices[pvname][scalar] if as_string else scalar
# If an empty string is returned, the data has a single entry
# (the NULL terminator)
if as_string:
return bytes(response.data).rstrip(b'\x00').decode(
encoding='utf-8', errors='ignore')
return value
# Strings are read with their NULL terminator, hence it needs to be
# stripped before decoding
# Decode the array as a string, if requested
if as_string:
return bytes(response.data).rstrip(b'\x00').decode(
return response.data.tobytes().rstrip(b'\x00').decode(
encoding='utf-8', errors='ignore')
if isinstance(response.data, np.ndarray):
val_type = FTYPE_TO_TYPE[self._pvs[pvname].channel.native_data_type]
if val_type == bytes or as_string:
return response.data.tobytes().decode()
# If the value is a scalar, return it, otherwise return the array.
# Explicit None check is needed here because scalar could be a zero!
if scalar is not None:
return scalar
return response.data
def put_pv_value(self, pvname, value, wait=False):
if pvname in self._choices:
value = self._choices[pvname].index(value)
try:
self._pvs[pvname].write(value, wait=wait, timeout=self._timeout)
except CaprotoTimeoutError:
raise TimeoutError(f'setting {pvname} timed out') from None
def put_pv_value(self, pvname, value, timeout=None):
# As is evident from the source code of pv.write (see
# https://caproto.github.io/caproto/v1.2.0/_modules/caproto/threading/client.html#PV.write),
# the `timeout` doesn't do anything if `wait == False`.
# So we can derive the value of `wait` directly from timeout:
# If `timeout == None` (no timeout being given at the callsite),
# `ẁait = False`, otherwise `ẁait = True`.
wait = timeout is not None
def put_pv_value_blocking(self, pvname, value, block_timeout=60):
if pvname in self._choices:
value = self._choices[pvname].index(value)
try:
self._pvs[pvname].write(value, wait=True, timeout=block_timeout)
self._pvs[pvname].write(value, wait=wait, timeout=timeout)
except CaprotoTimeoutError:
raise TimeoutError(f'setting {pvname} timed out') from None
def get_pv_type(self, pvname):
data_type = self._pvs[pvname].channel.native_data_type
return FTYPE_TO_TYPE.get(data_type)
return FTYPE_TO_TYPE.get(data_type, None)
def get_alarm_status(self, pvname):
values = self.get_control_values(pvname)
@@ -297,12 +289,11 @@ class CaprotoWrapper:
connection_callback(pvname, pvparam, state == 'connected')
def _extract_alarm_info(self, values):
# The EPICS 'severity' matches to the NICOS `status` and the message has
# a short description of the alarm details.
if hasattr(values, 'severity'):
message = STATUS_TO_MESSAGE[values.status]
return values.severity, '' if message == 'NO_ALARM' else message
return values.severity, 'alarm information unavailable'
"""
Dummy function. In the original NICOS file, it is used to map EPICS
status to NICOS status.
"""
0, ""
def close_subscription(self, subscription):
subscription.clear()

View File

@@ -1,34 +1,58 @@
import time
import os
import math
import logging
import sys
import pytest
import yaml
from setup.caproto import CaprotoWrapper
from src.caproto import CaprotoWrapper
TIMEOUT = 5
def read_config():
root_dir = os.path.dirname(__file__)
with open(root_dir + "/../config.yaml", "r") as f:
return yaml.safe_load(f)
def setup_custom_logger(name):
"""
Formatting of the logger. Taken from:
https://stackoverflow.com/questions/28330317/print-timestamp-for-logging-in-python
"""
formatter = logging.Formatter(fmt='%(asctime)s %(levelname)-8s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
handler = logging.FileHandler('log.txt', mode='w')
handler.setFormatter(formatter)
screen_handler = logging.StreamHandler(stream=sys.stdout)
screen_handler.setFormatter(formatter)
logger = logging.getLogger(name)
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
logger.addHandler(screen_handler)
return logger
class MajorState(Exception):
pass
class StartTimeoutError(TimeoutError):
pass
class IocNotRunning(Exception):
pass
class Motor:
"""
This object represents a motor used for testing.
A few notes regarding EPICS interaction:
Although it would be feasible to simply use the `read` and `write` functions in `caproto.sync.client` to
access the motor PVs directly everytime, this approach is problematic because it opens and closes an UDP
socket everytime. In practice, this is known to be susceptible to timeout errors due to high volumes
@@ -48,8 +72,12 @@ class Motor:
function therefore needs to be overwritten correspondingly in subclasses.
"""
__slots__ = ("_param_to_pv", "fields", "controller", "axis", "ip", "port",
"busypoll", "idlepoll", "pv", "default_position", 'logger',
"_epics_wrapper")
# This will store PV objects for each PV param and is populated by the init function
_param_to_pv = {}
_param_to_pv = {}
# Motor record fields
fields = {
@@ -83,14 +111,16 @@ class Motor:
'precision': 'MRES',
}
def __init__(self, controller, name, default_position=0):
def __init__(self, controller, axis, default_position=0):
self.controller = controller
self.axis = axis
config = read_config()
self.ip = config['controllers'][controller]['ip']
self.port = config['controllers'][controller]['port']
self.busypoll = config['controllers'][controller]['busypoll']
self.idlepoll = config['controllers'][controller]['idlepoll']
pvprefix = config['pvprefix']
self.pv = f'{pvprefix}:{controller}:{name}'
self.pv = f'{pvprefix}:{controller}:{axis}'
self.default_position = default_position
self._epics_wrapper = CaprotoWrapper(TIMEOUT)
@@ -98,7 +128,10 @@ class Motor:
self._epics_wrapper.connect_pv(pvname)
self._param_to_pv[access_name] = pvname
def _get_pvs(self, pvs ={}):
# Set up for all "motor" devices.
self.logger = setup_custom_logger(f"motor.{controller}.{axis}")
def _get_pvs(self, pvs={}):
"""
:return: Dict of PV aliases and actual PV names.
"""
@@ -106,37 +139,33 @@ class Motor:
pvs[alias] = '.'.join((self.pv, pvname))
return pvs
def put_pv(self, pvparam, value, wait=False):
self._epics_wrapper.put_pv_value(self._param_to_pv[pvparam], value,
wait=wait)
def put_pv(self, pvparam, value, timeout=None):
self.logger.debug(
f"writing '{value}' to '{self._param_to_pv[pvparam]}'")
def _convert_value(self, response, as_string=False):
self._epics_wrapper.put_pv_value(
self._param_to_pv[pvparam], value, timeout=timeout)
# By default, the response data is always a list to cover all possible
# readback values from EPICS (lists, strings, chars, numbers). The last
# two cases need to be treated in a special way. They can be identified
# by the list length being 1.
if len(response.data) == 1:
value = response.data[0]
if isinstance(value, bytes):
return value.decode()
# If an empty string is returned, the data has a single entry
# (the NULL terminator)
if as_string:
return bytes(response.data).rstrip(b'\x00').decode(
encoding='utf-8', errors='ignore')
return value
# Strings are read with their NULL terminator, hence it needs to be
# stripped before decoding
if as_string:
return bytes(response.data).rstrip(b'\x00').decode(
encoding='utf-8', errors='ignore')
return response.data
if timeout is not None:
self.logger.debug(
f"finished blocking writing '{value}' to '{self._param_to_pv[pvparam]}'")
def get_pv(self, pvparam, as_string=False):
return self._epics_wrapper.get_pv_value(self._param_to_pv[pvparam],
as_string=as_string)
self.logger.debug(f"reading '{self._param_to_pv[pvparam]}'")
val = self._epics_wrapper.get_pv_value(self._param_to_pv[pvparam],
as_string=as_string)
self.logger.debug(f"read '{val}' from '{self._param_to_pv[pvparam]}'")
return val
def move(self, target):
# Set the new target
self.put_pv('writepv', target)
if self.at_target(target):
self.logger.info(f"already at target '{target}'")
else:
self.logger.info(f"moving to '{target}'")
def move_and_wait(self, target):
"""
@@ -145,18 +174,20 @@ class Motor:
occurred during movement. In case this happens, an error is raised.
"""
# Is the motor already at its target?
if self.at_target(target):
return
# Is the motor already moving?
already_moving = self.get_pv('donemoving') == 0
# Set the new target
self.put_pv('writepv', target)
if not already_moving:
# Is the motor already moving?
if self.get_pv('donemoving') != 0:
self.logger.info(f"starting move to '{target}'")
self.wait_for_start()
if self.at_target(target):
self.logger.info(f"already at target '{target}'")
else:
self.logger.info(f"moving to '{target}'")
self.wait_for_done()
self.logger.info(
f"finished moving to '{target}', now at '{self.get_pv('readpv')}'")
def limits(self):
return (self.get_pv('lowlimit'), self.get_pv('highlimit'))
@@ -186,9 +217,11 @@ class Motor:
if self.has_error():
error_msg = self.error_message()
if error_msg:
raise MajorState(f'Error while waiting for start of movement: {error_msg}')
raise MajorState(
f'Error while waiting for start of movement: {error_msg}')
else:
raise MajorState('Unknown error while waiting for start of movement')
raise MajorState(
'Unknown error while waiting for start of movement')
time.sleep(0.1)
def wait_for_done(self):
@@ -196,14 +229,16 @@ class Motor:
if self.has_error():
error_msg = self.error_message()
if error_msg:
raise MajorState(f'Error while waiting for completion of movement: {error_msg}')
raise MajorState(
f'Error while waiting for completion of movement: {error_msg}')
else:
raise MajorState('Unknown error while waiting for completion of movement')
raise MajorState(
'Unknown error while waiting for completion of movement')
time.sleep(0.1)
def has_error(self):
return self.get_pv('alarm_severity') == 'MAJOR'
def error_message(self):
return ''
@@ -212,6 +247,30 @@ class Motor:
str = format(int(self.get_pv('status')), '016b')
return bool(str[15])
def stop(self):
self.logger.info(f"stopping")
self.put_pv('stop', 1)
def homeforward(self):
self.logger.info(f"homing forward")
self.put_pv('homeforward', 1)
def homereverse(self):
self.logger.info(f"homing reverse")
self.put_pv('homereverse', 1)
def set_highlimit(self, high_limit):
self.logger.info(f"setting high limit {high_limit}")
self.put_pv('dialhighlimit', high_limit)
def set_lowlimit(self, low_limit):
self.logger.info(f"setting low limit {low_limit}")
self.put_pv('diallowlimit', low_limit)
def set_speed(self, speed):
self.logger.info(f"setting speed {speed}")
self.put_pv('speed', speed)
class SinqMotor(Motor):
# PV suffixes used in SinqMotor drivers
@@ -224,9 +283,9 @@ class SinqMotor(Motor):
'can_set_speed': ':CanSetSpeed',
'reseterrorpv': ':Reset',
'errormsgpv': '-MsgTxt',
}
}
def _get_pvs(self, pvs ={} ):
def _get_pvs(self, pvs={}):
"""
:return: Dict of PV aliases and actual PV names.
"""
@@ -241,13 +300,24 @@ class SinqMotor(Motor):
return self.get_pv('errormsgpv', as_string=True)
def enable_and_wait(self, timeout=20):
self.put_pv('enable', 1)
self.enable(1)
self.wait_enabled(timeout)
def disable_and_wait(self, timeout=20):
self.put_pv('enable', 0)
self.enable(0)
self.wait_disabled(timeout)
def enable(self, val):
if val == 0:
self.logger.info(f"disabling")
else:
self.logger.info(f"enabling")
self.put_pv('enable', 1)
def reset(self):
self.logger.info(f"resetting error")
self.put_pv('reseterrorpv', 1)
def wait_enabled(self, timeout=10):
"""
Wait until the motor is enabled or a timeout has been reached.
@@ -271,13 +341,19 @@ class SinqMotor(Motor):
while check(self.get_pv('enable_rbv')):
if time.time() > now + timeout:
if wait_for_enabling:
pytest.fail(
f'Motor {self.pv} could not be enabled in {timeout} seconds')
raise MajorState(
f'could not be enabled in {timeout} seconds')
else:
pytest.fail(
f'Motor {self.pv} could not be disabled in {timeout} seconds')
raise MajorState(
f'could not be enabled in {timeout} seconds')
time.sleep(0.5)
if wait_for_enabling:
self.logger.info(f"enabled")
else:
self.logger.info(f"disabled")
class TurboPMAC(SinqMotor):
pass

View File

@@ -1,8 +1,10 @@
import time
import threading
from src.classes import Motor
def home(motor, forward):
def home(motor: Motor, forward: bool):
encoder = motor.get_pv('encoder_type', as_string=True)
if encoder == 'absolute':
@@ -17,22 +19,22 @@ def home(motor, forward):
# Start a homing run and observe the motor behaviour depending on the
# encoder type
if forward:
motor.put_pv('homeforward', 1)
motor.homeforward()
else:
motor.put_pv('homereverse', 1)
motor.homereverse()
motor.wait_for_start()
if can_home:
# The controller might take a bit of time to actually start the movement,
# therefore we wait a bit.
# therefore we wait a bit.
time.sleep(2 * motor.busypoll)
# Motor should start movement
assert motor.get_pv('moving') == 1
assert motor.get_pv('donemoving') == 0
assert not motor.has_error()
# TODO: This should work
# TODO: This should work
# assert not motor.is_homed()
motor.wait_for_done()
@@ -51,28 +53,30 @@ def home(motor, forward):
assert motor.get_pv('donemoving') == 1
assert not motor.has_error()
def start_home_while_moving(motor, target, forward):
"""
Start a movement to target, wait two seconds and then home the motor.
"""
motor.put_pv('writepv', target)
motor.move(target)
time.sleep(2)
home(motor, forward)
assert not motor.at_target(target)
def start_move_while_homing(motor, target, forward):
"""
Try to start a movement while homing (this should fail and the motor should continue homing instead)
"""
def try_start_move():
time.sleep(5)
motor.put_pv('writepv', target)
motor.move(target)
thread = threading.Thread(target = try_start_move)
thread = threading.Thread(target=try_start_move)
thread.start()
home(motor, forward)
assert not motor.at_target(target)
def move_then_home(motor, target, forward):
"""
@@ -83,6 +87,7 @@ def move_then_home(motor, target, forward):
assert not motor.has_error()
home(motor, forward)
def home_then_move(motor, target, forward):
"""
Home the motor and then move to the target
@@ -91,4 +96,3 @@ def home_then_move(motor, target, forward):
motor.move_and_wait(target)
assert motor.at_target(target)
assert not motor.has_error()

View File

@@ -2,61 +2,63 @@ import time
import pytest
from setup.classes import StartTimeoutError
from src.classes import Motor, StartTimeoutError
def move_to_low_limit_switch(motor):
def move_to_low_limit_switch(motor: Motor):
low_limit = motor.limits()[0]
motor.move_and_wait(low_limit)
assert motor.at_target()
# Field is currently not set in our drivers
#assert motor.get_pv('lowlimitswitch') == 1
# assert motor.get_pv('lowlimitswitch') == 1
assert not motor.has_error()
def move_to_high_limit_switch(motor):
def move_to_high_limit_switch(motor: Motor):
low_limit = motor.limits()[1]
motor.move_and_wait(low_limit)
assert motor.at_target()
# Field is currently not set in our drivers
#assert motor.get_pv('highlimitswitch') == 1
# assert motor.get_pv('highlimitswitch') == 1
assert not motor.has_error()
def move_while_move(motor, first_target, second_target, time_first_target):
def move_while_move(motor: Motor, first_target, second_target, time_first_target):
"""
Start moving to a position which is sufficiently far away, then interrupt
this move command with another move command. This function assumes a
sufficiently large distance between the current motor position and
first_target so it has enough time to issue the second move command.
"""
motor.put_pv('writepv', first_target)
motor.move(first_target)
time.sleep(time_first_target)
motor.move_and_wait(second_target)
assert motor.at_target(second_target)
assert not motor.has_error()
def stop_then_move(motor, target):
motor.put_pv('stop', 1)
def stop_then_move(motor: Motor, target):
motor.stop('stop', 1)
motor.wait_for_done()
motor.move_and_wait(target)
def stop(motor, target):
def stop(motor: Motor, target):
"""
Stop an motor while it is moving to a target. This function assumes a
sufficiently large distance between the current motor position and target.
"""
motor.put_pv('writepv', target)
motor.move(target)
time.sleep(1)
motor.put_pv('stop', 1)
motor.stop(1)
motor.wait_for_done()
assert not motor.at_target(target)
assert not motor.has_error()
def invalid_target(motor, target):
def invalid_target(motor: Motor, target):
current_pos = motor.get_pv('readpv')
try:
motor.move_and_wait(target)
@@ -65,7 +67,7 @@ def invalid_target(motor, target):
assert motor.at_target(current_pos)
def targets_outside_limits(motor):
def targets_outside_limits(motor: Motor):
"""
Try to move the motor outside its limits
"""

View File

@@ -1,10 +1,10 @@
import time
import math
from setup.classes import read_config
from src.classes import Motor
def reread_limits_from_hw(motor):
def reread_limits_from_hw(motor: Motor):
"""
sinqMotor drivers usually read their limits from the hardware at each poll,
hence any values manually written to DHLM or DLLM should be overwritten
@@ -12,8 +12,8 @@ def reread_limits_from_hw(motor):
"""
(high_limit, low_limit) = motor.limits()
motor.put_pv('dialhighlimit', high_limit+10)
motor.put_pv('diallowlimit', low_limit-10)
motor.set_highlimit(high_limit+10)
motor.set_lowlimit(low_limit-10)
# Wait two idle poll periods
time.sleep(2 * motor.idlepoll)

View File

@@ -1,6 +1,9 @@
import time
def reset(motor, target):
from src.classes import SinqMotor
def reset(motor: SinqMotor, target):
"""
Reset the motor for the next test. This means the following things:
1) Stopping the motor
@@ -16,9 +19,9 @@ def reset(motor, target):
if target is None:
target = motor.default_position
motor.put_pv('stop', 1)
motor.stop(1)
motor.wait_for_done()
motor.put_pv('reseterrorpv', 1)
motor.reset()
motor.wait_disabled()
motor.enable_and_wait()
motor.move_and_wait(target)

View File

@@ -1,4 +1,7 @@
def speed_fields_valid(motor):
from src.classes import Motor
def speed_fields_valid(motor: Motor):
"""
Check if the motor speed fields are valid:
VMAX >= VELO >= VBAS
@@ -16,10 +19,10 @@ def speed_fields_valid(motor):
assert vmax == velo == vbas
def set_speed(motor, new_speed):
def set_speed(motor: Motor, new_speed):
old_speed = motor.get_pv('speed')
try:
motor.put_pv('speed', new_speed)
motor.set_speed(new_speed)
if motor.get_pv('can_set_speed') != 0:
if new_speed > motor.get_pv('maxspeed'):
assert motor.get_pv('speed') == motor.get_pv('maxspeed')
@@ -30,27 +33,25 @@ def set_speed(motor, new_speed):
else:
assert motor.get_pv('speed') == old_speed
finally:
motor.put_pv('speed', old_speed)
motor.set_speed(old_speed)
def set_minspeed_maxspeed_meanspeed(motor):
def set_minspeed_maxspeed_meanspeed(motor: Motor):
set_speed(motor, motor.get_pv('maxspeed'))
set_speed(motor, motor.get_pv('basespeed'))
set_speed(motor, 0.5 * (motor.get_pv('basespeed') + motor.get_pv('maxspeed')))
def set_invalid_speed_above_min_below_max(motor):
def set_invalid_speed_above_min_below_max(motor: Motor):
set_speed(motor, motor.get_pv('maxspeed') + 1)
set_speed(motor, motor.get_pv('basespeed') - 1)
def set_speed_and_move(motor, new_speed, target):
def set_speed_and_move(motor: Motor, new_speed, target):
old_speed = motor.get_pv('speed')
try:
motor.put_pv('speed', new_speed)
motor.set_speed(new_speed)
motor.move_and_wait(target)
assert motor.at_target(target)
finally:
motor.put_pv('speed', old_speed)
motor.set_speed(old_speed)

View File

@@ -1,6 +1,9 @@
import time
def reset(motor, target=None):
from src.classes import SinqMotor
def reset(motor: SinqMotor, target=None):
"""
Reset the motor for the next test. This means the following things:
1) Stopping the motor
@@ -16,9 +19,9 @@ def reset(motor, target=None):
if target is None:
target = motor.default_position
motor.put_pv('stop', 1)
motor.stop()
motor.wait_for_done()
motor.put_pv('reseterrorpv', 1)
motor.reset()
motor.wait_disabled()
motor.enable_and_wait()
motor.move_and_wait(target)

View File

@@ -1,10 +1,11 @@
import time
from pathlib import Path
import logging
from caproto.sync.client import read
import pytest
from setup.classes import read_config, IocNotRunning
from src.classes import read_config, IocNotRunning
def check_ioc_running():
config = read_config()
@@ -24,10 +25,12 @@ def check_ioc_running():
path = Path.cwd() / 'ioc' / 'startioc'
raise IocNotRunning(f'Start the test IOC first by running {path}')
@pytest.fixture(autouse=True)
def check_ioc_running_before_test():
check_ioc_running()
def pytest_addoption(parser):
"""
This function adds the --stress flag to Pytest which can be given to
@@ -39,6 +42,22 @@ def pytest_addoption(parser):
default=False,
help="Run tests marked as stresstest"
)
parser.addoption(
"--log",
action="store",
default="INFO",
help="""
Set log level:
- DEBUG: Show all raw caput and caget commands
- INFO: Write high-level commands (e.g. move, stop, home etc.)
- WARNING: Not used
- ERROR: Serious error (e.g. motor could not be enabled / disabled)
- CRITICAL: Not used
All error levels higher than the defined one are forwarded to the Pytest
logger.
"""
)
def pytest_runtest_setup(item):
"""
@@ -46,4 +65,31 @@ def pytest_runtest_setup(item):
mark them as stress tests via `@pytest.mark.stresstest`.
"""
if 'stresstest' in item.keywords and not item.config.getoption("--stresstest"):
pytest.skip("Need --stress to run this test")
pytest.skip("Need --stress to run this test")
def parse_loglevel(value: str) -> int:
value = value.upper()
if value.isdigit():
return int(value)
if value in logging._nameToLevel: # DEBUG, INFO, …
return logging._nameToLevel[value]
raise ValueError(f"Unknown log level: {value}")
@pytest.fixture(scope="session", autouse=True)
def configure_el734_logging(request):
loglevel_str = request.config.getoption("--log")
level = parse_loglevel(loglevel_str)
logger = logging.getLogger("EL734")
logger.handlers.clear() # so pytest doesn't duplicate handlers
handler = logging.StreamHandler()
formatter = logging.Formatter("%(levelname)s [%(name)s]: %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(level)

View File

@@ -2,18 +2,19 @@
import time
import pytest
from setup.classes import EL734
from src.classes import EL734
# Prepare the motor at the start of the test by resetting and homing it.
@pytest.fixture(scope="session", autouse=True)
def reset_and_home():
mot = EL734('el734_1', 'ax1')
mot.put_pv('stop', 1)
mot.put_pv('reseterrorpv', 1)
mot.put_pv('homeforward', 1)
mot.stop()
mot.reset()
mot.homeforward()
time.sleep(1)
mot.wait_for_done()
@pytest.fixture(autouse=True)
def motor():
return EL734('el734_1', 'ax1')

View File

@@ -2,120 +2,121 @@
import pytest
from setup.move import *
from setup.home import *
from setup.sinqMotor.limits import *
from setup.sinqMotor.speed import *
from src.move import *
from src.home import *
from src.sinqMotor.limits import *
from src.sinqMotor.speed import *
def test_move_to_low_limit_switch(motor):
motor.put_pv('stop', 1)
def test_move_to_low_limit_switch(motor: Motor):
motor.stop()
motor.wait_for_done()
move_to_low_limit_switch(motor)
def test_move_to_high_limit_switch(motor):
motor.put_pv('stop', 1)
def test_move_to_high_limit_switch(motor: Motor):
motor.stop()
motor.wait_for_done()
move_to_high_limit_switch(motor)
def test_move_while_move_same_direction(motor):
motor.put_pv('stop', 1)
def test_move_while_move_same_direction(motor: Motor):
motor.stop()
motor.wait_for_done()
move_while_move(motor, -30, -10, 2)
def test_move_while_move_opposite_direction(motor):
motor.put_pv('stop', 1)
def test_move_while_move_opposite_direction(motor: Motor):
motor.stop()
motor.wait_for_done()
move_while_move(motor, 10, -10, 2)
def test_stop(motor):
def test_stop(motor: Motor):
stop(motor, -60)
def test_stop_then_move(motor):
motor.put_pv('stop', 1)
def test_stop_then_move(motor: Motor):
motor.stop()
motor.wait_for_done()
stop_then_move(motor, 10)
@pytest.mark.stresstest
def test_stop_then_move_stress(motor):
motor.put_pv('stop', 1)
def test_stop_then_move_stress(motor: Motor):
motor.stop()
motor.wait_for_done()
for _ in range(50):
stop_then_move(motor, 5)
stop_then_move(motor, 0)
def test_speed_fields_valid(motor):
motor.put_pv('stop', 1)
def test_speed_fields_valid(motor: Motor):
motor.stop()
motor.wait_for_done()
speed_fields_valid(motor)
def test_set_minspeed_maxspeed_meanspeed(motor):
motor.put_pv('stop', 1)
def test_set_minspeed_maxspeed_meanspeed(motor: Motor):
motor.stop()
motor.wait_for_done()
set_minspeed_maxspeed_meanspeed(motor)
def test_set_invalid_speed_above_min_below_max(motor):
motor.put_pv('stop', 1)
def test_set_invalid_speed_above_min_below_max(motor: Motor):
motor.stop()
motor.wait_for_done()
set_invalid_speed_above_min_below_max(motor)
def test_set_speed_and_move(motor):
motor.put_pv('stop', 1)
def test_set_speed_and_move(motor: Motor):
motor.stop()
motor.wait_for_done()
set_speed_and_move(motor, motor.get_pv('maxspeed'), 5)
set_speed_and_move(motor, 0.5*motor.get_pv('maxspeed'), 0)
@pytest.mark.stresstest
def test_set_speed_and_move_stress(motor):
motor.put_pv('stop', 1)
def test_set_speed_and_move_stress(motor: Motor):
motor.stop()
motor.wait_for_done()
for _ in range(50):
set_speed_and_move(motor, motor.get_pv('maxspeed'), 5)
set_speed_and_move(motor, 0.5*motor.get_pv('maxspeed'), 0)
def test_targets_outside_limits(motor):
motor.put_pv('stop', 1)
def test_targets_outside_limits(motor: Motor):
motor.stop()
motor.wait_for_done()
targets_outside_limits(motor)
def test_home(motor):
motor.put_pv('stop', 1)
def test_home(motor: Motor):
motor.stop()
motor.wait_for_done()
home(motor, True)
def test_start_home_while_moving(motor):
motor.put_pv('stop', 1)
def test_start_home_while_moving(motor: Motor):
motor.stop()
motor.wait_for_done()
start_home_while_moving(motor, 15, True)
def test_start_home_while_moving(motor):
motor.put_pv('stop', 1)
def test_start_home_while_moving(motor: Motor):
motor.stop()
motor.wait_for_done()
start_home_while_moving(motor, 15, True)
def test_move_then_home(motor):
motor.put_pv('stop', 1)
def test_move_then_home(motor: Motor):
motor.stop()
motor.wait_for_done()
move_then_home(motor, 15, True)
def test_home_then_move(motor):
motor.put_pv('stop', 1)
def test_home_then_move(motor: Motor):
motor.stop()
motor.wait_for_done()
home_then_move(motor, 15, True)

View File

@@ -2,20 +2,24 @@
import time
import pytest
from setup.classes import MasterMACS
from src.classes import MasterMACS
# Prepare the motor at the start of the test by resetting and homing it.
@pytest.fixture(scope="session", autouse=True)
def reset_and_home():
"""
Prepare the motor at the start of the test by resetting and homing it.
"""
mot = MasterMACS('masterMacs1', 'ax1')
mot.put_pv('stop', 1)
mot.put_pv('reseterrorpv', 1)
mot.stop()
mot.reset()
mot.wait_disabled()
mot.enable_and_wait()
mot.put_pv('homeforward', 1)
mot.homeforward()
time.sleep(1)
mot.wait_for_done()
@pytest.fixture(autouse=True)
def motor():
return MasterMACS('masterMacs1', 'ax1')

View File

@@ -1,9 +1,9 @@
# Run a selection of common tests
from setup.move import *
from setup.sinqMotor.limits import *
from setup.home import *
from setup.sinqMotor.masterMacs.reset import reset
from src.move import *
from src.sinqMotor.limits import *
from src.home import *
from src.sinqMotor.masterMacs.reset import reset
def test_reset(motor):

View File

@@ -2,20 +2,23 @@
import time
import pytest
from setup.classes import TurboPMAC
from src.classes import TurboPMAC
# Prepare the motor at the start of the test by resetting and homing it.
@pytest.fixture(scope="session", autouse=True)
def reset_and_home():
mot = TurboPMAC('turboPmac1', 'ax1')
mot.put_pv('stop', 1)
mot.put_pv('reseterrorpv', 1)
mot.stop()
mot.reset()
mot.wait_disabled()
mot.enable_and_wait()
mot.put_pv('homeforward', 1)
mot.homeforward()
time.sleep(1)
mot.wait_for_done()
@pytest.fixture(autouse=True)
def motor():
return TurboPMAC('turboPmac1', 'ax1')

View File

@@ -2,22 +2,24 @@
import pytest
from setup.move import *
from setup.home import *
from setup.sinqMotor.limits import *
from setup.sinqMotor.speed import *
from setup.sinqMotor.turboPmac.reset import reset
from src.move import *
from src.home import *
from src.sinqMotor.limits import *
from src.sinqMotor.speed import *
from src.sinqMotor.turboPmac.reset import reset
def test_reset(motor):
reset(motor, 5)
reset(motor, 2)
@pytest.mark.stresstest
def test_reset_stress(motor):
for _ in range(50):
test_reset(motor)
def test_move_to_low_limit_switch(motor):
reset(motor)
move_to_low_limit_switch(motor)
@@ -47,6 +49,7 @@ def test_stop_then_move(motor):
reset(motor)
stop_then_move(motor, 20)
@pytest.mark.stresstest
def test_stop_then_move_stress(motor):
reset(motor)
@@ -107,7 +110,7 @@ def test_start_home_while_moving(motor):
def test_move_then_home(motor):
reset(motor, 1)
move_then_home(motor, 30, True)
def test_home_then_move(motor):
reset(motor, 1)

View File

@@ -1,7 +1,7 @@
# This module defines fixtures which are shared for all tests of motor "rot1".
import pytest
from setup.classes import TurboPMAC
from src.classes import TurboPMAC
@pytest.fixture(autouse=True)

View File

@@ -1,22 +1,24 @@
# Run a selection of common tests
import pytest
from setup.move import *
from setup.sinqMotor.limits import *
from setup.sinqMotor.speed import *
from setup.home import *
from setup.sinqMotor.turboPmac.reset import reset
from src.move import *
from src.sinqMotor.limits import *
from src.sinqMotor.speed import *
from src.home import *
from src.sinqMotor.turboPmac.reset import reset
def test_reset(motor):
reset(motor, 5)
reset(motor)
@pytest.mark.stresstest
def test_reset_stress(motor):
for _ in range(50):
test_reset(motor)
def test_move_to_low_limit_switch(motor):
reset(motor)
move_to_low_limit_switch(motor)
@@ -46,6 +48,7 @@ def test_stop_then_move(motor):
reset(motor)
stop_then_move(motor, 100)
@pytest.mark.stresstest
def test_stop_then_move_stress(motor):
reset(motor)
@@ -74,13 +77,15 @@ def test_set_speed_and_move(motor):
set_speed_and_move(motor, motor.get_pv('maxspeed'), 10)
set_speed_and_move(motor, 0.5*motor.get_pv('maxspeed'), 0)
@pytest.mark.stresstest
def test_set_speed_and_move_stress(motor):
reset(motor)
for _ in range(50):
set_speed_and_move(motor, motor.get_pv('maxspeed'), 10)
set_speed_and_move(motor, 0.5*motor.get_pv('maxspeed'), 0)
def test_targets_outside_limits(motor):
reset(motor)
targets_outside_limits(motor)