From 8e5a5bb8d1f06aa03f936423d9b373c27087f63c Mon Sep 17 00:00:00 2001 From: smathis Date: Fri, 5 Dec 2025 15:44:46 +0100 Subject: [PATCH] WIP status of logging --- README.md | 2 + example_custom_script.py | 26 --- {setup => src}/__init__.py | 0 {setup => src}/caproto.py | 121 ++++++------ {setup => src}/classes.py | 186 ++++++++++++------ {setup => src}/home.py | 24 ++- {setup => src}/move.py | 30 +-- {setup => src}/sinqMotor/__init__.py | 0 {setup => src}/sinqMotor/limits.py | 8 +- .../sinqMotor/masterMacs/__init__.py | 0 {setup => src}/sinqMotor/masterMacs/reset.py | 9 +- {setup => src}/sinqMotor/speed.py | 23 +-- .../sinqMotor/turboPmac/__init__.py | 0 {setup => src}/sinqMotor/turboPmac/reset.py | 9 +- tests/conftest.py | 52 ++++- tests/el734_1/ax1/conftest.py | 11 +- tests/el734_1/ax1/test_common.py | 81 ++++---- tests/masterMacs1/ax1/conftest.py | 14 +- tests/masterMacs1/ax1/test_common.py | 8 +- tests/turboPmac1/ax1/conftest.py | 11 +- tests/turboPmac1/ax1/test_common.py | 15 +- tests/turboPmac1/ax5/conftest.py | 2 +- tests/turboPmac1/ax5/test_common.py | 17 +- 23 files changed, 384 insertions(+), 265 deletions(-) delete mode 100755 example_custom_script.py rename {setup => src}/__init__.py (100%) rename {setup => src}/caproto.py (74%) rename {setup => src}/classes.py (62%) rename {setup => src}/home.py (88%) rename {setup => src}/move.py (73%) rename {setup => src}/sinqMotor/__init__.py (100%) rename {setup => src}/sinqMotor/limits.py (83%) rename {setup => src}/sinqMotor/masterMacs/__init__.py (100%) rename {setup => src}/sinqMotor/masterMacs/reset.py (84%) rename {setup => src}/sinqMotor/speed.py (77%) rename {setup => src}/sinqMotor/turboPmac/__init__.py (100%) rename {setup => src}/sinqMotor/turboPmac/reset.py (83%) diff --git a/README.md b/README.md index 88937d3..f5556f3 100755 --- a/README.md +++ b/README.md @@ -67,6 +67,8 @@ in turn imports `ioc/config.cmd` to get the current configuration. ## Running the tests +TODO: Add doc for stresstest and log flags + ### General Running tests requires the following three steps: diff --git a/example_custom_script.py b/example_custom_script.py deleted file mode 100755 index 3722083..0000000 --- a/example_custom_script.py +++ /dev/null @@ -1,26 +0,0 @@ -#!/usr/bin/env python3 - -# This script instructs the "lin1" motor (first axis of the sinqtest Turbo PMAC -# controller) to drive to -20 mm. Once it arrives, it then drives to -30 mm. -# Afterwards, the motor is disabled and then reenabled - -import time - -from setup.classes import TurboPMAC - -motor = TurboPMAC('turboPmac1', 'lin1') - -# Drive to position -20 mm -motor.move_and_wait(-20) - -# Drive to position -30 mm -motor.move_and_wait(-30) - -# Disable the motor -motor.put_pv('enable', 0) - -# Wait a bit so the motor has been disabled -time.sleep(2) - -# Reenable the motor -motor.put_pv('enable', 1) diff --git a/setup/__init__.py b/src/__init__.py similarity index 100% rename from setup/__init__.py rename to src/__init__.py diff --git a/setup/caproto.py b/src/caproto.py similarity index 74% rename from setup/caproto.py rename to src/caproto.py index 2f30f73..6086377 100644 --- a/setup/caproto.py +++ b/src/caproto.py @@ -18,17 +18,16 @@ # # Module authors: # Matt Clarke -# Stefan Mathis +# +# ***************************************************************************** # # ***************************************************************************** # # This file has been taken from NICOS, the original file can be found in: # https://github.com/mlz-ictrl/nicos/blob/master/nicos/devices/epics/pva/caproto.py - from functools import partial -import numpy as np from caproto import CaprotoTimeoutError, ChannelType from caproto.sync.client import read, write from caproto.threading.client import Context @@ -88,28 +87,6 @@ STATUS_TO_MESSAGE = { _Context = Context() -def caget(name, timeout=3.0): - """ Returns the PV's current value in its raw form via CA. - - :param name: the PV name - :param timeout: the EPICS timeout - :return: the PV's raw value - """ - response = read(name, timeout=timeout) - return response.data[0] if len(response.data) == 1 else response.data - - -def caput(name, value, wait=False, timeout=3.0): - """ Sets a PV's value via CA. - - :param name: the PV name - :param value: the value to set - :param wait: whether to wait for completion - :param timeout: the EPICS timeout - """ - write(name, value, notify=wait, timeout=timeout) - - class CaprotoWrapper: """ Class that wraps the caproto module that provides EPICS Channel Access (CA) support. @@ -146,6 +123,12 @@ class CaprotoWrapper: def get_pv_value(self, pvname, as_string=False): try: + # response is a ReadResponse objects with the fields: + # - data: a scalar or an array depending on the record type + # - metada: Control metdata such as limits, units, etc. + # - status: Record status + # - severity: Error severity + # - timestamp: Read timestamp response = self._pvs[pvname].read(timeout=self._timeout, data_type='control') return self._convert_value(pvname, response, as_string) @@ -153,58 +136,67 @@ class CaprotoWrapper: raise TimeoutError(f'getting {pvname} timed out') from None def _convert_value(self, pvname, response, as_string=False): + """ + Convert the data returned from reading a PV into its "natural" type. + + Caproto always returns the read values wrapped in an array + (a numpy.ndarray) if Numpy is installed, which always holds true in + NICOS. Furthermore, strings are returned as (an array of) raw bytes / + hex values. + + Therefore, a variety of conversions are performed to make sure the + output type matches the expectations: + - If the return type is a scalar (array length is 1) and the PV is an + enum (pvname in self._choices), then the string representation from + self._choices is returned if as_string == True (if as_string == False, + the scalar itself is returned). + - If as_string == True, the array is treated as a C-string and converted + into a Python string using utf-8 decoding, stripping any NULL + terminators in the process. + - If the value is a scalar, it is is returned as such. Otherwise, the + array is returned. + """ + scalar = None - # By default, the response data is always a list to cover all possible - # readback values from EPICS (lists, strings, chars, numbers). The last - # two cases need to be treated in a special way. They can be identified - # by the list length being 1. if len(response.data) == 1: - value = response.data[0] + # Unwrap the scalar + scalar = response.data[0] + + # Treat the enum case: If the pvname is an enum, return the enum + # string alias if as_string == True, otherwise return the raw value. if pvname in self._choices: - return self._choices[pvname][value] if as_string else value - elif isinstance(value, bytes): - return value.decode() + return self._choices[pvname][scalar] if as_string else scalar - # If an empty string is returned, the data has a single entry - # (the NULL terminator) - if as_string: - return bytes(response.data).rstrip(b'\x00').decode( - encoding='utf-8', errors='ignore') - - return value - - # Strings are read with their NULL terminator, hence it needs to be - # stripped before decoding + # Decode the array as a string, if requested if as_string: - return bytes(response.data).rstrip(b'\x00').decode( + return response.data.tobytes().rstrip(b'\x00').decode( encoding='utf-8', errors='ignore') - if isinstance(response.data, np.ndarray): - val_type = FTYPE_TO_TYPE[self._pvs[pvname].channel.native_data_type] - if val_type == bytes or as_string: - return response.data.tobytes().decode() - + # If the value is a scalar, return it, otherwise return the array. + # Explicit None check is needed here because scalar could be a zero! + if scalar is not None: + return scalar return response.data - def put_pv_value(self, pvname, value, wait=False): - if pvname in self._choices: - value = self._choices[pvname].index(value) - try: - self._pvs[pvname].write(value, wait=wait, timeout=self._timeout) - except CaprotoTimeoutError: - raise TimeoutError(f'setting {pvname} timed out') from None + def put_pv_value(self, pvname, value, timeout=None): + # As is evident from the source code of pv.write (see + # https://caproto.github.io/caproto/v1.2.0/_modules/caproto/threading/client.html#PV.write), + # the `timeout` doesn't do anything if `wait == False`. + # So we can derive the value of `wait` directly from timeout: + # If `timeout == None` (no timeout being given at the callsite), + # `ẁait = False`, otherwise `ẁait = True`. + wait = timeout is not None - def put_pv_value_blocking(self, pvname, value, block_timeout=60): if pvname in self._choices: value = self._choices[pvname].index(value) try: - self._pvs[pvname].write(value, wait=True, timeout=block_timeout) + self._pvs[pvname].write(value, wait=wait, timeout=timeout) except CaprotoTimeoutError: raise TimeoutError(f'setting {pvname} timed out') from None def get_pv_type(self, pvname): data_type = self._pvs[pvname].channel.native_data_type - return FTYPE_TO_TYPE.get(data_type) + return FTYPE_TO_TYPE.get(data_type, None) def get_alarm_status(self, pvname): values = self.get_control_values(pvname) @@ -297,12 +289,11 @@ class CaprotoWrapper: connection_callback(pvname, pvparam, state == 'connected') def _extract_alarm_info(self, values): - # The EPICS 'severity' matches to the NICOS `status` and the message has - # a short description of the alarm details. - if hasattr(values, 'severity'): - message = STATUS_TO_MESSAGE[values.status] - return values.severity, '' if message == 'NO_ALARM' else message - return values.severity, 'alarm information unavailable' + """ + Dummy function. In the original NICOS file, it is used to map EPICS + status to NICOS status. + """ + 0, "" def close_subscription(self, subscription): subscription.clear() diff --git a/setup/classes.py b/src/classes.py similarity index 62% rename from setup/classes.py rename to src/classes.py index 1c96193..16e7c61 100755 --- a/setup/classes.py +++ b/src/classes.py @@ -1,34 +1,58 @@ import time import os import math +import logging +import sys -import pytest import yaml -from setup.caproto import CaprotoWrapper + +from src.caproto import CaprotoWrapper TIMEOUT = 5 + def read_config(): root_dir = os.path.dirname(__file__) with open(root_dir + "/../config.yaml", "r") as f: return yaml.safe_load(f) +def setup_custom_logger(name): + """ + Formatting of the logger. Taken from: + https://stackoverflow.com/questions/28330317/print-timestamp-for-logging-in-python + """ + formatter = logging.Formatter(fmt='%(asctime)s %(levelname)-8s %(message)s', + datefmt='%Y-%m-%d %H:%M:%S') + handler = logging.FileHandler('log.txt', mode='w') + handler.setFormatter(formatter) + screen_handler = logging.StreamHandler(stream=sys.stdout) + screen_handler.setFormatter(formatter) + logger = logging.getLogger(name) + logger.setLevel(logging.DEBUG) + logger.addHandler(handler) + logger.addHandler(screen_handler) + return logger + + class MajorState(Exception): pass + class StartTimeoutError(TimeoutError): pass + class IocNotRunning(Exception): pass + class Motor: """ This object represents a motor used for testing. A few notes regarding EPICS interaction: - + Although it would be feasible to simply use the `read` and `write` functions in `caproto.sync.client` to access the motor PVs directly everytime, this approach is problematic because it opens and closes an UDP socket everytime. In practice, this is known to be susceptible to timeout errors due to high volumes @@ -48,8 +72,12 @@ class Motor: function therefore needs to be overwritten correspondingly in subclasses. """ + __slots__ = ("_param_to_pv", "fields", "controller", "axis", "ip", "port", + "busypoll", "idlepoll", "pv", "default_position", 'logger', + "_epics_wrapper") + # This will store PV objects for each PV param and is populated by the init function - _param_to_pv = {} + _param_to_pv = {} # Motor record fields fields = { @@ -83,14 +111,16 @@ class Motor: 'precision': 'MRES', } - def __init__(self, controller, name, default_position=0): + def __init__(self, controller, axis, default_position=0): + self.controller = controller + self.axis = axis config = read_config() self.ip = config['controllers'][controller]['ip'] self.port = config['controllers'][controller]['port'] self.busypoll = config['controllers'][controller]['busypoll'] self.idlepoll = config['controllers'][controller]['idlepoll'] pvprefix = config['pvprefix'] - self.pv = f'{pvprefix}:{controller}:{name}' + self.pv = f'{pvprefix}:{controller}:{axis}' self.default_position = default_position self._epics_wrapper = CaprotoWrapper(TIMEOUT) @@ -98,7 +128,10 @@ class Motor: self._epics_wrapper.connect_pv(pvname) self._param_to_pv[access_name] = pvname - def _get_pvs(self, pvs ={}): + # Set up for all "motor" devices. + self.logger = setup_custom_logger(f"motor.{controller}.{axis}") + + def _get_pvs(self, pvs={}): """ :return: Dict of PV aliases and actual PV names. """ @@ -106,37 +139,33 @@ class Motor: pvs[alias] = '.'.join((self.pv, pvname)) return pvs - def put_pv(self, pvparam, value, wait=False): - self._epics_wrapper.put_pv_value(self._param_to_pv[pvparam], value, - wait=wait) + def put_pv(self, pvparam, value, timeout=None): + self.logger.debug( + f"writing '{value}' to '{self._param_to_pv[pvparam]}'") - def _convert_value(self, response, as_string=False): + self._epics_wrapper.put_pv_value( + self._param_to_pv[pvparam], value, timeout=timeout) - # By default, the response data is always a list to cover all possible - # readback values from EPICS (lists, strings, chars, numbers). The last - # two cases need to be treated in a special way. They can be identified - # by the list length being 1. - if len(response.data) == 1: - value = response.data[0] - if isinstance(value, bytes): - return value.decode() - # If an empty string is returned, the data has a single entry - # (the NULL terminator) - if as_string: - return bytes(response.data).rstrip(b'\x00').decode( - encoding='utf-8', errors='ignore') - return value - - # Strings are read with their NULL terminator, hence it needs to be - # stripped before decoding - if as_string: - return bytes(response.data).rstrip(b'\x00').decode( - encoding='utf-8', errors='ignore') - return response.data + if timeout is not None: + self.logger.debug( + f"finished blocking writing '{value}' to '{self._param_to_pv[pvparam]}'") def get_pv(self, pvparam, as_string=False): - return self._epics_wrapper.get_pv_value(self._param_to_pv[pvparam], - as_string=as_string) + self.logger.debug(f"reading '{self._param_to_pv[pvparam]}'") + val = self._epics_wrapper.get_pv_value(self._param_to_pv[pvparam], + as_string=as_string) + self.logger.debug(f"read '{val}' from '{self._param_to_pv[pvparam]}'") + return val + + def move(self, target): + + # Set the new target + self.put_pv('writepv', target) + + if self.at_target(target): + self.logger.info(f"already at target '{target}'") + else: + self.logger.info(f"moving to '{target}'") def move_and_wait(self, target): """ @@ -145,18 +174,20 @@ class Motor: occurred during movement. In case this happens, an error is raised. """ - # Is the motor already at its target? - if self.at_target(target): - return - - # Is the motor already moving? - already_moving = self.get_pv('donemoving') == 0 - + # Set the new target self.put_pv('writepv', target) - if not already_moving: + # Is the motor already moving? + if self.get_pv('donemoving') != 0: + self.logger.info(f"starting move to '{target}'") self.wait_for_start() + if self.at_target(target): + self.logger.info(f"already at target '{target}'") + else: + self.logger.info(f"moving to '{target}'") self.wait_for_done() + self.logger.info( + f"finished moving to '{target}', now at '{self.get_pv('readpv')}'") def limits(self): return (self.get_pv('lowlimit'), self.get_pv('highlimit')) @@ -186,9 +217,11 @@ class Motor: if self.has_error(): error_msg = self.error_message() if error_msg: - raise MajorState(f'Error while waiting for start of movement: {error_msg}') + raise MajorState( + f'Error while waiting for start of movement: {error_msg}') else: - raise MajorState('Unknown error while waiting for start of movement') + raise MajorState( + 'Unknown error while waiting for start of movement') time.sleep(0.1) def wait_for_done(self): @@ -196,14 +229,16 @@ class Motor: if self.has_error(): error_msg = self.error_message() if error_msg: - raise MajorState(f'Error while waiting for completion of movement: {error_msg}') + raise MajorState( + f'Error while waiting for completion of movement: {error_msg}') else: - raise MajorState('Unknown error while waiting for completion of movement') + raise MajorState( + 'Unknown error while waiting for completion of movement') time.sleep(0.1) def has_error(self): return self.get_pv('alarm_severity') == 'MAJOR' - + def error_message(self): return '' @@ -212,6 +247,30 @@ class Motor: str = format(int(self.get_pv('status')), '016b') return bool(str[15]) + def stop(self): + self.logger.info(f"stopping") + self.put_pv('stop', 1) + + def homeforward(self): + self.logger.info(f"homing forward") + self.put_pv('homeforward', 1) + + def homereverse(self): + self.logger.info(f"homing reverse") + self.put_pv('homereverse', 1) + + def set_highlimit(self, high_limit): + self.logger.info(f"setting high limit {high_limit}") + self.put_pv('dialhighlimit', high_limit) + + def set_lowlimit(self, low_limit): + self.logger.info(f"setting low limit {low_limit}") + self.put_pv('diallowlimit', low_limit) + + def set_speed(self, speed): + self.logger.info(f"setting speed {speed}") + self.put_pv('speed', speed) + class SinqMotor(Motor): # PV suffixes used in SinqMotor drivers @@ -224,9 +283,9 @@ class SinqMotor(Motor): 'can_set_speed': ':CanSetSpeed', 'reseterrorpv': ':Reset', 'errormsgpv': '-MsgTxt', - } + } - def _get_pvs(self, pvs ={} ): + def _get_pvs(self, pvs={}): """ :return: Dict of PV aliases and actual PV names. """ @@ -241,13 +300,24 @@ class SinqMotor(Motor): return self.get_pv('errormsgpv', as_string=True) def enable_and_wait(self, timeout=20): - self.put_pv('enable', 1) + self.enable(1) self.wait_enabled(timeout) def disable_and_wait(self, timeout=20): - self.put_pv('enable', 0) + self.enable(0) self.wait_disabled(timeout) + def enable(self, val): + if val == 0: + self.logger.info(f"disabling") + else: + self.logger.info(f"enabling") + self.put_pv('enable', 1) + + def reset(self): + self.logger.info(f"resetting error") + self.put_pv('reseterrorpv', 1) + def wait_enabled(self, timeout=10): """ Wait until the motor is enabled or a timeout has been reached. @@ -271,13 +341,19 @@ class SinqMotor(Motor): while check(self.get_pv('enable_rbv')): if time.time() > now + timeout: if wait_for_enabling: - pytest.fail( - f'Motor {self.pv} could not be enabled in {timeout} seconds') + + raise MajorState( + f'could not be enabled in {timeout} seconds') else: - pytest.fail( - f'Motor {self.pv} could not be disabled in {timeout} seconds') + raise MajorState( + f'could not be enabled in {timeout} seconds') time.sleep(0.5) + if wait_for_enabling: + self.logger.info(f"enabled") + else: + self.logger.info(f"disabled") + class TurboPMAC(SinqMotor): pass diff --git a/setup/home.py b/src/home.py similarity index 88% rename from setup/home.py rename to src/home.py index 01aadde..e43bd32 100755 --- a/setup/home.py +++ b/src/home.py @@ -1,8 +1,10 @@ import time import threading +from src.classes import Motor -def home(motor, forward): + +def home(motor: Motor, forward: bool): encoder = motor.get_pv('encoder_type', as_string=True) if encoder == 'absolute': @@ -17,22 +19,22 @@ def home(motor, forward): # Start a homing run and observe the motor behaviour depending on the # encoder type if forward: - motor.put_pv('homeforward', 1) + motor.homeforward() else: - motor.put_pv('homereverse', 1) + motor.homereverse() motor.wait_for_start() if can_home: # The controller might take a bit of time to actually start the movement, - # therefore we wait a bit. + # therefore we wait a bit. time.sleep(2 * motor.busypoll) # Motor should start movement assert motor.get_pv('moving') == 1 assert motor.get_pv('donemoving') == 0 assert not motor.has_error() - # TODO: This should work + # TODO: This should work # assert not motor.is_homed() motor.wait_for_done() @@ -51,28 +53,30 @@ def home(motor, forward): assert motor.get_pv('donemoving') == 1 assert not motor.has_error() + def start_home_while_moving(motor, target, forward): """ Start a movement to target, wait two seconds and then home the motor. """ - motor.put_pv('writepv', target) + motor.move(target) time.sleep(2) home(motor, forward) assert not motor.at_target(target) + def start_move_while_homing(motor, target, forward): """ Try to start a movement while homing (this should fail and the motor should continue homing instead) """ def try_start_move(): time.sleep(5) - motor.put_pv('writepv', target) + motor.move(target) - thread = threading.Thread(target = try_start_move) + thread = threading.Thread(target=try_start_move) thread.start() home(motor, forward) assert not motor.at_target(target) - + def move_then_home(motor, target, forward): """ @@ -83,6 +87,7 @@ def move_then_home(motor, target, forward): assert not motor.has_error() home(motor, forward) + def home_then_move(motor, target, forward): """ Home the motor and then move to the target @@ -91,4 +96,3 @@ def home_then_move(motor, target, forward): motor.move_and_wait(target) assert motor.at_target(target) assert not motor.has_error() - diff --git a/setup/move.py b/src/move.py similarity index 73% rename from setup/move.py rename to src/move.py index da84050..e631696 100755 --- a/setup/move.py +++ b/src/move.py @@ -2,61 +2,63 @@ import time import pytest -from setup.classes import StartTimeoutError +from src.classes import Motor, StartTimeoutError -def move_to_low_limit_switch(motor): + +def move_to_low_limit_switch(motor: Motor): low_limit = motor.limits()[0] motor.move_and_wait(low_limit) assert motor.at_target() # Field is currently not set in our drivers - #assert motor.get_pv('lowlimitswitch') == 1 + # assert motor.get_pv('lowlimitswitch') == 1 assert not motor.has_error() -def move_to_high_limit_switch(motor): +def move_to_high_limit_switch(motor: Motor): low_limit = motor.limits()[1] motor.move_and_wait(low_limit) assert motor.at_target() # Field is currently not set in our drivers - #assert motor.get_pv('highlimitswitch') == 1 + # assert motor.get_pv('highlimitswitch') == 1 assert not motor.has_error() -def move_while_move(motor, first_target, second_target, time_first_target): +def move_while_move(motor: Motor, first_target, second_target, time_first_target): """ Start moving to a position which is sufficiently far away, then interrupt this move command with another move command. This function assumes a sufficiently large distance between the current motor position and first_target so it has enough time to issue the second move command. """ - motor.put_pv('writepv', first_target) + motor.move(first_target) time.sleep(time_first_target) motor.move_and_wait(second_target) assert motor.at_target(second_target) assert not motor.has_error() -def stop_then_move(motor, target): - motor.put_pv('stop', 1) + +def stop_then_move(motor: Motor, target): + motor.stop('stop', 1) motor.wait_for_done() motor.move_and_wait(target) -def stop(motor, target): +def stop(motor: Motor, target): """ Stop an motor while it is moving to a target. This function assumes a sufficiently large distance between the current motor position and target. """ - motor.put_pv('writepv', target) + motor.move(target) time.sleep(1) - motor.put_pv('stop', 1) + motor.stop(1) motor.wait_for_done() assert not motor.at_target(target) assert not motor.has_error() -def invalid_target(motor, target): +def invalid_target(motor: Motor, target): current_pos = motor.get_pv('readpv') try: motor.move_and_wait(target) @@ -65,7 +67,7 @@ def invalid_target(motor, target): assert motor.at_target(current_pos) -def targets_outside_limits(motor): +def targets_outside_limits(motor: Motor): """ Try to move the motor outside its limits """ diff --git a/setup/sinqMotor/__init__.py b/src/sinqMotor/__init__.py similarity index 100% rename from setup/sinqMotor/__init__.py rename to src/sinqMotor/__init__.py diff --git a/setup/sinqMotor/limits.py b/src/sinqMotor/limits.py similarity index 83% rename from setup/sinqMotor/limits.py rename to src/sinqMotor/limits.py index 0accad5..b201804 100755 --- a/setup/sinqMotor/limits.py +++ b/src/sinqMotor/limits.py @@ -1,10 +1,10 @@ import time import math -from setup.classes import read_config +from src.classes import Motor -def reread_limits_from_hw(motor): +def reread_limits_from_hw(motor: Motor): """ sinqMotor drivers usually read their limits from the hardware at each poll, hence any values manually written to DHLM or DLLM should be overwritten @@ -12,8 +12,8 @@ def reread_limits_from_hw(motor): """ (high_limit, low_limit) = motor.limits() - motor.put_pv('dialhighlimit', high_limit+10) - motor.put_pv('diallowlimit', low_limit-10) + motor.set_highlimit(high_limit+10) + motor.set_lowlimit(low_limit-10) # Wait two idle poll periods time.sleep(2 * motor.idlepoll) diff --git a/setup/sinqMotor/masterMacs/__init__.py b/src/sinqMotor/masterMacs/__init__.py similarity index 100% rename from setup/sinqMotor/masterMacs/__init__.py rename to src/sinqMotor/masterMacs/__init__.py diff --git a/setup/sinqMotor/masterMacs/reset.py b/src/sinqMotor/masterMacs/reset.py similarity index 84% rename from setup/sinqMotor/masterMacs/reset.py rename to src/sinqMotor/masterMacs/reset.py index 36ca594..e799b55 100755 --- a/setup/sinqMotor/masterMacs/reset.py +++ b/src/sinqMotor/masterMacs/reset.py @@ -1,6 +1,9 @@ import time -def reset(motor, target): +from src.classes import SinqMotor + + +def reset(motor: SinqMotor, target): """ Reset the motor for the next test. This means the following things: 1) Stopping the motor @@ -16,9 +19,9 @@ def reset(motor, target): if target is None: target = motor.default_position - motor.put_pv('stop', 1) + motor.stop(1) motor.wait_for_done() - motor.put_pv('reseterrorpv', 1) + motor.reset() motor.wait_disabled() motor.enable_and_wait() motor.move_and_wait(target) diff --git a/setup/sinqMotor/speed.py b/src/sinqMotor/speed.py similarity index 77% rename from setup/sinqMotor/speed.py rename to src/sinqMotor/speed.py index 2b29e26..3f0d1a9 100644 --- a/setup/sinqMotor/speed.py +++ b/src/sinqMotor/speed.py @@ -1,4 +1,7 @@ -def speed_fields_valid(motor): +from src.classes import Motor + + +def speed_fields_valid(motor: Motor): """ Check if the motor speed fields are valid: VMAX >= VELO >= VBAS @@ -16,10 +19,10 @@ def speed_fields_valid(motor): assert vmax == velo == vbas -def set_speed(motor, new_speed): +def set_speed(motor: Motor, new_speed): old_speed = motor.get_pv('speed') try: - motor.put_pv('speed', new_speed) + motor.set_speed(new_speed) if motor.get_pv('can_set_speed') != 0: if new_speed > motor.get_pv('maxspeed'): assert motor.get_pv('speed') == motor.get_pv('maxspeed') @@ -30,27 +33,25 @@ def set_speed(motor, new_speed): else: assert motor.get_pv('speed') == old_speed finally: - motor.put_pv('speed', old_speed) + motor.set_speed(old_speed) -def set_minspeed_maxspeed_meanspeed(motor): +def set_minspeed_maxspeed_meanspeed(motor: Motor): set_speed(motor, motor.get_pv('maxspeed')) set_speed(motor, motor.get_pv('basespeed')) set_speed(motor, 0.5 * (motor.get_pv('basespeed') + motor.get_pv('maxspeed'))) -def set_invalid_speed_above_min_below_max(motor): +def set_invalid_speed_above_min_below_max(motor: Motor): set_speed(motor, motor.get_pv('maxspeed') + 1) set_speed(motor, motor.get_pv('basespeed') - 1) -def set_speed_and_move(motor, new_speed, target): +def set_speed_and_move(motor: Motor, new_speed, target): old_speed = motor.get_pv('speed') try: - motor.put_pv('speed', new_speed) + motor.set_speed(new_speed) motor.move_and_wait(target) assert motor.at_target(target) finally: - motor.put_pv('speed', old_speed) - - + motor.set_speed(old_speed) diff --git a/setup/sinqMotor/turboPmac/__init__.py b/src/sinqMotor/turboPmac/__init__.py similarity index 100% rename from setup/sinqMotor/turboPmac/__init__.py rename to src/sinqMotor/turboPmac/__init__.py diff --git a/setup/sinqMotor/turboPmac/reset.py b/src/sinqMotor/turboPmac/reset.py similarity index 83% rename from setup/sinqMotor/turboPmac/reset.py rename to src/sinqMotor/turboPmac/reset.py index a4cb7e1..adb7b05 100755 --- a/setup/sinqMotor/turboPmac/reset.py +++ b/src/sinqMotor/turboPmac/reset.py @@ -1,6 +1,9 @@ import time -def reset(motor, target=None): +from src.classes import SinqMotor + + +def reset(motor: SinqMotor, target=None): """ Reset the motor for the next test. This means the following things: 1) Stopping the motor @@ -16,9 +19,9 @@ def reset(motor, target=None): if target is None: target = motor.default_position - motor.put_pv('stop', 1) + motor.stop() motor.wait_for_done() - motor.put_pv('reseterrorpv', 1) + motor.reset() motor.wait_disabled() motor.enable_and_wait() motor.move_and_wait(target) diff --git a/tests/conftest.py b/tests/conftest.py index 9798511..e78a30b 100755 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,10 +1,11 @@ -import time from pathlib import Path +import logging from caproto.sync.client import read import pytest -from setup.classes import read_config, IocNotRunning +from src.classes import read_config, IocNotRunning + def check_ioc_running(): config = read_config() @@ -24,10 +25,12 @@ def check_ioc_running(): path = Path.cwd() / 'ioc' / 'startioc' raise IocNotRunning(f'Start the test IOC first by running {path}') + @pytest.fixture(autouse=True) def check_ioc_running_before_test(): check_ioc_running() + def pytest_addoption(parser): """ This function adds the --stress flag to Pytest which can be given to @@ -39,6 +42,22 @@ def pytest_addoption(parser): default=False, help="Run tests marked as stresstest" ) + parser.addoption( + "--log", + action="store", + default="INFO", + help=""" + Set log level: + - DEBUG: Show all raw caput and caget commands + - INFO: Write high-level commands (e.g. move, stop, home etc.) + - WARNING: Not used + - ERROR: Serious error (e.g. motor could not be enabled / disabled) + - CRITICAL: Not used + All error levels higher than the defined one are forwarded to the Pytest + logger. + """ + ) + def pytest_runtest_setup(item): """ @@ -46,4 +65,31 @@ def pytest_runtest_setup(item): mark them as stress tests via `@pytest.mark.stresstest`. """ if 'stresstest' in item.keywords and not item.config.getoption("--stresstest"): - pytest.skip("Need --stress to run this test") \ No newline at end of file + pytest.skip("Need --stress to run this test") + + +def parse_loglevel(value: str) -> int: + value = value.upper() + if value.isdigit(): + return int(value) + + if value in logging._nameToLevel: # DEBUG, INFO, … + return logging._nameToLevel[value] + + raise ValueError(f"Unknown log level: {value}") + + +@pytest.fixture(scope="session", autouse=True) +def configure_el734_logging(request): + loglevel_str = request.config.getoption("--log") + level = parse_loglevel(loglevel_str) + + logger = logging.getLogger("EL734") + logger.handlers.clear() # so pytest doesn't duplicate handlers + + handler = logging.StreamHandler() + formatter = logging.Formatter("%(levelname)s [%(name)s]: %(message)s") + handler.setFormatter(formatter) + logger.addHandler(handler) + + logger.setLevel(level) diff --git a/tests/el734_1/ax1/conftest.py b/tests/el734_1/ax1/conftest.py index 0a70c1d..7c92756 100644 --- a/tests/el734_1/ax1/conftest.py +++ b/tests/el734_1/ax1/conftest.py @@ -2,18 +2,19 @@ import time import pytest -from setup.classes import EL734 +from src.classes import EL734 + -# Prepare the motor at the start of the test by resetting and homing it. @pytest.fixture(scope="session", autouse=True) def reset_and_home(): mot = EL734('el734_1', 'ax1') - mot.put_pv('stop', 1) - mot.put_pv('reseterrorpv', 1) - mot.put_pv('homeforward', 1) + mot.stop() + mot.reset() + mot.homeforward() time.sleep(1) mot.wait_for_done() + @pytest.fixture(autouse=True) def motor(): return EL734('el734_1', 'ax1') diff --git a/tests/el734_1/ax1/test_common.py b/tests/el734_1/ax1/test_common.py index a4de0a7..38dfd85 100644 --- a/tests/el734_1/ax1/test_common.py +++ b/tests/el734_1/ax1/test_common.py @@ -2,120 +2,121 @@ import pytest -from setup.move import * -from setup.home import * -from setup.sinqMotor.limits import * -from setup.sinqMotor.speed import * +from src.move import * +from src.home import * +from src.sinqMotor.limits import * +from src.sinqMotor.speed import * -def test_move_to_low_limit_switch(motor): - motor.put_pv('stop', 1) +def test_move_to_low_limit_switch(motor: Motor): + motor.stop() motor.wait_for_done() move_to_low_limit_switch(motor) -def test_move_to_high_limit_switch(motor): - motor.put_pv('stop', 1) +def test_move_to_high_limit_switch(motor: Motor): + motor.stop() motor.wait_for_done() move_to_high_limit_switch(motor) -def test_move_while_move_same_direction(motor): - motor.put_pv('stop', 1) +def test_move_while_move_same_direction(motor: Motor): + motor.stop() motor.wait_for_done() move_while_move(motor, -30, -10, 2) -def test_move_while_move_opposite_direction(motor): - motor.put_pv('stop', 1) +def test_move_while_move_opposite_direction(motor: Motor): + motor.stop() motor.wait_for_done() move_while_move(motor, 10, -10, 2) -def test_stop(motor): +def test_stop(motor: Motor): stop(motor, -60) -def test_stop_then_move(motor): - motor.put_pv('stop', 1) +def test_stop_then_move(motor: Motor): + motor.stop() motor.wait_for_done() stop_then_move(motor, 10) + @pytest.mark.stresstest -def test_stop_then_move_stress(motor): - motor.put_pv('stop', 1) +def test_stop_then_move_stress(motor: Motor): + motor.stop() motor.wait_for_done() for _ in range(50): stop_then_move(motor, 5) stop_then_move(motor, 0) -def test_speed_fields_valid(motor): - motor.put_pv('stop', 1) +def test_speed_fields_valid(motor: Motor): + motor.stop() motor.wait_for_done() speed_fields_valid(motor) -def test_set_minspeed_maxspeed_meanspeed(motor): - motor.put_pv('stop', 1) +def test_set_minspeed_maxspeed_meanspeed(motor: Motor): + motor.stop() motor.wait_for_done() set_minspeed_maxspeed_meanspeed(motor) -def test_set_invalid_speed_above_min_below_max(motor): - motor.put_pv('stop', 1) +def test_set_invalid_speed_above_min_below_max(motor: Motor): + motor.stop() motor.wait_for_done() set_invalid_speed_above_min_below_max(motor) -def test_set_speed_and_move(motor): - motor.put_pv('stop', 1) +def test_set_speed_and_move(motor: Motor): + motor.stop() motor.wait_for_done() set_speed_and_move(motor, motor.get_pv('maxspeed'), 5) set_speed_and_move(motor, 0.5*motor.get_pv('maxspeed'), 0) @pytest.mark.stresstest -def test_set_speed_and_move_stress(motor): - motor.put_pv('stop', 1) +def test_set_speed_and_move_stress(motor: Motor): + motor.stop() motor.wait_for_done() for _ in range(50): set_speed_and_move(motor, motor.get_pv('maxspeed'), 5) set_speed_and_move(motor, 0.5*motor.get_pv('maxspeed'), 0) -def test_targets_outside_limits(motor): - motor.put_pv('stop', 1) +def test_targets_outside_limits(motor: Motor): + motor.stop() motor.wait_for_done() targets_outside_limits(motor) -def test_home(motor): - motor.put_pv('stop', 1) +def test_home(motor: Motor): + motor.stop() motor.wait_for_done() home(motor, True) -def test_start_home_while_moving(motor): - motor.put_pv('stop', 1) +def test_start_home_while_moving(motor: Motor): + motor.stop() motor.wait_for_done() start_home_while_moving(motor, 15, True) -def test_start_home_while_moving(motor): - motor.put_pv('stop', 1) +def test_start_home_while_moving(motor: Motor): + motor.stop() motor.wait_for_done() start_home_while_moving(motor, 15, True) -def test_move_then_home(motor): - motor.put_pv('stop', 1) +def test_move_then_home(motor: Motor): + motor.stop() motor.wait_for_done() move_then_home(motor, 15, True) - -def test_home_then_move(motor): - motor.put_pv('stop', 1) + +def test_home_then_move(motor: Motor): + motor.stop() motor.wait_for_done() home_then_move(motor, 15, True) diff --git a/tests/masterMacs1/ax1/conftest.py b/tests/masterMacs1/ax1/conftest.py index 9c1e1b6..385179a 100755 --- a/tests/masterMacs1/ax1/conftest.py +++ b/tests/masterMacs1/ax1/conftest.py @@ -2,20 +2,24 @@ import time import pytest -from setup.classes import MasterMACS +from src.classes import MasterMACS + -# Prepare the motor at the start of the test by resetting and homing it. @pytest.fixture(scope="session", autouse=True) def reset_and_home(): + """ + Prepare the motor at the start of the test by resetting and homing it. + """ mot = MasterMACS('masterMacs1', 'ax1') - mot.put_pv('stop', 1) - mot.put_pv('reseterrorpv', 1) + mot.stop() + mot.reset() mot.wait_disabled() mot.enable_and_wait() - mot.put_pv('homeforward', 1) + mot.homeforward() time.sleep(1) mot.wait_for_done() + @pytest.fixture(autouse=True) def motor(): return MasterMACS('masterMacs1', 'ax1') diff --git a/tests/masterMacs1/ax1/test_common.py b/tests/masterMacs1/ax1/test_common.py index deb359a..b894b25 100755 --- a/tests/masterMacs1/ax1/test_common.py +++ b/tests/masterMacs1/ax1/test_common.py @@ -1,9 +1,9 @@ # Run a selection of common tests -from setup.move import * -from setup.sinqMotor.limits import * -from setup.home import * -from setup.sinqMotor.masterMacs.reset import reset +from src.move import * +from src.sinqMotor.limits import * +from src.home import * +from src.sinqMotor.masterMacs.reset import reset def test_reset(motor): diff --git a/tests/turboPmac1/ax1/conftest.py b/tests/turboPmac1/ax1/conftest.py index 0e68945..2845b1d 100755 --- a/tests/turboPmac1/ax1/conftest.py +++ b/tests/turboPmac1/ax1/conftest.py @@ -2,20 +2,23 @@ import time import pytest -from setup.classes import TurboPMAC +from src.classes import TurboPMAC # Prepare the motor at the start of the test by resetting and homing it. + + @pytest.fixture(scope="session", autouse=True) def reset_and_home(): mot = TurboPMAC('turboPmac1', 'ax1') - mot.put_pv('stop', 1) - mot.put_pv('reseterrorpv', 1) + mot.stop() + mot.reset() mot.wait_disabled() mot.enable_and_wait() - mot.put_pv('homeforward', 1) + mot.homeforward() time.sleep(1) mot.wait_for_done() + @pytest.fixture(autouse=True) def motor(): return TurboPMAC('turboPmac1', 'ax1') diff --git a/tests/turboPmac1/ax1/test_common.py b/tests/turboPmac1/ax1/test_common.py index 59fc8aa..f33bef2 100755 --- a/tests/turboPmac1/ax1/test_common.py +++ b/tests/turboPmac1/ax1/test_common.py @@ -2,22 +2,24 @@ import pytest -from setup.move import * -from setup.home import * -from setup.sinqMotor.limits import * -from setup.sinqMotor.speed import * -from setup.sinqMotor.turboPmac.reset import reset +from src.move import * +from src.home import * +from src.sinqMotor.limits import * +from src.sinqMotor.speed import * +from src.sinqMotor.turboPmac.reset import reset def test_reset(motor): reset(motor, 5) reset(motor, 2) + @pytest.mark.stresstest def test_reset_stress(motor): for _ in range(50): test_reset(motor) + def test_move_to_low_limit_switch(motor): reset(motor) move_to_low_limit_switch(motor) @@ -47,6 +49,7 @@ def test_stop_then_move(motor): reset(motor) stop_then_move(motor, 20) + @pytest.mark.stresstest def test_stop_then_move_stress(motor): reset(motor) @@ -107,7 +110,7 @@ def test_start_home_while_moving(motor): def test_move_then_home(motor): reset(motor, 1) move_then_home(motor, 30, True) - + def test_home_then_move(motor): reset(motor, 1) diff --git a/tests/turboPmac1/ax5/conftest.py b/tests/turboPmac1/ax5/conftest.py index 0fe3b83..215c260 100755 --- a/tests/turboPmac1/ax5/conftest.py +++ b/tests/turboPmac1/ax5/conftest.py @@ -1,7 +1,7 @@ # This module defines fixtures which are shared for all tests of motor "rot1". import pytest -from setup.classes import TurboPMAC +from src.classes import TurboPMAC @pytest.fixture(autouse=True) diff --git a/tests/turboPmac1/ax5/test_common.py b/tests/turboPmac1/ax5/test_common.py index c0d01e8..f278a44 100755 --- a/tests/turboPmac1/ax5/test_common.py +++ b/tests/turboPmac1/ax5/test_common.py @@ -1,22 +1,24 @@ # Run a selection of common tests import pytest -from setup.move import * -from setup.sinqMotor.limits import * -from setup.sinqMotor.speed import * -from setup.home import * -from setup.sinqMotor.turboPmac.reset import reset +from src.move import * +from src.sinqMotor.limits import * +from src.sinqMotor.speed import * +from src.home import * +from src.sinqMotor.turboPmac.reset import reset def test_reset(motor): reset(motor, 5) reset(motor) + @pytest.mark.stresstest def test_reset_stress(motor): for _ in range(50): test_reset(motor) + def test_move_to_low_limit_switch(motor): reset(motor) move_to_low_limit_switch(motor) @@ -46,6 +48,7 @@ def test_stop_then_move(motor): reset(motor) stop_then_move(motor, 100) + @pytest.mark.stresstest def test_stop_then_move_stress(motor): reset(motor) @@ -74,13 +77,15 @@ def test_set_speed_and_move(motor): set_speed_and_move(motor, motor.get_pv('maxspeed'), 10) set_speed_and_move(motor, 0.5*motor.get_pv('maxspeed'), 0) + @pytest.mark.stresstest def test_set_speed_and_move_stress(motor): reset(motor) for _ in range(50): set_speed_and_move(motor, motor.get_pv('maxspeed'), 10) set_speed_and_move(motor, 0.5*motor.get_pv('maxspeed'), 0) - + + def test_targets_outside_limits(motor): reset(motor) targets_outside_limits(motor)