Files
common/packages/pyscan/scan_parameters.py
2025-01-07 12:38:15 +01:00

281 lines
11 KiB
Python

from collections import namedtuple
from enum import Enum
from pyscan import config
# Description of an EPICS PV, as built by epics_pv().
EPICS_PV = namedtuple(
    "EPICS_PV",
    ("identifier", "pv_name", "readback_pv_name", "tolerance", "readback_pv_value"),
)

# Condition on an EPICS PV, as built by epics_condition().
EPICS_CONDITION = namedtuple(
    "EPICS_CONDITION",
    ("identifier", "pv_name", "value", "action", "tolerance", "operation"),
)

# Property read from the bs stream, as built by bs_property().
BS_PROPERTY = namedtuple(
    "BS_PROPERTY",
    ("identifier", "property", "default_value"),
)

# Condition on a bs stream property, as built by bs_condition().
BS_CONDITION = namedtuple(
    "BS_CONDITION",
    ("identifier", "property", "value", "action", "tolerance", "operation", "default_value"),
)

# Scan-wide settings, as built by scan_settings().
SCAN_SETTINGS = namedtuple(
    "SCAN_SETTINGS",
    ("measurement_interval", "n_measurements", "write_timeout", "settling_time", "progress_callback",
     "bs_read_filter"),
)

# Value provided by a function call, as built by function_value().
FUNCTION_VALUE = namedtuple(
    "FUNCTION_VALUE",
    ("identifier", "call_function"),
)

# Condition evaluated by a function call, as built by function_condition().
FUNCTION_CONDITION = namedtuple(
    "FUNCTION_CONDITION",
    ("identifier", "call_function", "action"),
)
class ConditionComparison(Enum):
    """
    Comparison operations that can be selected for a condition.

    Passed as the 'operation' argument of epics_condition and bs_condition to choose how the received value
    is compared with the expected value.
    """
    # Equality checks.
    EQUAL = 0
    NOT_EQUAL = 1

    # "Lower" checks.
    LOWER = 2
    LOWER_OR_EQUAL = 3

    # "Higher" checks.
    HIGHER = 4
    HIGHER_OR_EQUAL = 5
class ConditionAction(Enum):
    """
    Action to take when a condition fails.

    Abort is the action the condition constructors in this module fall back to when none is given.
    """
    Abort = 1
    Retry = 2
# Sentinel used to determine if a parameter was passed or the default value is used. A unique object is used
# instead of None so that None itself can be passed explicitly; bs_property and bs_condition compare against
# it with 'is' and fall back to the config value only when the argument was omitted.
_default_value_placeholder = object()
def function_value(call_function, name=None):
    """
    Build the named tuple that represents a function used as a value source.

    If no (non-empty) name is given, a name of the form "function_<n>" is generated from a counter kept as
    an attribute on this function; the counter is advanced only when a name is generated.
    :param call_function: Function to invoke.
    :param name: Name to assign to this function.
    :return: Tuple of ("identifier", "call_function")
    """
    # An explicit name is used as the identifier directly.
    if name:
        return FUNCTION_VALUE(name, call_function)

    # No name given: generate one from the running counter, then advance the counter.
    generated_name = "function_%d" % function_value.function_count
    function_value.function_count += 1

    return FUNCTION_VALUE(generated_name, call_function)


# Counter used to generate identifiers for functions registered without a name.
function_value.function_count = 0
def function_condition(call_function, name=None, action=None):
    """
    Build the named tuple that represents a condition checked by calling a function.

    If no (non-empty) name is given, a name of the form "function_condition_<n>" is generated from a counter
    kept as an attribute on this function.
    :param call_function: Function to invoke.
    :param name: Name to assign to this function.
    :param action: What to do when the return value is False.
        ('ConditionAction.Abort' and 'ConditionAction.Retry' supported)
    :return: Tuple of ("identifier", "call_function", "action")
    """
    if name:
        identifier = name
    else:
        # No name given: generate one from the running counter, then advance the counter.
        identifier = "function_condition_%d" % function_condition.function_count
        function_condition.function_count += 1

    # Conditions abort the scan unless told otherwise.
    selected_action = action or ConditionAction.Abort

    return FUNCTION_CONDITION(identifier, call_function, selected_action)


# Counter used to generate identifiers for condition functions registered without a name.
function_condition.function_count = 0
def epics_pv(pv_name, readback_pv_name=None, tolerance=None, readback_pv_value=None):
    """
    Build the named tuple that represents an EPICS PV.

    The PV name doubles as the identifier. A missing readback PV name defaults to the PV name itself, and a
    missing tolerance (or one below config.max_float_tolerance) is raised to config.max_float_tolerance.
    :param pv_name: Name of the PV.
    :param readback_pv_name: Name of the readback PV.
    :param tolerance: Tolerance if the PV is writable.
    :param readback_pv_value: If the readback_pv_value is set, the readback is compared against this instead of
        comparing it to the setpoint.
    :return: Tuple of ("identifier", "pv_name", "readback_pv_name", "tolerance", "readback_pv_value")
    :raises ValueError: If pv_name is empty or None.
    """
    if not pv_name:
        raise ValueError("pv_name not specified.")

    # Without a dedicated readback PV, read back from the PV itself.
    effective_readback = readback_pv_name or pv_name

    # Never go below the smallest tolerance allowed by the configuration.
    tolerance_floor = config.max_float_tolerance
    effective_tolerance = tolerance_floor if (not tolerance or tolerance < tolerance_floor) else tolerance

    return EPICS_PV(identifier=pv_name,
                    pv_name=pv_name,
                    readback_pv_name=effective_readback,
                    tolerance=effective_tolerance,
                    readback_pv_value=readback_pv_value)
def epics_condition(pv_name, value, action=None, tolerance=None, operation=ConditionComparison.EQUAL):
    """
    Build the named tuple that represents a condition on an EPICS PV.

    The PV name doubles as the identifier. The action defaults to ConditionAction.Abort, and a missing
    tolerance (or one below config.max_float_tolerance) is raised to config.max_float_tolerance.
    :param pv_name: Name of the PV to monitor.
    :param value: Value we expect the PV to be in.
    :param action: What to do when the condition fails.
        ('ConditionAction.Abort' and 'ConditionAction.Retry' supported)
    :param tolerance: Tolerance within which the condition needs to be.
    :param operation: How to compare the received value with the expected value.
        Allowed values: ConditionComparison.[EQUAL, NOT_EQUAL, LOWER, LOWER_OR_EQUAL, HIGHER, HIGHER_OR_EQUAL]
    :return: Tuple of ("identifier", "pv_name", "value", "action", "tolerance", "operation")
    :raises ValueError: If pv_name is empty or None, or if value is None.
    """
    if not pv_name:
        raise ValueError("pv_name not specified.")

    # Only None counts as "missing" here; falsy values such as 0 are valid expected values.
    if value is None:
        raise ValueError("pv value not specified.")

    # The default action is Abort.
    selected_action = action or ConditionAction.Abort

    # Never go below the smallest tolerance allowed by the configuration.
    tolerance_floor = config.max_float_tolerance
    effective_tolerance = tolerance_floor if (not tolerance or tolerance < tolerance_floor) else tolerance

    return EPICS_CONDITION(identifier=pv_name,
                           pv_name=pv_name,
                           value=value,
                           action=selected_action,
                           tolerance=effective_tolerance,
                           operation=operation)
def bs_property(name, default_value=_default_value_placeholder):
    """
    Build the named tuple that represents a property read from the bs stream.

    The property name doubles as the identifier.
    :param name: Complete property name.
    :param default_value: The default value that is assigned to the property if it is missing. When omitted,
        config.bs_default_missing_property_value is used.
    :return: Tuple of ("identifier", "property", "default_value")
    :raises ValueError: If name is empty or None.
    """
    if not name:
        raise ValueError("name not specified.")

    # The config is read at call time (not at definition time) so the user can change it at runtime.
    default_omitted = default_value is _default_value_placeholder
    resolved_default = config.bs_default_missing_property_value if default_omitted else default_value

    return BS_PROPERTY(name, name, resolved_default)
def bs_condition(name, value, action=None, tolerance=None, operation=ConditionComparison.EQUAL,
                 default_value=_default_value_placeholder):
    """
    Build the named tuple that represents a condition on a bs stream property.

    The property name doubles as the identifier. The action defaults to ConditionAction.Abort, and a missing
    tolerance (or one below config.max_float_tolerance) is raised to config.max_float_tolerance.
    :param name: Complete property name.
    :param value: Expected value.
    :param action: What to do when the condition fails.
        ('ConditionAction.Abort' and 'ConditionAction.Retry' supported)
    :param tolerance: Tolerance within which the condition needs to be.
    :param operation: How to compare the received value with the expected value.
        Allowed values: ConditionComparison.[EQUAL, NOT_EQUAL, LOWER, LOWER_OR_EQUAL, HIGHER, HIGHER_OR_EQUAL]
    :param default_value: Default value of a condition, if not present in the bs stream. When omitted,
        config.bs_default_missing_property_value is used.
    :return: Tuple of ("identifier", "property", "value", "action", "tolerance", "operation", "default_value")
    :raises ValueError: If name is empty or None, or if value is None.
    """
    if not name:
        raise ValueError("name not specified.")

    # Only None counts as "missing" here; falsy values such as 0 are valid expected values.
    if value is None:
        raise ValueError("value not specified.")

    # Never go below the smallest tolerance allowed by the configuration.
    tolerance_floor = config.max_float_tolerance
    effective_tolerance = tolerance_floor if (not tolerance or tolerance < tolerance_floor) else tolerance

    # The default action is Abort.
    selected_action = action or ConditionAction.Abort

    # The config is read at call time (not at definition time) so the user can change it at runtime.
    default_omitted = default_value is _default_value_placeholder
    resolved_default = config.bs_default_missing_property_value if default_omitted else default_value

    return BS_CONDITION(identifier=name,
                        property=name,
                        value=value,
                        action=selected_action,
                        tolerance=effective_tolerance,
                        operation=operation,
                        default_value=resolved_default)
def scan_settings(measurement_interval=None, n_measurements=None, write_timeout=None, settling_time=None,
                  progress_callback=None, bs_read_filter=None):
    """
    Set the scan settings.

    Any numeric setting that is omitted or invalid is replaced by the corresponding value from the config
    module, read at call time.
    :param measurement_interval: Interval between each measurement, in case n_measurements is more than 1.
        An explicit 0 is honored; None or a negative value selects config.scan_default_measurement_interval.
    :param n_measurements: How many measurements to make at each position. None or a value below 1 selects
        config.scan_default_n_measurements.
    :param write_timeout: How much time to wait in seconds for set_and_match operations on epics PVs.
        None, 0 or a negative value selects config.epics_default_set_and_match_timeout.
    :param settling_time: How much time to wait in seconds after the motors have reached the desired destination.
        An explicit 0 is honored; None or a negative value selects config.epics_default_settling_time.
    :param progress_callback: Function to call after each scan step is completed. When omitted, a callback
        that prints the completed percentage is used.
        Signature: def callback(current_position, total_positions)
    :param bs_read_filter: Filter to apply to the bs read receive function, to filter incoming messages.
        Signature: def callback(message)
    :return: Scan settings named tuple.
    """
    # 0 is a legitimate "no wait" request, so only a missing (None) or negative value selects the default.
    # A plain truthiness test would silently discard an explicit 0 whenever the config default is non-zero.
    if measurement_interval is None or measurement_interval < 0:
        measurement_interval = config.scan_default_measurement_interval

    # At least one measurement per position is required, so 0 is treated like a missing value here.
    if not n_measurements or n_measurements < 1:
        n_measurements = config.scan_default_n_measurements

    # The original treatment of 0 as "use the default timeout" is kept for the write timeout.
    if not write_timeout or write_timeout < 0:
        write_timeout = config.epics_default_set_and_match_timeout

    # As for the measurement interval, an explicit 0 settling time is a valid request and must be kept.
    if settling_time is None or settling_time < 0:
        settling_time = config.epics_default_settling_time

    if not progress_callback:
        def default_progress_callback(current_position, total_positions):
            # Default reporting: print the scan progress to standard output.
            completed_percentage = 100.0 * (current_position / total_positions)
            print("Scan: %.2f %% completed (%d/%d)" % (completed_percentage, current_position, total_positions))

        progress_callback = default_progress_callback

    return SCAN_SETTINGS(measurement_interval, n_measurements, write_timeout, settling_time, progress_callback,
                         bs_read_filter)
def convert_input(input_parameters):
    """
    Convert user supplied input parameters into the appropriate named tuples.

    Accepted items are: EPICS_PV, BS_PROPERTY and FUNCTION_VALUE tuples (kept as they are), strings
    ("ca://<pv>" or a bare name for an epics PV, "bs://<property>" for a bs property) and callables.
    :param input_parameters: Parameter input from the user.
    :return: Inputs converted into named tuples.
    :raises ValueError: For an empty string, an unknown protocol prefix, or an item of unsupported type.
    """
    converted_inputs = []

    for parameter in input_parameters:
        # Already a supported named tuple - nothing to convert.
        if isinstance(parameter, (EPICS_PV, BS_PROPERTY, FUNCTION_VALUE)):
            converted_inputs.append(parameter)
            continue

        # Strings describe either an epics PV or a bs property.
        if isinstance(parameter, str):
            if not parameter:
                raise ValueError("Input cannot be an empty string.")

            if "://" not in parameter:
                # No protocol specified, default is epics.
                converted = epics_pv(parameter)
            else:
                # The protocol prefix is matched case-insensitively; the name after it is kept as typed.
                lowered = parameter.lower()
                if lowered.startswith("ca://"):
                    converted = epics_pv(parameter[5:])
                elif lowered.startswith("bs://"):
                    converted = bs_property(parameter[5:])
                else:
                    # A protocol we do not know about.
                    raise ValueError("Readable %s uses an unexpected protocol. "
                                     "'ca://' and 'bs://' are supported." % parameter)

            converted_inputs.append(converted)
            continue

        # Plain callables are wrapped with an automatically generated name.
        if callable(parameter):
            converted_inputs.append(function_value(parameter))
            continue

        # Supported named tuples, strings or callables only - the rest cannot be interpreted.
        raise ValueError("Input of unexpected type %s. Value: '%s'." % (type(parameter), parameter))

    return converted_inputs
def convert_conditions(input_conditions):
    """
    Convert user supplied condition parameters into the appropriate named tuples.

    EPICS_CONDITION, BS_CONDITION and FUNCTION_CONDITION tuples are kept as they are; callables are wrapped
    with function_condition. Anything else is rejected.
    :param input_conditions: Condition input from the user.
    :return: Input conditions converted into named tuples.
    :raises ValueError: If a condition is neither a supported named tuple nor a callable.
    """
    converted_conditions = []

    for condition in input_conditions:
        # Already a supported named tuple - nothing to convert.
        if isinstance(condition, (EPICS_CONDITION, BS_CONDITION, FUNCTION_CONDITION)):
            converted_conditions.append(condition)
            continue

        # Plain callables are wrapped with an automatically generated name and the default action.
        if callable(condition):
            converted_conditions.append(function_condition(condition))
            continue

        # Unknown.
        raise ValueError("Condition of unexpected type %s. Value: '%s'." % (type(condition), condition))

    return converted_conditions