281 lines
11 KiB
Python
281 lines
11 KiB
Python
from collections import namedtuple
|
|
from enum import Enum
|
|
|
|
from pyscan import config
|
|
|
|
EPICS_PV = namedtuple("EPICS_PV", ["identifier", "pv_name", "readback_pv_name", "tolerance", "readback_pv_value"])
|
|
EPICS_CONDITION = namedtuple("EPICS_CONDITION", ["identifier", "pv_name", "value", "action", "tolerance", "operation"])
|
|
BS_PROPERTY = namedtuple("BS_PROPERTY", ["identifier", "property", "default_value"])
|
|
BS_CONDITION = namedtuple("BS_CONDITION", ["identifier", "property", "value", "action", "tolerance", "operation",
|
|
"default_value"])
|
|
SCAN_SETTINGS = namedtuple("SCAN_SETTINGS", ["measurement_interval", "n_measurements",
|
|
"write_timeout", "settling_time", "progress_callback", "bs_read_filter"])
|
|
FUNCTION_VALUE = namedtuple("FUNCTION_VALUE", ["identifier", "call_function"])
|
|
FUNCTION_CONDITION = namedtuple("FUNCTION_CONDITION", ["identifier", "call_function", "action"])
|
|
|
|
|
|
class ConditionComparison(Enum):
|
|
EQUAL = 0
|
|
NOT_EQUAL = 1
|
|
LOWER = 2
|
|
LOWER_OR_EQUAL = 3
|
|
HIGHER = 4
|
|
HIGHER_OR_EQUAL = 5
|
|
|
|
|
|
class ConditionAction(Enum):
|
|
Abort = 1
|
|
Retry = 2
|
|
|
|
|
|
# Used to determine if a parameter was passed or the default value is used.
|
|
_default_value_placeholder = object()
|
|
|
|
|
|
def function_value(call_function, name=None):
|
|
"""
|
|
Construct a tuple for function representation.
|
|
:param call_function: Function to invoke.
|
|
:param name: Name to assign to this function.
|
|
:return: Tuple of ("identifier", "call_function")
|
|
"""
|
|
# If the name is not specified, use a counter to set the function name.
|
|
if not name:
|
|
name = "function_%d" % function_value.function_count
|
|
function_value.function_count += 1
|
|
identifier = name
|
|
|
|
return FUNCTION_VALUE(identifier, call_function)
|
|
function_value.function_count = 0
|
|
|
|
|
|
def function_condition(call_function, name=None, action=None):
|
|
"""
|
|
Construct a tuple for condition checking function representation.
|
|
:param call_function: Function to invoke.
|
|
:param name: Name to assign to this function.
|
|
:param action: What to do then the return value is False.
|
|
('ConditionAction.Abort' and 'ConditionAction.Retry' supported)
|
|
:return: Tuple of ("identifier", "call_function", "action")
|
|
"""
|
|
# If the name is not specified, use a counter to set the function name.
|
|
if not name:
|
|
name = "function_condition_%d" % function_condition.function_count
|
|
function_condition.function_count += 1
|
|
identifier = name
|
|
|
|
# The default action is Abort - used for conditions.
|
|
if not action:
|
|
action = ConditionAction.Abort
|
|
|
|
return FUNCTION_CONDITION(identifier, call_function, action)
|
|
function_condition.function_count = 0
|
|
|
|
|
|
def epics_pv(pv_name, readback_pv_name=None, tolerance=None, readback_pv_value=None):
|
|
"""
|
|
Construct a tuple for PV representation
|
|
:param pv_name: Name of the PV.
|
|
:param readback_pv_name: Name of the readback PV.
|
|
:param tolerance: Tolerance if the PV is writable.
|
|
:param readback_pv_value: If the readback_pv_value is set, the readback is compared against this instead of
|
|
comparing it to the setpoint.
|
|
:return: Tuple of (identifier, pv_name, pv_readback, tolerance)
|
|
"""
|
|
identifier = pv_name
|
|
|
|
if not pv_name:
|
|
raise ValueError("pv_name not specified.")
|
|
|
|
if not readback_pv_name:
|
|
readback_pv_name = pv_name
|
|
|
|
if not tolerance or tolerance < config.max_float_tolerance:
|
|
tolerance = config.max_float_tolerance
|
|
|
|
return EPICS_PV(identifier, pv_name, readback_pv_name, tolerance, readback_pv_value)
|
|
|
|
|
|
def epics_condition(pv_name, value, action=None, tolerance=None, operation=ConditionComparison.EQUAL):
|
|
"""
|
|
Construct a tuple for an epics condition representation.
|
|
:param pv_name: Name of the PV to monitor.
|
|
:param value: Value we expect the PV to be in.
|
|
:param action: What to do when the condition fails.
|
|
('ConditionAction.Abort' and 'ConditionAction.Retry' supported)
|
|
:param tolerance: Tolerance within which the condition needs to be.
|
|
:param operation: How to compare the received value with the expected value.
|
|
Allowed values: ConditionComparison.[EQUAL,NOT_EQUAL, LOWER, LOWER_OR_EQUAL, HIGHER, HIGHER_OR_EQUAL]
|
|
:return: Tuple of ("pv_name", "value", "action", "tolerance", "timeout", "operation")
|
|
"""
|
|
identifier = pv_name
|
|
|
|
if not pv_name:
|
|
raise ValueError("pv_name not specified.")
|
|
|
|
if value is None:
|
|
raise ValueError("pv value not specified.")
|
|
|
|
# the default action is Abort.
|
|
if not action:
|
|
action = ConditionAction.Abort
|
|
|
|
if not tolerance or tolerance < config.max_float_tolerance:
|
|
tolerance = config.max_float_tolerance
|
|
|
|
return EPICS_CONDITION(identifier, pv_name, value, action, tolerance, operation)
|
|
|
|
|
|
def bs_property(name, default_value=_default_value_placeholder):
|
|
"""
|
|
Construct a tuple for bs read property representation.
|
|
:param name: Complete property name.
|
|
:param default_value: The default value that is assigned to the property if it is missing.
|
|
:return: Tuple of ("identifier", "property", "default_value")
|
|
"""
|
|
identifier = name
|
|
|
|
if not name:
|
|
raise ValueError("name not specified.")
|
|
|
|
# We need this to allow the user to change the config at runtime.
|
|
if default_value is _default_value_placeholder:
|
|
default_value = config.bs_default_missing_property_value
|
|
|
|
return BS_PROPERTY(identifier, name, default_value)
|
|
|
|
|
|
def bs_condition(name, value, action=None, tolerance=None, operation=ConditionComparison.EQUAL,
|
|
default_value=_default_value_placeholder):
|
|
"""
|
|
Construct a tuple for bs condition property representation.
|
|
:param name: Complete property name.
|
|
:param value: Expected value.
|
|
:param action: What to do when the condition fails.
|
|
('ConditionAction.Abort' and 'ConditionAction.Retry' supported)
|
|
:param tolerance: Tolerance within which the condition needs to be.
|
|
:param operation: How to compare the received value with the expected value.
|
|
Allowed values: ConditionComparison.[EQUAL,NOT_EQUAL, LOWER, LOWER_OR_EQUAL, HIGHER, HIGHER_OR_EQUAL]
|
|
:param default_value: Default value of a condition, if not present in the bs stream.
|
|
:return: Tuple of ("identifier", "property", "value", "action", "tolerance", "operation", "default_value")
|
|
"""
|
|
identifier = name
|
|
|
|
if not name:
|
|
raise ValueError("name not specified.")
|
|
|
|
if value is None:
|
|
raise ValueError("value not specified.")
|
|
|
|
if not tolerance or tolerance < config.max_float_tolerance:
|
|
tolerance = config.max_float_tolerance
|
|
|
|
if not action:
|
|
action = ConditionAction.Abort
|
|
|
|
# We need this to allow the user to change the config at runtime.
|
|
if default_value is _default_value_placeholder:
|
|
default_value = config.bs_default_missing_property_value
|
|
|
|
return BS_CONDITION(identifier, name, value, action, tolerance, operation, default_value)
|
|
|
|
|
|
def scan_settings(measurement_interval=None, n_measurements=None, write_timeout=None, settling_time=None,
|
|
progress_callback=None, bs_read_filter=None):
|
|
"""
|
|
Set the scan settings.
|
|
:param measurement_interval: Default 0. Interval between each measurement, in case n_measurements is more than 1.
|
|
:param n_measurements: Default 1. How many measurements to make at each position.
|
|
:param write_timeout: How much time to wait in seconds for set_and_match operations on epics PVs.
|
|
:param settling_time: How much time to wait in seconds after the motors have reached the desired destination.
|
|
:param progress_callback: Function to call after each scan step is completed.
|
|
Signature: def callback(current_position, total_positions)
|
|
:param bs_read_filter: Filter to apply to the bs read receive function, to filter incoming messages.
|
|
Signature: def callback(message)
|
|
:return: Scan settings named tuple.
|
|
"""
|
|
if not measurement_interval or measurement_interval < 0:
|
|
measurement_interval = config.scan_default_measurement_interval
|
|
|
|
if not n_measurements or n_measurements < 1:
|
|
n_measurements = config.scan_default_n_measurements
|
|
|
|
if not write_timeout or write_timeout < 0:
|
|
write_timeout = config.epics_default_set_and_match_timeout
|
|
|
|
if not settling_time or settling_time < 0:
|
|
settling_time = config.epics_default_settling_time
|
|
|
|
if not progress_callback:
|
|
def default_progress_callback(current_position, total_positions):
|
|
completed_percentage = 100.0 * (current_position / total_positions)
|
|
print("Scan: %.2f %% completed (%d/%d)" % (completed_percentage, current_position, total_positions))
|
|
|
|
progress_callback = default_progress_callback
|
|
|
|
return SCAN_SETTINGS(measurement_interval, n_measurements, write_timeout, settling_time, progress_callback,
|
|
bs_read_filter)
|
|
|
|
|
|
def convert_input(input_parameters):
|
|
"""
|
|
Convert any type of input parameter into appropriate named tuples.
|
|
:param input_parameters: Parameter input from the user.
|
|
:return: Inputs converted into named tuples.
|
|
"""
|
|
converted_inputs = []
|
|
for input in input_parameters:
|
|
# Input already of correct type.
|
|
if isinstance(input, (EPICS_PV, BS_PROPERTY, FUNCTION_VALUE)):
|
|
converted_inputs.append(input)
|
|
# We need to convert it.
|
|
elif isinstance(input, str):
|
|
# Check if the string is valid.
|
|
if not input:
|
|
raise ValueError("Input cannot be an empty string.")
|
|
|
|
if "://" in input:
|
|
# Epics PV!
|
|
if input.lower().startswith("ca://"):
|
|
converted_inputs.append(epics_pv(input[5:]))
|
|
# bs_read property.
|
|
elif input.lower().startswith("bs://"):
|
|
converted_inputs.append(bs_property(input[5:]))
|
|
# A new protocol we don't know about?
|
|
else:
|
|
raise ValueError("Readable %s uses an unexpected protocol. "
|
|
"'ca://' and 'bs://' are supported." % input)
|
|
# No protocol specified, default is epics.
|
|
else:
|
|
converted_inputs.append(epics_pv(input))
|
|
|
|
elif callable(input):
|
|
converted_inputs.append(function_value(input))
|
|
# Supported named tuples or string, we cannot interpret the rest.
|
|
else:
|
|
raise ValueError("Input of unexpected type %s. Value: '%s'." % (type(input), input))
|
|
|
|
return converted_inputs
|
|
|
|
|
|
def convert_conditions(input_conditions):
|
|
"""
|
|
Convert any type type of condition input parameter into appropriate named tuples.
|
|
:param input_conditions: Condition input from the used.
|
|
:return: Input conditions converted into named tuples.
|
|
"""
|
|
|
|
converted_inputs = []
|
|
for input in input_conditions:
|
|
# Input already of correct type.
|
|
if isinstance(input, (EPICS_CONDITION, BS_CONDITION, FUNCTION_CONDITION)):
|
|
converted_inputs.append(input)
|
|
# Function call.
|
|
elif callable(input):
|
|
converted_inputs.append(function_condition(input))
|
|
# Unknown.
|
|
else:
|
|
raise ValueError("Condition of unexpected type %s. Value: '%s'." % (type(input), input))
|
|
|
|
return converted_inputs
|