Files
common/packages/pyscan/utils.py
2025-01-07 12:38:15 +01:00

217 lines
7.3 KiB
Python

import inspect
from collections import OrderedDict
from time import sleep
from epics.pv import PV
from pyscan import config
from pyscan.scan_parameters import convert_input, ConditionComparison
def compare_channel_value(current_value, expected_value, tolerance=0.0, operation=ConditionComparison.EQUAL):
    """
    Check if the pv value matches the expected value, within tolerance for int and float.

    For int/float current values the comparison is done on the difference
    (current_value - expected_value):
      - EQUAL:           abs(difference) <= tolerance
      - NOT_EQUAL:       abs(difference) > tolerance
      - HIGHER:          difference > tolerance
      - HIGHER_OR_EQUAL: difference >= tolerance
      - LOWER:           difference < 0 or abs(difference) < tolerance
      - LOWER_OR_EQUAL:  difference <= 0 or abs(difference) <= tolerance
    For any other type the plain Python comparison operators are used.

    :param current_value: Current value to compare it to.
    :param expected_value: Expected value of the PV.
    :param tolerance: Tolerance for number comparison. Values below config.max_float_tolerance
                      are raised to config.max_float_tolerance.
    :param operation: ConditionComparison member selecting the comparison to perform.
    :return: True if the comparison holds; False otherwise, including when the operation
             is not one of the handled ConditionComparison members.
    :raises ValueError: If the object (non-numeric) comparison of the two values fails.
    """
    # Minimum tolerance allowed.
    tolerance = max(tolerance, config.max_float_tolerance)

    def compare_value(value):
        # NOTE(review): 'value' is not used; the comparison always runs on the enclosing
        # current_value / expected_value. Kept as-is to preserve existing results.

        # For numbers we compare them within tolerance.
        if isinstance(current_value, (float, int)):
            difference = current_value - expected_value

            if operation == ConditionComparison.EQUAL:
                return abs(difference) <= tolerance
            elif operation == ConditionComparison.HIGHER:
                return difference > tolerance
            elif operation == ConditionComparison.HIGHER_OR_EQUAL:
                return difference >= tolerance
            elif operation == ConditionComparison.LOWER:
                return difference < 0 or abs(difference) < tolerance
            elif operation == ConditionComparison.LOWER_OR_EQUAL:
                return difference <= 0 or abs(difference) <= tolerance
            elif operation == ConditionComparison.NOT_EQUAL:
                return abs(difference) > tolerance

        # Otherwise use the object comparison.
        else:
            try:
                if operation == ConditionComparison.EQUAL:
                    return current_value == expected_value
                elif operation == ConditionComparison.HIGHER:
                    return current_value > expected_value
                elif operation == ConditionComparison.HIGHER_OR_EQUAL:
                    return current_value >= expected_value
                elif operation == ConditionComparison.LOWER:
                    return current_value < expected_value
                elif operation == ConditionComparison.LOWER_OR_EQUAL:
                    return current_value <= expected_value
                elif operation == ConditionComparison.NOT_EQUAL:
                    return current_value != expected_value
            # Only translate real comparison failures; KeyboardInterrupt and SystemExit
            # must propagate untouched. The original error is kept as the cause.
            except Exception as error:
                raise ValueError("Do not know how to compare current_value %s with expected_value %s and action %s."
                                 % (current_value, expected_value, operation)) from error

        # Unknown operation: nothing matched.
        return False

    if isinstance(current_value, list):
        # In case of a list, any of the provided values will do.
        # NOTE(review): because compare_value ignores its argument, every iteration performs
        # the same current_value vs expected_value object comparison. Presumably the intent
        # was to test each candidate separately - confirm before changing.
        return any((compare_value(value) for value in expected_value))
    else:
        return compare_value(current_value)
def connect_to_pv(pv_name, n_connection_attempts=3):
    """
    Open a connection to a PV, retrying a limited number of times.

    The PV object is created with auto_monitor disabled. After every failed
    connection attempt the function waits 0.1 seconds before continuing.

    :param pv_name: PV name to connect to.
    :param n_connection_attempts: Number of connection attempts before giving up.
    :return: The connected PV object.
    :raises ValueError if none of the attempts managed to connect to the PV.
    """
    channel = PV(pv_name, auto_monitor=False)

    for _attempt in range(n_connection_attempts):
        connected = channel.connect()
        if connected:
            return channel
        # Give the channel a moment before the next try.
        sleep(0.1)

    raise ValueError("Cannot connect to PV '%s'." % pv_name)
def validate_lists_length(*args):
    """
    Check if all the provided lists are of the same length.

    :param args: Lists (or any other objects supporting len()) to compare.
    :raise ValueError if no lists are provided, or if they are not of the same length.
           In the second case the message lists every provided element on its own line.
    """
    if not args:
        raise ValueError("Cannot compare lengths of None.")

    initial_length = len(args[0])
    if all([len(element) == initial_length for element in args]):
        return

    # Each element is wrapped in a 1-tuple: a bare tuple on the right-hand side of '%'
    # is unpacked as the format arguments, which raised TypeError (or printed only the
    # inner value) whenever one of the provided sequences was a tuple.
    listing = "".join("%s\n" % (element,) for element in args)
    raise ValueError("The provided lists must be of same length.\n" + listing)
def convert_to_list(value):
    """
    Make sure the input is a list, wrapping it in one if needed.

    :param value: Any value. None and list instances are passed through untouched.
    :return: The same object if it is None or a list, otherwise a new
             single-element list containing the value.
    """
    needs_wrapping = value is not None and not isinstance(value, list)
    return [value] if needs_wrapping else value
def convert_to_position_list(axis_list):
    """
    Transpose a PER KNOB list of positions into a PER INDEX list of positions.

    The n-th output entry holds the n-th position of every knob. The result is
    as long as the shortest input sequence.

    :param axis_list: PER KNOB list of positions.
    :return: PER INDEX list of positions (list of lists).
    """
    per_index = zip(*axis_list)
    return list(map(list, per_index))
def flat_list_generator(list_to_flatten):
    """
    Yield the innermost lists of an arbitrarily nested list structure.

    Whether to descend is decided by the first element only: if it is a list,
    every element is processed recursively; otherwise (or if the list is empty)
    the list itself is yielded.

    :param list_to_flatten: List, possibly containing nested lists.
    :return: Generator of the innermost lists, in order.
    """
    has_nested_lists = len(list_to_flatten) != 0 and isinstance(list_to_flatten[0], list)

    # Reached the most inner list - hand it out as it is.
    if not has_nested_lists:
        yield list_to_flatten
        return

    # There is at least one more level below this one.
    for sublist in list_to_flatten:
        yield from flat_list_generator(sublist)
class ActionExecutor(object):
    """
    Run all the registered callbacks one after another, in the calling thread.

    The number of parameters each callback declares decides how it is called:
    2 parameters receive (position, position_data), 1 parameter receives
    (position), anything else is called without arguments.
    """

    def __init__(self, actions):
        """
        Initialize the action executor.
        :param actions: Single callable or list of callables to execute.
        """
        self.actions = convert_to_list(actions)

    def execute(self, position, position_data=None):
        """
        Call every action, passing as many arguments as its signature declares.
        :param position: Position handed to actions declaring 1 or 2 parameters.
        :param position_data: Data handed to actions declaring 2 parameters.
        """
        # Arguments to pass, keyed by the number of parameters the callback declares.
        arguments_by_arity = {
            2: (position, position_data),
            1: (position,),
        }

        for callback in self.actions:
            arity = len(inspect.signature(callback).parameters)
            callback(*arguments_by_arity.get(arity, ()))
class SimpleDataProcessor(object):
    """
    Collect every visited position together with the data received there.
    """

    def __init__(self, positions=None, data=None):
        """
        Initialize the simple data processor.
        :param positions: List the visited positions are appended to. Default: new internal list.
        :param data: List the data of each position is appended to. Default: new internal list.
        """
        # Only None triggers the internal list: a caller-provided (even empty) list is used as-is.
        self.positions = [] if positions is None else positions
        self.data = [] if data is None else data

    def process(self, position, data):
        """
        Record one position and the data that belongs to it.
        :param position: Position to append to the positions list.
        :param data: Data to append to the data list.
        """
        self.positions.append(position)
        self.data.append(data)

    def get_data(self):
        """
        :return: The list of collected data (the stored list itself, not a copy).
        """
        return self.data

    def get_positions(self):
        """
        :return: The list of collected positions (the stored list itself, not a copy).
        """
        return self.positions
class DictionaryDataProcessor(SimpleDataProcessor):
    """
    Collect the positions, and for each position the received data as a dictionary
    keyed by readable identifier.
    """

    def __init__(self, readables, positions=None, data=None):
        """
        Initialize the dictionary data processor.
        :param readables: Same readables that were passed to the scan function.
        :param positions: Forwarded to SimpleDataProcessor.
        :param data: Forwarded to SimpleDataProcessor.
        """
        super().__init__(positions=positions, data=data)

        converted_readables = convert_input(readables)
        self.readable_ids = [readable.identifier for readable in converted_readables]

    def process(self, position, data):
        """
        Record the position and the data, pairing each value with its readable identifier.
        :param position: Position to append to the positions list.
        :param data: Iterable of values, paired in order with the readable identifiers.
        """
        self.positions.append(position)

        # Pair each identifier with its value, in readable order.
        id_value_pairs = zip(self.readable_ids, data)
        self.data.append(OrderedDict(id_value_pairs))