Files
common/packages/pyscan/scan.py
2025-01-07 12:38:15 +01:00

261 lines
12 KiB
Python

import logging
from pyscan.dal import epics_dal, bsread_dal, function_dal
from pyscan.dal.function_dal import FunctionProxy
from pyscan.positioner.bsread import BsreadPositioner
from pyscan.scanner import Scanner
from pyscan.scan_parameters import EPICS_PV, EPICS_CONDITION, BS_PROPERTY, BS_CONDITION, scan_settings, convert_input, \
FUNCTION_VALUE, FUNCTION_CONDITION, convert_conditions, ConditionAction, ConditionComparison
from pyscan.utils import convert_to_list, SimpleDataProcessor, ActionExecutor, compare_channel_value
# Instances to use.
EPICS_WRITER = epics_dal.WriteGroupInterface
EPICS_READER = epics_dal.ReadGroupInterface
BS_READER = bsread_dal.ReadGroupInterface
FUNCTION_PROXY = function_dal.FunctionProxy
DATA_PROCESSOR = SimpleDataProcessor
ACTION_EXECUTOR = ActionExecutor
_logger = logging.getLogger(__name__)
def scan(positioner, readables, writables=None, conditions=None, before_read=None, after_read=None, initialization=None,
finalization=None, settings=None, data_processor=None, before_move=None, after_move=None):
# Initialize the scanner instance.
scanner_instance = scanner(positioner, readables, writables, conditions, before_read, after_read, initialization,
finalization, settings, data_processor, before_move, after_move)
return scanner_instance.discrete_scan()
def scanner(positioner, readables, writables=None, conditions=None, before_read=None, after_read=None,
initialization=None, finalization=None, settings=None, data_processor=None,
before_move=None, after_move=None):
# Allow a list or a single value to be passed. Initialize None values.
writables = convert_input(convert_to_list(writables) or [])
readables = convert_input(convert_to_list(readables) or [])
conditions = convert_conditions(convert_to_list(conditions) or [])
before_read = convert_to_list(before_read) or []
after_read = convert_to_list(after_read) or []
before_move = convert_to_list(before_move) or []
after_move = convert_to_list(after_move) or []
initialization = convert_to_list(initialization) or []
finalization = convert_to_list(finalization) or []
settings = settings or scan_settings()
# TODO: Ugly. The scanner should not depend on a particular positioner implementation.
if isinstance(positioner, BsreadPositioner) and settings.n_measurements > 1:
raise ValueError("When using BsreadPositioner the maximum number of n_measurements = 1.")
bs_reader = _initialize_bs_dal(readables, conditions, settings.bs_read_filter, positioner)
epics_writer, epics_pv_reader, epics_condition_reader = _initialize_epics_dal(writables,
readables,
conditions,
settings)
function_writer, function_reader, function_condition = _initialize_function_dal(writables,
readables,
conditions)
writables_order = [type(writable) for writable in writables]
# Write function needs to merge PV and function proxy data.
def write_data(positions):
positions = convert_to_list(positions)
pv_values = [x for x, source in zip(positions, writables_order) if source == EPICS_PV]
function_values = [x for x, source in zip(positions, writables_order) if source == FUNCTION_VALUE]
if epics_writer:
epics_writer.set_and_match(pv_values)
if function_writer:
function_writer.write(function_values)
# Order of value sources, needed to reconstruct the correct order of the result.
readables_order = [type(readable) for readable in readables]
# Read function needs to merge BS, PV, and function proxy data.
def read_data(current_position_index, retry=False):
_logger.debug("Reading data for position index %s." % current_position_index)
bs_values = iter(bs_reader.read(current_position_index, retry) if bs_reader else [])
epics_values = iter(epics_pv_reader.read(current_position_index) if epics_pv_reader else [])
function_values = iter(function_reader.read(current_position_index) if function_reader else [])
# Interleave the values correctly.
result = []
for source in readables_order:
if source == BS_PROPERTY:
next_result = next(bs_values)
elif source == EPICS_PV:
next_result = next(epics_values)
elif source == FUNCTION_VALUE:
next_result = next(function_values)
else:
raise ValueError("Unknown type of readable %s used." % source)
# We flatten the result, whenever possible.
if isinstance(next_result, list) and source != FUNCTION_VALUE:
result.extend(next_result)
else:
result.append(next_result)
return result
# Order of value sources, needed to reconstruct the correct order of the result.
conditions_order = [type(condition) for condition in conditions]
# Validate function needs to validate both BS, PV, and function proxy data.
def validate_data(current_position_index, data):
_logger.debug("Reading data for position index %s." % current_position_index)
bs_values = iter(bs_reader.read_cached_conditions() if bs_reader else [])
epics_values = iter(epics_condition_reader.read(current_position_index) if epics_condition_reader else [])
function_values = iter(function_condition.read(current_position_index) if function_condition else [])
for index, source in enumerate(conditions_order):
if source == BS_CONDITION:
value = next(bs_values)
elif source == EPICS_CONDITION:
value = next(epics_values)
elif source == FUNCTION_CONDITION:
value = next(function_values)
else:
raise ValueError("Unknown type of condition %s used." % source)
value_valid = False
# Function conditions are self contained.
if source == FUNCTION_CONDITION:
if value:
value_valid = True
else:
expected_value = conditions[index].value
tolerance = conditions[index].tolerance
operation = conditions[index].operation
if compare_channel_value(value, expected_value, tolerance, operation):
value_valid = True
if not value_valid:
if conditions[index].action == ConditionAction.Retry:
return False
if source == FUNCTION_CONDITION:
raise ValueError("Function condition %s returned False." % conditions[index].identifier)
else:
raise ValueError("Condition %s failed, expected value %s, actual value %s, "
"tolerance %s, operation %s." %
(conditions[index].identifier,
conditions[index].value,
value,
conditions[index].tolerance,
conditions[index].operation))
return True
if not data_processor:
data_processor = DATA_PROCESSOR()
# Before acquisition hook.
before_measurement_executor = None
if before_read:
before_measurement_executor = ACTION_EXECUTOR(before_read).execute
# After acquisition hook.
after_measurement_executor = None
if after_read:
after_measurement_executor = ACTION_EXECUTOR(after_read).execute
# Executor before each move.
before_move_executor = None
if before_move:
before_move_executor = ACTION_EXECUTOR(before_move).execute
# Executor after each move.
after_move_executor = None
if after_move:
after_move_executor = ACTION_EXECUTOR(after_move).execute
# Initialization (before move to first position) hook.
initialization_executor = None
if initialization:
initialization_executor = ACTION_EXECUTOR(initialization).execute
# Finalization (after last acquisition AND on error) hook.
finalization_executor = None
if finalization:
finalization_executor = ACTION_EXECUTOR(finalization).execute
scanner = Scanner(positioner=positioner, data_processor=data_processor, reader=read_data,
writer=write_data, before_measurement_executor=before_measurement_executor,
after_measurement_executor=after_measurement_executor,
initialization_executor=initialization_executor,
finalization_executor=finalization_executor, data_validator=validate_data, settings=settings,
before_move_executor=before_move_executor, after_move_executor=after_move_executor)
return scanner
def _initialize_epics_dal(writables, readables, conditions, settings):
epics_writer = None
if writables:
epics_writables = [x for x in writables if isinstance(x, EPICS_PV)]
if epics_writables:
# Instantiate the PVs to move the motors.
epics_writer = EPICS_WRITER(pv_names=[pv.pv_name for pv in epics_writables],
readback_pv_names=[pv.readback_pv_name for pv in epics_writables],
tolerances=[pv.tolerance for pv in epics_writables],
timeout=settings.write_timeout)
epics_readables_pv_names = [x.pv_name for x in filter(lambda x: isinstance(x, EPICS_PV), readables)]
epics_conditions_pv_names = [x.pv_name for x in filter(lambda x: isinstance(x, EPICS_CONDITION), conditions)]
# Reading epics PV values.
epics_pv_reader = None
if epics_readables_pv_names:
epics_pv_reader = EPICS_READER(pv_names=epics_readables_pv_names)
# Reading epics condition values.
epics_condition_reader = None
if epics_conditions_pv_names:
epics_condition_reader = EPICS_READER(pv_names=epics_conditions_pv_names)
return epics_writer, epics_pv_reader, epics_condition_reader
def _initialize_bs_dal(readables, conditions, filter_function, positioner):
bs_readables = [x for x in filter(lambda x: isinstance(x, BS_PROPERTY), readables)]
bs_conditions = [x for x in filter(lambda x: isinstance(x, BS_CONDITION), conditions)]
bs_reader = None
if bs_readables or bs_conditions:
# TODO: The scanner should not depend on a particular positioner. Refactor.
if isinstance(positioner, BsreadPositioner):
bs_reader = bsread_dal.ImmediateReadGroupInterface(properties=bs_readables,
conditions=bs_conditions,
filter_function=filter_function)
positioner.set_bs_reader(bs_reader)
return bs_reader
else:
bs_reader = BS_READER(properties=bs_readables, conditions=bs_conditions, filter_function=filter_function)
return bs_reader
def _initialize_function_dal(writables, readables, conditions):
function_writer = FunctionProxy([x for x in writables if isinstance(x, FUNCTION_VALUE)])
function_reader = FunctionProxy([x for x in readables if isinstance(x, FUNCTION_VALUE)])
function_condition = FunctionProxy([x for x in conditions if isinstance(x, FUNCTION_CONDITION)])
return function_writer, function_reader, function_condition