Files
common/packages/pyscan/dal/epics_dal.py
2025-01-07 12:38:15 +01:00

209 lines
7.6 KiB
Python

import time
from itertools import count
from pyscan import config
from pyscan.utils import convert_to_list, validate_lists_length, connect_to_pv, compare_channel_value
class PyEpicsDal(object):
"""
Provide a high level abstraction over PyEpics with group support.
"""
def __init__(self):
self.groups = {}
self.pvs = {}
def add_group(self, group_name, group_interface):
# Do not allow to overwrite the group.
if group_name in self.groups:
raise ValueError("Group with name %s already exists. "
"Use different name of close existing group first." % group_name)
self.groups[group_name] = group_interface
return group_name
def add_reader_group(self, group_name, pv_names):
self.add_group(group_name, ReadGroupInterface(pv_names))
def add_writer_group(self, group_name, pv_names, readback_pv_names=None, tolerances=None, timeout=None):
self.add_group(group_name, WriteGroupInterface(pv_names, readback_pv_names, tolerances, timeout))
def get_group(self, handle):
return self.groups.get(handle)
def close_group(self, group_name):
if group_name not in self.groups:
raise ValueError("Group does not exist. Available groups:\n%s" % self.groups.keys())
# Close the PV connection.
self.groups[group_name].close()
del self.groups[group_name]
def close_all_groups(self):
for group in self.groups.values():
group.close()
self.groups.clear()
class WriteGroupInterface(object):
"""
Manage a group of Write PVs.
"""
default_timeout = 5
default_get_sleep = 0.1
def __init__(self, pv_names, readback_pv_names=None, tolerances=None, timeout=None):
"""
Initialize the write group.
:param pv_names: PV names (or name, list or single string) to connect to.
:param readback_pv_names: PV names (or name, list or single string) of readback PVs to connect to.
:param tolerances: Tolerances to be used for set_and_match. You can also specify them on the set_and_match
:param timeout: Timeout to reach the destination.
"""
self.pv_names = convert_to_list(pv_names)
self.pvs = [self.connect(pv_name) for pv_name in self.pv_names]
if readback_pv_names:
self.readback_pv_name = convert_to_list(readback_pv_names)
self.readback_pvs = [self.connect(pv_name) for pv_name in self.readback_pv_name]
else:
self.readback_pv_name = self.pv_names
self.readback_pvs = self.pvs
self.tolerances = self._setup_tolerances(tolerances)
# We also do not allow timeout to be zero.
self.timeout = timeout or self.default_timeout
# Verify if all provided lists are of same size.
validate_lists_length(self.pvs, self.readback_pvs, self.tolerances)
# Check if timeout is int or float.
if not isinstance(self.timeout, (int, float)):
raise ValueError("Timeout must be int or float, but %s was provided." % self.timeout)
def _setup_tolerances(self, tolerances):
"""
Construct the list of tolerances. No tolerance can be less then the minimal tolerance.
:param tolerances: Input tolerances.
:return: Tolerances adjusted to the minimum value, if needed.
"""
# If the provided tolerances are empty, substitute them with a list of default tolerances.
tolerances = convert_to_list(tolerances) or [config.max_float_tolerance] * len(self.pvs)
# Each tolerance needs to be at least the size of the minimum tolerance.
tolerances = [max(config.max_float_tolerance, tolerance) for tolerance in tolerances]
return tolerances
def set_and_match(self, values, tolerances=None, timeout=None):
"""
Set the value and wait for the PV to reach it, within tollerance.
:param values: Values to set (Must match the number of PVs in this group)
:param tolerances: Tolerances for each PV (Must match the number of PVs in this group)
:param timeout: Timeout, single value, to wait until the value is reached.
:raise ValueError if any position cannot be reached.
"""
values = convert_to_list(values)
if not tolerances:
tolerances = self.tolerances
else:
# We do not allow tolerances to be less than the default tolerance.
tolerances = self._setup_tolerances(tolerances)
if not timeout:
timeout = self.timeout
# Verify if all provided lists are of same size.
validate_lists_length(self.pvs, values, tolerances)
# Check if timeout is int or float.
if not isinstance(timeout, (int, float)):
raise ValueError("Timeout must be int or float, but %s was provided." % timeout)
# Write all the PV values.
for pv, value in zip(self.pvs, values):
pv.put(value)
# Boolean array to represent which PVs have reached their target value.s
within_tolerance = [False] * len(self.pvs)
initial_timestamp = time.time()
# Read values until all PVs have reached the desired value or time has run out.
while (not all(within_tolerance)) and (time.time() - initial_timestamp < timeout):
# Get only the PVs that have not yet reached the final position.
for index, pv, tolerance in ((index, pv, tolerance) for index, pv, tolerance, values_reached
in zip(count(), self.readback_pvs, tolerances, within_tolerance)
if not values_reached):
current_value = pv.get()
expected_value = values[index]
if compare_channel_value(current_value, expected_value, tolerance):
within_tolerance[index] = True
time.sleep(self.default_get_sleep)
if not all(within_tolerance):
error_message = ""
# Get the indexes that did not reach the supposed values.
for index in [index for index, reached_value in enumerate(within_tolerance) if not reached_value]:
expected_value = values[index]
pv_name = self.pv_names[index]
tolerance = tolerances[index]
error_message += "Cannot achieve value %s, on PV %s, with tolerance %s.\n" % \
(expected_value, pv_name, tolerance)
raise ValueError(error_message)
@staticmethod
def connect(pv_name):
return connect_to_pv(pv_name)
def close(self):
"""
Close all PV connections.
"""
for pv in self.pvs:
pv.disconnect()
class ReadGroupInterface(object):
"""
Manage group of read PVs.
"""
def __init__(self, pv_names):
"""
Initialize the group.
:param pv_names: PV names (or name, list or single string) to connect to.
"""
self.pv_names = convert_to_list(pv_names)
self.pvs = [self.connect(pv_name) for pv_name in self.pv_names]
def read(self, current_position_index=None, retry=None):
"""
Read PVs one by one.
:param current_position_index: Index of the current scan.
:param retry: Is this the first read attempt or a retry.
:return: Result
"""
result = []
for pv in self.pvs:
result.append(pv.get())
return result
@staticmethod
def connect(pv_name):
return connect_to_pv(pv_name)
def close(self):
"""
Close all PV connections.
"""
for pv in self.pvs:
pv.disconnect()