Logging improvements to the test framework

This commit is contained in:
2025-12-23 10:04:17 +01:00
parent 4ed368f69c
commit b947e7edfa
5 changed files with 56 additions and 18 deletions

View File

@@ -89,9 +89,6 @@ pytest tests/turboPmac1/ax1/test_common.py
### Configuring pytest
TODO: Add doc for --stresstest and --log flags and mention -s flag
And to run a specific test "test_something" within this file:
```bash
pytest tests/turboPmac1/ax1/test_common.py -k 'test_something'

View File

@@ -2,7 +2,7 @@ pvprefix: DRVTESTS
versions:
turboPmac: "1.5"
masterMacs: "1.5"
el734: "mathis_s"
el734: "0.1"
controllers:
turboPmac1:
ip: "172.28.101.24"

View File

@@ -293,7 +293,7 @@ class CaprotoWrapper:
Dummy function. In the original NICOS file, it is used to map EPICS
status to NICOS status.
"""
0, ""
return 0, ""
def close_subscription(self, subscription):
subscription.clear()

View File

@@ -77,7 +77,7 @@ class Motor:
__slots__ = ("_param_to_pv", "controller", "axis", "ip", "port",
"busypoll", "idlepoll", "pv", "default_position", 'logger',
"_epics_wrapper")
"_epics_wrapper", "_wait_for_done_info_interval")
# Motor record fields
fields = {
@@ -109,6 +109,7 @@ class Motor:
'alarm_status': 'STAT',
'alarm_severity': 'SEVR',
'precision': 'MRES',
'position_deadband': 'SPDB',
}
def __init__(self, controller, axis, default_position=0):
@@ -123,6 +124,7 @@ class Motor:
self.pv = f'{pvprefix}:{controller}:{axis}'
self.default_position = default_position
self._epics_wrapper = CaprotoWrapper(TIMEOUT)
self._wait_for_done_info_interval = 5 # Default value, can be overwritten.
# This will store PV objects for each PV param and is populated by the init function
self._param_to_pv = {}
@@ -159,13 +161,19 @@ class Motor:
self.logger.debug(f"read '{val}' from '{self._param_to_pv[pvparam]}'")
return val
def connection_change_callback(self, name, pvparam, is_connected, **kwargs):
if is_connected:
self.logger.debug('%s connected!', name)
else:
self.logger.warning('%s disconnected!', name)
def move(self, target):
# Set the new target
self.put_pv('writepv', target)
if self.at_target(target):
self.logger.info(f"already at target '{target}'")
self.logger.info(f"received move to '{target}' command, but motor is already at target")
else:
self.logger.info(f"moving to '{target}'")
@@ -179,15 +187,13 @@ class Motor:
# Set the new target
self.put_pv('writepv', target)
# Is the motor already moving?
if self.get_pv('donemoving') != 0:
self.logger.info(f"starting move to '{target}'")
self.wait_for_start()
if self.at_target(target):
self.logger.info(f"already at target '{target}'")
else:
self.logger.info(f"starting move to '{target}'")
self.wait_for_start()
self.logger.info(f"moving to '{target}'")
self.wait_for_done()
self.wait_for_done()
self.logger.info(
f"finished moving to '{target}', now at '{self.get_pv('readpv')}'")
@@ -203,9 +209,12 @@ class Motor:
if target is None:
target = self.get_pv('writepv')
position_deadband = self.get_pv('position_deadband')
precision = self.get_pv('precision')
return (self.get_pv('miss') == 0 and
math.isclose(self.get_pv('readpv'), target,
rel_tol=1e-9, abs_tol=self.get_pv('precision')))
rel_tol=1e-9, abs_tol=max(position_deadband, precision)))
def wait_for_start(self, maxwaittime=None):
if maxwaittime is None:
@@ -227,6 +236,7 @@ class Motor:
time.sleep(0.1)
def wait_for_done(self):
last_info = time.time()
while not self.get_pv('donemoving'):
if self.has_error():
error_msg = self.error_message()
@@ -238,6 +248,13 @@ class Motor:
'Unknown error while waiting for completion of movement')
time.sleep(0.1)
# Logging functionality
now = time.time()
if now > last_info + self._wait_for_done_info_interval:
last_info = now
current_position = self.get_pv('readpv')
self.logger.info(f"still moving, current position is {current_position}")
def has_error(self):
return self.get_pv('alarm_severity') == 'MAJOR'
@@ -250,15 +267,15 @@ class Motor:
return bool(str[15])
def stop(self):
self.logger.info(f"stopping")
self.logger.info("stopping")
self.put_pv('stop', 1)
def homeforward(self):
self.logger.info(f"homing forward")
self.logger.info("homing forward")
self.put_pv('homeforward', 1)
def homereverse(self):
self.logger.info(f"homing reverse")
self.logger.info("homing reverse")
self.put_pv('homereverse', 1)
def set_highlimit(self, high_limit):
@@ -287,6 +304,20 @@ class SinqMotor(Motor):
'errormsgpv': '-MsgTxt',
}
def __init__(self, controller, axis, default_position=0):
Motor.__init__(self, controller, axis, default_position)
# Log initial error messages
message = self.error_message()
if message:
self.logger.error(message)
# A callback which logs error messages
self._epics_wrapper.subscribe(self._get_pvs()['errormsgpv'], 'errormsgpv',
self.error_message_callback,
self.connection_change_callback)
def _get_pvs(self, pvs={}):
"""
:return: Dict of PV aliases and actual PV names.
@@ -301,6 +332,16 @@ class SinqMotor(Motor):
def error_message(self):
return self.get_pv('errormsgpv', as_string=True)
def error_message_callback(self, name, param, value, units, severity,
message, **kwargs):
"""
Override this for custom behaviour in subclasses.
"""
message = self.error_message()
if message:
self.logger.error(message)
def enable_and_wait(self, timeout=20):
self.enable(1)
self.wait_enabled(timeout)

View File

@@ -23,13 +23,13 @@ def test_move_to_high_limit_switch(motor: Motor):
def test_move_while_move_same_direction(motor: Motor):
motor.stop()
motor.wait_for_done()
move_while_move(motor, -30, -10, 2)
move_while_move(motor, -30, -10, 5)
def test_move_while_move_opposite_direction(motor: Motor):
motor.stop()
motor.wait_for_done()
move_while_move(motor, 10, -10, 2)
move_while_move(motor, 10, -10, 5)
def test_stop(motor: Motor):