acc.qss for Injector/Cyclotron

This commit is contained in:
2025-01-07 12:38:15 +01:00
parent 6b59fe16ce
commit 96a2bfb362
37 changed files with 3521 additions and 117 deletions

View File

@@ -250,7 +250,7 @@ QGroupBox::title#MACHINE2 {
border: 2px solid #98c998;
border-radius: 3px;
background-color: qlineargradient(x1: 0, y1: 0, x2: 0, y2: 1,
stop: 0 #ffffff, stop: 1#98c998);
stop: 0 #ffffff, stop: 1 #98c998);
}
QGroupBox#Machine::disabled
@@ -271,6 +271,102 @@ QGroupBox#Porthos::disabled
}
QWidget#INJECTOR, QTabWidget#INJECTOR
{
background-color: qlineargradient(x1: 0, y1: 0, x2: 0, y2: 1,
stop: 0 #FFFFFF, stop: 1 #008b8b);
color: black;
font-size: 10pt;
font-style: normal;
font-weight: 600;
font-family: "Sans Serif";
border-radius: 0px;
margin-top: 0.0ex;
margin-left: 0.0ex;
padding-top: 2px;
padding-bottom: 4px;
}
QGroupBox#INJECTOR
{
background-color: qlineargradient(x1: 0, y1: 0, x2: 0, y2: 1,
stop: 0 #008b8b, stop: 1 #ffffff);
color: #008b8b;
font-size: 10pt;
font-style: normal;
font-weight: 600;
font-family: "Sans Serif";
border: 2px solid #008b8b;
border-radius: 5px;
margin-top: 1.5ex;
margin-left: 0.0ex;
margin-bottom: 0.0ex;
padding-top: 2px;
padding-bottom: 4px;
qproperty-alignment: 'AlignCenter | AlignVCenter';
}
QGroupBox::title#INJECTOR {
subcontrol-origin: margin;
subcontrol-position: top center;
padding: 2px 2px 2px 2px;
margin: 0px 0px 0px 0px;
border: 2px solid #008b8b;
border-radius: 3px;
background-color: qlineargradient(x1: 0, y1: 0, x2: 0, y2: 1,
stop: 0 #ffffff , stop: 1 #008b8b);
}
QWidget#CYCLOTRON, QTabWidget#CYCLOTRON
{
background-color: qlineargradient(x1: 0, y1: 0, x2: 0, y2: 1,
stop: 0 #FFFFFF, stop: 1 #000047ab);
color: black;
font-size: 10pt;
font-style: normal;
font-weight: 600;
font-family: "Sans Serif";
border-radius: 0px;
margin-top: 0.0ex;
margin-left: 0.0ex;
padding-top: 2px;
padding-bottom: 4px;
}
QGroupBox#CYCLOTRON
{
background-color: qlineargradient(x1: 0, y1: 0, x2: 0, y2: 1,
stop: 0 #0047ab, stop: 1 #ffffff);
color: #0047ab;
font-size: 10pt;
font-style: normal;
font-weight: 600;
font-family: "Sans Serif";
border: 2px solid #0047ab;
border-radius: 5px;
margin-top: 1.5ex;
margin-left: 0.0ex;
margin-bottom: 0.0ex;
padding-top: 2px;
padding-bottom: 4px;
qproperty-alignment: 'AlignCenter | AlignVCenter';
}
QGroupBox::title#CYCLOTRON {
subcontrol-origin: margin;
subcontrol-position: top center;
padding: 2px 2px 2px 2px;
margin: 0px 0px 0px 0px;
border: 2px solid #0047ab;
border-radius: 3px;
background-color: qlineargradient(x1: 0, y1: 0, x2: 0, y2: 1,
stop: 0 #ffffff , stop: 1 #0047ab);
}
QWidget#MACHINE, QTabWidget#MACHINE
{
background-color: qlineargradient(x1: 0, y1: 0, x2: 0, y2: 1,

View File

@@ -1,111 +1,13 @@
Metadata-Version: 1.0
Name: elog
Version: 1.3.4
Summary: Python library to access Elog.
Home-page: https://github.com/paulscherrerinstitute/py_elog
Author: Paul Scherrer Institute (PSI)
Metadata-Version: 1.1
Name: pyscan
Version: 2.8.0
Summary: PyScan is a python class that performs a scan for single or multiple given knobs.
Home-page: UNKNOWN
Author: Paul Scherrer Institute
Author-email: UNKNOWN
License: UNKNOWN
Description: [![Build Status](https://travis-ci.org/paulscherrerinstitute/py_elog.svg?branch=master)](https://travis-ci.org/paulscherrerinstitute/py_elog) [![Build status](https://ci.appveyor.com/api/projects/status/glo428gqw951y512?svg=true)](https://ci.appveyor.com/project/simongregorebner/py-elog)
# Overview
This Python module provides a native interface [electronic logbooks](https://midas.psi.ch/elog/). It is compatible with Python versions 3.5 and higher.
# Usage
For accessing a logbook at ```http[s]://<hostename>:<port>/[<subdir>/]<logbook>/[<msg_id>]``` a logbook handle must be retrieved.
```python
import elog
# Open GFA SwissFEL test logbook
logbook = elog.open('https://elog-gfa.psi.ch/SwissFEL+test/')
# Contstructor using detailed arguments
# Open demo logbook on local host: http://localhost:8080/demo/
logbook = elog.open('localhost', 'demo', port=8080, use_ssl=False)
```
Once you have hold of the logbook handle one of its public methods can be used to read, create, reply to, edit or delete the message.
## Get Existing Message Ids
Get all the existing message ids of a logbook
```python
message_ids = logbook.get_message_ids()
```
To get if of the last inserted message
```python
last_message_id = logbook.get_last_message_id()
```
## Read Message
```python
# Read message with with message ID = 23
message, attributes, attachments = logbook.read(23)
```
## Create Message
```python
# Create new message with some text, attributes (dict of attributes + kwargs) and attachments
new_msg_id = logbook.post('This is message text', attributes=dict_of_attributes, attachments=list_of_attachments, attribute_as_param='value')
```
What attributes are required is determined by the configuration of the elog server (keywork `Required Attributes`).
If the configuration looks like this:
```
Required Attributes = Author, Type
```
You have to provide author and type when posting a message.
In case type need to be specified, the supported keywords can as well be found in the elog configuration with the key `Options Type`.
If the config looks like this:
```
Options Type = Routine, Software Installation, Problem Fixed, Configuration, Other
```
A working create call would look like this:
```python
new_msg_id = logbook.post('This is message text', author='me', type='Routine')
```
## Reply to Message
```python
# Reply to message with ID=23
new_msg_id = logbook.post('This is a reply', msg_id=23, reply=True, attributes=dict_of_attributes, attachments=list_of_attachments, attribute_as_param='value')
```
## Edit Message
```python
# Edit message with ID=23. Changed message text, some attributes (dict of edited attributes + kwargs) and new attachments
edited_msg_id = logbook.post('This is new message text', msg_id=23, attributes=dict_of_changed_attributes, attachments=list_of_new_attachments, attribute_as_param='new value')
```
## Delete Message (and all its replies)
```python
# Delete message with ID=23. All its replies will also be deleted.
logbook.delete(23)
```
__Note:__ Due to the way elog implements delete this function is only supported on english logbooks.
# Installation
The Elog module and only depends on the `passlib` and `requests` library used for password encryption and http(s) communication. It is packed as [anaconda package](https://anaconda.org/paulscherrerinstitute/elog) and can be installed as follows:
```bash
conda install -c paulscherrerinstitute elog
```
Keywords: elog,electronic,logbook
Description: UNKNOWN
Platform: UNKNOWN
Requires: numpy
Requires: pcaspy
Requires: requests

View File

@@ -1,8 +1,32 @@
README.md
setup.py
elog/__init__.py
elog/logbook.py
elog/logbook_exceptions.py
elog.egg-info/PKG-INFO
elog.egg-info/SOURCES.txt
elog.egg-info/dependency_links.txt
elog.egg-info/top_level.txt
pyscan/__init__.py
pyscan/config.py
pyscan/scan.py
pyscan/scan_actions.py
pyscan/scan_parameters.py
pyscan/scanner.py
pyscan/utils.py
pyscan.egg-info/PKG-INFO
pyscan.egg-info/SOURCES.txt
pyscan.egg-info/dependency_links.txt
pyscan.egg-info/top_level.txt
pyscan/dal/__init__.py
pyscan/dal/bsread_dal.py
pyscan/dal/epics_dal.py
pyscan/dal/function_dal.py
pyscan/dal/pshell_dal.py
pyscan/interface/__init__.py
pyscan/interface/pshell.py
pyscan/interface/pyScan/__init__.py
pyscan/interface/pyScan/scan.py
pyscan/interface/pyScan/utils.py
pyscan/positioner/__init__.py
pyscan/positioner/area.py
pyscan/positioner/bsread.py
pyscan/positioner/compound.py
pyscan/positioner/line.py
pyscan/positioner/serial.py
pyscan/positioner/static.py
pyscan/positioner/time.py
pyscan/positioner/vector.py

View File

@@ -1 +1 @@
elog
pyscan

View File

@@ -0,0 +1,111 @@
Metadata-Version: 1.0
Name: elog
Version: 1.3.4
Summary: Python library to access Elog.
Home-page: https://github.com/paulscherrerinstitute/py_elog
Author: Paul Scherrer Institute (PSI)
Author-email: UNKNOWN
License: UNKNOWN
Description: [![Build Status](https://travis-ci.org/paulscherrerinstitute/py_elog.svg?branch=master)](https://travis-ci.org/paulscherrerinstitute/py_elog) [![Build status](https://ci.appveyor.com/api/projects/status/glo428gqw951y512?svg=true)](https://ci.appveyor.com/project/simongregorebner/py-elog)
# Overview
This Python module provides a native interface [electronic logbooks](https://midas.psi.ch/elog/). It is compatible with Python versions 3.5 and higher.
# Usage
For accessing a logbook at ```http[s]://<hostename>:<port>/[<subdir>/]<logbook>/[<msg_id>]``` a logbook handle must be retrieved.
```python
import elog
# Open GFA SwissFEL test logbook
logbook = elog.open('https://elog-gfa.psi.ch/SwissFEL+test/')
# Contstructor using detailed arguments
# Open demo logbook on local host: http://localhost:8080/demo/
logbook = elog.open('localhost', 'demo', port=8080, use_ssl=False)
```
Once you have hold of the logbook handle one of its public methods can be used to read, create, reply to, edit or delete the message.
## Get Existing Message Ids
Get all the existing message ids of a logbook
```python
message_ids = logbook.get_message_ids()
```
To get if of the last inserted message
```python
last_message_id = logbook.get_last_message_id()
```
## Read Message
```python
# Read message with with message ID = 23
message, attributes, attachments = logbook.read(23)
```
## Create Message
```python
# Create new message with some text, attributes (dict of attributes + kwargs) and attachments
new_msg_id = logbook.post('This is message text', attributes=dict_of_attributes, attachments=list_of_attachments, attribute_as_param='value')
```
What attributes are required is determined by the configuration of the elog server (keywork `Required Attributes`).
If the configuration looks like this:
```
Required Attributes = Author, Type
```
You have to provide author and type when posting a message.
In case type need to be specified, the supported keywords can as well be found in the elog configuration with the key `Options Type`.
If the config looks like this:
```
Options Type = Routine, Software Installation, Problem Fixed, Configuration, Other
```
A working create call would look like this:
```python
new_msg_id = logbook.post('This is message text', author='me', type='Routine')
```
## Reply to Message
```python
# Reply to message with ID=23
new_msg_id = logbook.post('This is a reply', msg_id=23, reply=True, attributes=dict_of_attributes, attachments=list_of_attachments, attribute_as_param='value')
```
## Edit Message
```python
# Edit message with ID=23. Changed message text, some attributes (dict of edited attributes + kwargs) and new attachments
edited_msg_id = logbook.post('This is new message text', msg_id=23, attributes=dict_of_changed_attributes, attachments=list_of_new_attachments, attribute_as_param='new value')
```
## Delete Message (and all its replies)
```python
# Delete message with ID=23. All its replies will also be deleted.
logbook.delete(23)
```
__Note:__ Due to the way elog implements delete this function is only supported on english logbooks.
# Installation
The Elog module and only depends on the `passlib` and `requests` library used for password encryption and http(s) communication. It is packed as [anaconda package](https://anaconda.org/paulscherrerinstitute/elog) and can be installed as follows:
```bash
conda install -c paulscherrerinstitute elog
```
Keywords: elog,electronic,logbook
Platform: UNKNOWN

View File

@@ -0,0 +1,8 @@
setup.py
elog/__init__.py
elog/logbook.py
elog/logbook_exceptions.py
elog.egg-info/PKG-INFO
elog.egg-info/SOURCES.txt
elog.egg-info/dependency_links.txt
elog.egg-info/top_level.txt

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1 @@
elog

View File

@@ -0,0 +1 @@

1
packages/pyscan.pth Executable file
View File

@@ -0,0 +1 @@
./pyscan-2.8.0-py3.7.egg

View File

@@ -0,0 +1,19 @@
# Import the scan part.
from .scan import *
from .scan_parameters import *
from .scan_actions import *
from .scanner import *
# Import DALs
from .dal.epics_dal import *
from .dal.bsread_dal import *
from .dal.pshell_dal import *
# Import positioners.
from .positioner.line import *
from .positioner.serial import *
from .positioner.vector import *
from .positioner.area import *
from .positioner.compound import *
from .positioner.time import *
from .positioner.static import *

58
packages/pyscan/config.py Normal file
View File

@@ -0,0 +1,58 @@
#########################
# General configuration #
#########################
# Minimum tolerance for comparing floats.
max_float_tolerance = 0.00001
# 1ms time tolerance for time critical measurements.
max_time_tolerance = 0.05
######################
# Scan configuration #
######################
# Default number of scans.
scan_default_n_measurements = 1
# Default interval between multiple measurements in a single position. Taken into account when n_measurements > 1.
scan_default_measurement_interval = 0
# Interval to sleep while the scan is paused.
scan_pause_sleep_interval = 0.1
# Maximum number of retries to read the channels to get valid data.
scan_acquisition_retry_limit = 3
# Delay between acquisition retries.
scan_acquisition_retry_delay = 1
############################
# BSREAD DAL configuration #
############################
# Queue size for collecting messages from bs_read.
bs_queue_size = 20
# Max time to wait until the bs read message we need arrives.
bs_read_timeout = 5
# Max time to wait for a message (if there is none). Important for stopping threads etc.
bs_receive_timeout = 1
# Default bs_read connection address.
bs_default_host = None
# Default bs_read connection port.
bs_default_port = None
# Default bs connection port.
bs_connection_mode = "sub"
# Default property value for bs properties missing in stream. Exception means to raise an Exception when this happens.
bs_default_missing_property_value = Exception
###########################
# EPICS DAL configuration #
###########################
# Default set and match timeout - how much time a PV has to reach the target value.
epics_default_set_and_match_timeout = 3
# After all motors have reached their destination (set_and_match), extra time to wait.
epics_default_settling_time = 0
############################
# PShell DAL configuration #
############################
pshell_default_server_url = "http://sf-daq-mgmt:8090"
pshell_default_scan_in_background = False

View File

View File

@@ -0,0 +1,186 @@
import math
from time import time
from bsread import Source, mflow
from pyscan import config
from pyscan.utils import convert_to_list
class ReadGroupInterface(object):
"""
Provide a beam synchronous acquisition for PV data.
"""
def __init__(self, properties, conditions=None, host=None, port=None, filter_function=None):
"""
Create the bsread group read interface.
:param properties: List of PVs to read for processing.
:param conditions: List of PVs to read as conditions.
:param filter_function: Filter the BS stream with a custom function.
"""
self.host = host
self.port = port
self.properties = convert_to_list(properties)
self.conditions = convert_to_list(conditions)
self.filter = filter_function
self._message_cache = None
self._message_cache_timestamp = None
self._message_cache_position_index = None
self._connect_bsread(config.bs_default_host, config.bs_default_port)
def _connect_bsread(self, host, port):
# Configure the connection type.
if config.bs_connection_mode.lower() == "sub":
mode = mflow.SUB
elif config.bs_connection_mode.lower() == "pull":
mode = mflow.PULL
if host and port:
self.stream = Source(host=host,
port=port,
queue_size=config.bs_queue_size,
receive_timeout=config.bs_receive_timeout,
mode=mode)
else:
channels = [x.identifier for x in self.properties] + [x.identifier for x in self.conditions]
self.stream = Source(channels=channels,
queue_size=config.bs_queue_size,
receive_timeout=config.bs_receive_timeout,
mode=mode)
self.stream.connect()
@staticmethod
def is_message_after_timestamp(message, timestamp):
"""
Check if the received message was captured after the provided timestamp.
:param message: Message to inspect.
:param timestamp: Timestamp to compare the message to.
:return: True if the message is after the timestamp, False otherwise.
"""
# Receive might timeout, in this case we have nothing to compare.
if not message:
return False
# This is how BSread encodes the timestamp.
current_sec = int(timestamp)
current_ns = int(math.modf(timestamp)[0] * 1e9)
message_sec = message.data.global_timestamp
message_ns = message.data.global_timestamp_offset
# If the seconds are the same, the nanoseconds must be equal or larger.
if message_sec == current_sec:
return message_ns >= current_ns
# If the seconds are not the same, the message seconds need to be larger than the current seconds.
else:
return message_sec > current_sec
@staticmethod
def _get_missing_property_default(property_definition):
"""
In case a bs read value is missing, either return the default value or raise an Exception.
:param property_definition:
:return:
"""
# Exception is defined, raise it.
if Exception == property_definition.default_value:
raise property_definition.default_value("Property '%s' missing in bs stream."
% property_definition.identifier)
# Else just return the default value.
else:
return property_definition.default_value
def _read_pvs_from_cache(self, properties):
"""
Read the requested properties from the cache.
:param properties: List of properties to read.
:return: List with PV values.
"""
if not self._message_cache:
raise ValueError("Message cache is empty, cannot read PVs %s." % properties)
pv_values = []
for property_name, property_definition in ((x.identifier, x) for x in properties):
if property_name in self._message_cache.data.data:
value = self._message_cache.data.data[property_name].value
else:
value = self._get_missing_property_default(property_definition)
# TODO: Check if the python conversion works in every case?
# BS read always return numpy, and we always convert it to Python.
pv_values.append(value)
return pv_values
def read(self, current_position_index=None, retry=False):
"""
Reads the PV values from BSread. It uses the first PVs data sampled after the invocation of this method.
:return: List of values for read pvs. Note: Condition PVs are excluded.
"""
# Perform the actual read.
read_timestamp = time()
while time() - read_timestamp < config.bs_read_timeout:
message = self.stream.receive(filter=self.filter)
if self.is_message_after_timestamp(message, read_timestamp):
self._message_cache = message
self._message_cache_position_index = current_position_index
self._message_cache_timestamp = read_timestamp
return self._read_pvs_from_cache(self.properties)
else:
raise Exception("Read timeout exceeded for BS read stream. Could not find the desired package in time.")
def read_cached_conditions(self):
"""
Returns the conditions associated with the last read command.
:return: List of condition values.
"""
return self._read_pvs_from_cache(self.conditions)
def close(self):
"""
Disconnect from the stream and clear the message cache.
"""
if self.stream:
self.stream.disconnect()
self._message_cache = None
self._message_cache_timestamp = None
class ImmediateReadGroupInterface(ReadGroupInterface):
@staticmethod
def is_message_after_timestamp(message, timestamp):
"""
Every message is a good message, expect a NULL one.
:param message: Message to inspect.
:param timestamp: Timestamp to compare the message to.
:return: True if the message is not None.
"""
# Receive might timeout, in this case we have nothing to compare.
if not message:
return False
return True
def read(self, current_position_index=None, retry=False):
# Invalidate cache on retry attempt.
if retry:
self._message_cache_position_index = None
# Message for this position already cached.
if current_position_index is not None and current_position_index == self._message_cache_position_index:
return self._read_pvs_from_cache(self.properties)
return super(ImmediateReadGroupInterface, self).read(current_position_index=current_position_index,
retry=retry)

View File

@@ -0,0 +1,208 @@
import time
from itertools import count
from pyscan import config
from pyscan.utils import convert_to_list, validate_lists_length, connect_to_pv, compare_channel_value
class PyEpicsDal(object):
"""
Provide a high level abstraction over PyEpics with group support.
"""
def __init__(self):
self.groups = {}
self.pvs = {}
def add_group(self, group_name, group_interface):
# Do not allow to overwrite the group.
if group_name in self.groups:
raise ValueError("Group with name %s already exists. "
"Use different name of close existing group first." % group_name)
self.groups[group_name] = group_interface
return group_name
def add_reader_group(self, group_name, pv_names):
self.add_group(group_name, ReadGroupInterface(pv_names))
def add_writer_group(self, group_name, pv_names, readback_pv_names=None, tolerances=None, timeout=None):
self.add_group(group_name, WriteGroupInterface(pv_names, readback_pv_names, tolerances, timeout))
def get_group(self, handle):
return self.groups.get(handle)
def close_group(self, group_name):
if group_name not in self.groups:
raise ValueError("Group does not exist. Available groups:\n%s" % self.groups.keys())
# Close the PV connection.
self.groups[group_name].close()
del self.groups[group_name]
def close_all_groups(self):
for group in self.groups.values():
group.close()
self.groups.clear()
class WriteGroupInterface(object):
"""
Manage a group of Write PVs.
"""
default_timeout = 5
default_get_sleep = 0.1
def __init__(self, pv_names, readback_pv_names=None, tolerances=None, timeout=None):
"""
Initialize the write group.
:param pv_names: PV names (or name, list or single string) to connect to.
:param readback_pv_names: PV names (or name, list or single string) of readback PVs to connect to.
:param tolerances: Tolerances to be used for set_and_match. You can also specify them on the set_and_match
:param timeout: Timeout to reach the destination.
"""
self.pv_names = convert_to_list(pv_names)
self.pvs = [self.connect(pv_name) for pv_name in self.pv_names]
if readback_pv_names:
self.readback_pv_name = convert_to_list(readback_pv_names)
self.readback_pvs = [self.connect(pv_name) for pv_name in self.readback_pv_name]
else:
self.readback_pv_name = self.pv_names
self.readback_pvs = self.pvs
self.tolerances = self._setup_tolerances(tolerances)
# We also do not allow timeout to be zero.
self.timeout = timeout or self.default_timeout
# Verify if all provided lists are of same size.
validate_lists_length(self.pvs, self.readback_pvs, self.tolerances)
# Check if timeout is int or float.
if not isinstance(self.timeout, (int, float)):
raise ValueError("Timeout must be int or float, but %s was provided." % self.timeout)
def _setup_tolerances(self, tolerances):
"""
Construct the list of tolerances. No tolerance can be less then the minimal tolerance.
:param tolerances: Input tolerances.
:return: Tolerances adjusted to the minimum value, if needed.
"""
# If the provided tolerances are empty, substitute them with a list of default tolerances.
tolerances = convert_to_list(tolerances) or [config.max_float_tolerance] * len(self.pvs)
# Each tolerance needs to be at least the size of the minimum tolerance.
tolerances = [max(config.max_float_tolerance, tolerance) for tolerance in tolerances]
return tolerances
def set_and_match(self, values, tolerances=None, timeout=None):
"""
Set the value and wait for the PV to reach it, within tollerance.
:param values: Values to set (Must match the number of PVs in this group)
:param tolerances: Tolerances for each PV (Must match the number of PVs in this group)
:param timeout: Timeout, single value, to wait until the value is reached.
:raise ValueError if any position cannot be reached.
"""
values = convert_to_list(values)
if not tolerances:
tolerances = self.tolerances
else:
# We do not allow tolerances to be less than the default tolerance.
tolerances = self._setup_tolerances(tolerances)
if not timeout:
timeout = self.timeout
# Verify if all provided lists are of same size.
validate_lists_length(self.pvs, values, tolerances)
# Check if timeout is int or float.
if not isinstance(timeout, (int, float)):
raise ValueError("Timeout must be int or float, but %s was provided." % timeout)
# Write all the PV values.
for pv, value in zip(self.pvs, values):
pv.put(value)
# Boolean array to represent which PVs have reached their target value.s
within_tolerance = [False] * len(self.pvs)
initial_timestamp = time.time()
# Read values until all PVs have reached the desired value or time has run out.
while (not all(within_tolerance)) and (time.time() - initial_timestamp < timeout):
# Get only the PVs that have not yet reached the final position.
for index, pv, tolerance in ((index, pv, tolerance) for index, pv, tolerance, values_reached
in zip(count(), self.readback_pvs, tolerances, within_tolerance)
if not values_reached):
current_value = pv.get()
expected_value = values[index]
if compare_channel_value(current_value, expected_value, tolerance):
within_tolerance[index] = True
time.sleep(self.default_get_sleep)
if not all(within_tolerance):
error_message = ""
# Get the indexes that did not reach the supposed values.
for index in [index for index, reached_value in enumerate(within_tolerance) if not reached_value]:
expected_value = values[index]
pv_name = self.pv_names[index]
tolerance = tolerances[index]
error_message += "Cannot achieve value %s, on PV %s, with tolerance %s.\n" % \
(expected_value, pv_name, tolerance)
raise ValueError(error_message)
@staticmethod
def connect(pv_name):
return connect_to_pv(pv_name)
def close(self):
"""
Close all PV connections.
"""
for pv in self.pvs:
pv.disconnect()
class ReadGroupInterface(object):
"""
Manage group of read PVs.
"""
def __init__(self, pv_names):
"""
Initialize the group.
:param pv_names: PV names (or name, list or single string) to connect to.
"""
self.pv_names = convert_to_list(pv_names)
self.pvs = [self.connect(pv_name) for pv_name in self.pv_names]
def read(self, current_position_index=None, retry=None):
"""
Read PVs one by one.
:param current_position_index: Index of the current scan.
:param retry: Is this the first read attempt or a retry.
:return: Result
"""
result = []
for pv in self.pvs:
result.append(pv.get())
return result
@staticmethod
def connect(pv_name):
return connect_to_pv(pv_name)
def close(self):
"""
Close all PV connections.
"""
for pv in self.pvs:
pv.disconnect()

View File

@@ -0,0 +1,40 @@
from pyscan.utils import convert_to_list
class FunctionProxy(object):
"""
Provide an interface for using external methods as DAL.
"""
def __init__(self, functions):
"""
Initialize the function dal.
:param functions: List (or single item) of FUNCTION_VALUE type.
"""
self.functions = convert_to_list(functions)
def read(self, current_position_index=None, retry=False):
"""
Read the results from all the provided functions.
:return: Read results.
"""
results = []
for func in self.functions:
# The function either accepts the current position index, or nothing.
try:
result = func.call_function()
except TypeError:
result = func.call_function(current_position_index)
results.append(result)
return results
def write(self, values):
"""
Write the values to the provided functions.
:param values: Values to write.
"""
values = convert_to_list(values)
for func, value in zip(self.functions, values):
func.call_function(value)

View File

@@ -0,0 +1,118 @@
import json
from collections import OrderedDict
import requests
from bsread.data.helpers import get_channel_reader
from pyscan import config
SERVER_URL_PATHS = {
"run": "/run",
"data": "/data-bs"
}
class PShellFunction(object):
def __init__(self, script_name, parameters, server_url=None, scan_in_background=None, multiple_parameters=False,
return_values=None):
if server_url is None:
server_url = config.pshell_default_server_url
if scan_in_background is None:
scan_in_background = config.pshell_default_scan_in_background
self.script_name = script_name
self.parameters = parameters
self.server_url = server_url.rstrip("/")
self.scan_in_background = scan_in_background
self.multiple_parameters = multiple_parameters
self.return_values = return_values
@staticmethod
def _load_raw_data(server_url, data_path):
load_data_url = server_url + SERVER_URL_PATHS["data"] + "/" + data_path
raw_data = requests.get(url=load_data_url, stream=True).raw.read()
return raw_data
@classmethod
def read_raw_data(cls, data_path, server_url=None):
if server_url is None:
server_url = config.pshell_default_server_url
raw_data_bytes = cls._load_raw_data(server_url, data_path)
offset = 0
def read_chunk():
nonlocal offset
nonlocal raw_data_bytes
size = int.from_bytes(raw_data_bytes[offset:offset + 4], byteorder='big', signed=False)
# Offset for the size of the length.
offset += 4
data_chunk = raw_data_bytes[offset:offset + size]
offset += size
return data_chunk
# First chunk is main header.
main_header = json.loads(read_chunk().decode(), object_pairs_hook=OrderedDict)
# Second chunk is data header.
data_header = json.loads(read_chunk().decode(), object_pairs_hook=OrderedDict)
result_data = {}
for channel in data_header["channels"]:
raw_channel_data = read_chunk()
raw_channel_timestamp = read_chunk()
channel_name = channel["name"]
# Default encoding is small, other valid value is 'big'.
channel["encoding"] = "<" if channel.get("encoding", "little") else ">"
channel_value_reader = get_channel_reader(channel)
result_data[channel_name] = channel_value_reader(raw_channel_data)
return result_data
def read(self, current_position_index=None, retry=False):
parameters = self.get_scan_parameters(current_position_index)
run_request = {"script": self.script_name,
"pars": parameters,
"background": self.scan_in_background}
raw_scan_result = self._execute_scan(run_request)
scan_result = json.loads(raw_scan_result)
return scan_result
def get_scan_parameters(self, current_position_index):
if self.multiple_parameters:
try:
position_parameters = self.parameters[current_position_index]
except IndexError:
raise ValueError("Cannot find parameters for position index %s. Parameters: " %
(current_position_index, self.parameters))
return position_parameters
else:
return self.parameters
def _execute_scan(self, execution_parameters):
run_url = self.server_url + SERVER_URL_PATHS["run"]
result = requests.put(url=run_url, json=execution_parameters)
if result.status_code != 200:
raise Exception(result.text)
return result.text

View File

View File

@@ -0,0 +1,385 @@
from pyscan import scan, action_restore, ZigZagVectorPositioner, VectorPositioner, CompoundPositioner
from pyscan.scan import EPICS_READER
from pyscan.positioner.area import AreaPositioner, ZigZagAreaPositioner
from pyscan.positioner.line import ZigZagLinePositioner, LinePositioner
from pyscan.positioner.time import TimePositioner
from pyscan.scan_parameters import scan_settings
from pyscan.utils import convert_to_list
def _generate_scan_parameters(relative, writables, latency):
# If the scan is relative we collect the initial writables offset, and restore the state at the end of the scan.
offsets = None
finalization_action = []
if relative:
pv_names = [x.pv_name for x in convert_to_list(writables) or []]
reader = EPICS_READER(pv_names)
offsets = reader.read()
reader.close()
finalization_action.append(action_restore(writables))
settings = scan_settings(settling_time=latency)
return offsets, finalization_action, settings
def _convert_steps_parameter(steps):
n_steps = None
step_size = None
steps_list = convert_to_list(steps)
# If steps is a float or a list of floats, then this are step sizes.
if isinstance(steps_list[0], float):
step_size = steps_list
# If steps is an int, this is the number of steps.
elif isinstance(steps, int):
n_steps = steps
return n_steps, step_size
def lscan(writables, readables, start, end, steps, latency=0.0, relative=False,
passes=1, zigzag=False, before_read=None, after_read=None, title=None):
"""Line Scan: positioners change together, linearly from start to end positions.
Args:
writables(list of Writable): Positioners set on each step.
readables(list of Readable): Sensors to be sampled on each step.
start(list of float): start positions of writables.
end(list of float): final positions of writables.
steps(int or float or list of float): number of scan steps (int) or step size (float).
relative (bool, optional): if true, start and end positions are relative to
current at start of the scan
latency(float, optional): settling time for each step before readout, defaults to 0.0.
passes(int, optional): number of passes
zigzag(bool, optional): if true writables invert direction on each pass.
before_read (function, optional): callback on each step, before each readout. Callback may have as
optional parameters list of positions.
after_read (function, optional): callback on each step, after each readout. Callback may have as
optional parameters a ScanRecord object.
title(str, optional): plotting window name.
Returns:
ScanResult object.
"""
offsets, finalization_actions, settings = _generate_scan_parameters(relative, writables, latency)
n_steps, step_size = _convert_steps_parameter(steps)
if zigzag:
positioner_class = ZigZagLinePositioner
else:
positioner_class = LinePositioner
positioner = positioner_class(start=start, end=end, step_size=step_size,
n_steps=n_steps, offsets=offsets, passes=passes)
result = scan(positioner, readables, writables, before_read=before_read, after_read=after_read, settings=settings,
finalization=finalization_actions)
return result
def ascan(writables, readables, start, end, steps, latency=0.0, relative=False,
passes=1, zigzag=False, before_read=None, after_read=None, title=None):
"""
Area Scan: multi-dimentional scan, each positioner is a dimention.
:param writables: List of identifiers to write to at each step.
:param readables: List of identifiers to read from at each step.
:param start: Start position for writables.
:param end: Stop position for writables.
:param steps: Number of scan steps(integer) or step size (float).
:param latency: Settling time before each readout. Default = 0.
:param relative: Start and stop positions are relative to the current position.
:param passes: Number of passes for each scan.
:param zigzag: If True and passes > 1, invert moving direction on each pass.
:param before_read: List of callback functions on each step before readback.
:param after_read: List of callback functions on each step after readback.
:param title: Not used in this implementation - legacy.
:return: Data from the scan.
"""
offsets, finalization_actions, settings = _generate_scan_parameters(relative, writables, latency)
n_steps, step_size = _convert_steps_parameter(steps)
if zigzag:
positioner_class = ZigZagAreaPositioner
else:
positioner_class = AreaPositioner
positioner = positioner_class(start=start, end=end, step_size=step_size,
n_steps=n_steps, offsets=offsets, passes=passes)
result = scan(positioner, readables, writables, before_read=before_read, after_read=after_read, settings=settings,
finalization=finalization_actions)
return result
def vscan(writables, readables, vector, line=False, latency=0.0, relative=False, passes=1, zigzag=False,
before_read=None, after_read=None, title=None):
"""Vector Scan: positioners change following values provided in a vector.
Args:
writables(list of Writable): Positioners set on each step.
readables(list of Readable): Sensors to be sampled on each step.
vector(list of list of float): table of positioner values.
line (bool, optional): if true, processs as line scan (1d)
relative (bool, optional): if true, start and end positions are relative to current at
start of the scan
latency(float, optional): settling time for each step before readout, defaults to 0.0.
passes(int, optional): number of passes
zigzag(bool, optional): if true writables invert direction on each pass.
before_read (function, optional): callback on each step, before each readout.
after_read (function, optional): callback on each step, after each readout.
title(str, optional): plotting window name.
Returns:
ScanResult object.
"""
offsets, finalization_actions, settings = _generate_scan_parameters(relative, writables, latency)
# The compound positioner does not allow you to do zigzag positioning.
if not line and zigzag:
raise ValueError("Area vector scan cannot use zigzag positioning.")
if zigzag:
positioner_class = ZigZagVectorPositioner
else:
positioner_class = VectorPositioner
# If the vector is treated as a line scan, move all motors to the next position at the same time.
if line:
positioner = positioner_class(positions=vector, passes=passes, offsets=offsets)
# The vector is treated as an area scan. Move motors one by one, covering all positions.
else:
vector = convert_to_list(vector)
if not all(isinstance(x, list) for x in vector):
raise ValueError("In case of area scan, a list of lists is required for a vector.")
positioner = CompoundPositioner([VectorPositioner(positions=x, passes=passes, offsets=offsets)
for x in vector])
result = scan(positioner, readables, writables, before_read=before_read, after_read=after_read, settings=settings,
finalization=finalization_actions)
return result
def rscan(writable, readables, regions, latency=0.0, relative=False, passes=1, zigzag=False, before_read=None,
after_read=None, title=None):
"""Region Scan: positioner scanned linearly, from start to end positions, in multiple regions.
Args:
writable(Writable): Positioner set on each step, for each region.
readables(list of Readable): Sensors to be sampled on each step.
regions (list of tuples (float,float, int) or (float,float, float)): each tuple define a scan region
(start, stop, steps) or (start, stop, step_size)
relative (bool, optional): if true, start and end positions are relative to
current at start of the scan
latency(float, optional): settling time for each step before readout, defaults to 0.0.
passes(int, optional): number of passes
zigzag(bool, optional): if true writable invert direction on each pass.
before_read (function, optional): callback on each step, before each readout. Callback may have as
optional parameters list of positions.
after_read (function, optional): callback on each step, after each readout. Callback may have as
optional parameters a ScanRecord object.
title(str, optional): plotting window name.
Returns:
ScanResult object.
"""
raise NotImplementedError("Region scan not supported.")
def cscan(writables, readables, start, end, steps, latency=0.0, time=None, relative=False, passes=1, zigzag=False,
before_read=None, after_read=None, title=None):
"""Continuous Scan: positioner change continuously from start to end position and readables are sampled on the fly.
Args:
writable(Speedable or list of Motor): A positioner with a getSpeed method or
a list of motors.
readables(list of Readable): Sensors to be sampled on each step.
start(float or list of float): start positions of writables.
end(float or list of float): final positions of writabless.
steps(int or float or list of float): number of scan steps (int) or step size (float).
time (float, seconds): if not None then writables is Motor array and speeds are
set according to time.
relative (bool, optional): if true, start and end positions are relative to
current at start of the scan
latency(float, optional): sleep time in each step before readout, defaults to 0.0.
before_read (function, optional): callback on each step, before each readout.
Callback may have as optional parameters list of positions.
after_read (function, optional): callback on each step, after each readout.
Callback may have as optional parameters a ScanRecord object.
title(str, optional): plotting window name.
Returns:
ScanResult object.
"""
raise NotImplementedError("Continuous scan not supported.")
def hscan(config, writable, readables, start, end, steps, passes=1, zigzag=False, before_stream=None, after_stream=None,
after_read=None, title=None):
"""Hardware Scan: values sampled by external hardware and received asynchronously.
Args:
config(dict): Configuration of the hardware scan. The "class" key provides the implementation class.
Other keys are implementation specific.
writable(Writable): A positioner appropriated to the hardware scan type.
readables(list of Readable): Sensors appropriated to the hardware scan type.
start(float): start positions of writable.
end(float): final positions of writables.
steps(int or float): number of scan steps (int) or step size (float).
before_stream (function, optional): callback before just before starting positioner move.
after_stream (function, optional): callback before just after stopping positioner move.
after_read (function, optional): callback on each readout.
Callback may have as optional parameters a ScanRecord object.
title(str, optional): plotting window name.
Returns:
ScanResult object.
"""
raise NotImplementedError("Hardware scan not supported.")
def bscan(stream, records, before_read=None, after_read=None, title=None):
"""BS Scan: records all values in a beam synchronous stream.
Args:
stream(Stream): stream object
records(int): number of records to store
before_read (function, optional): callback on each step, before each readout.
Callback may have as optional parameters list of positions.
after_read (function, optional): callback on each step, after each readout.
Callback may have as optional parameters a ScanRecord object.
title(str, optional): plotting window name.
Returns:
ScanResult object.
"""
raise NotImplementedError("BS scan not supported.")
def tscan(readables, points, interval, before_read=None, after_read=None, title=None):
"""Time Scan: sensors are sampled in fixed time intervals.
Args:
readables(list of Readable): Sensors to be sampled on each step.
points(int): number of samples.
interval(float): time interval between readouts. Minimum temporization is 0.001s
before_read (function, optional): callback on each step, before each readout.
after_read (function, optional): callback on each step, after each readout.
title(str, optional): plotting window name.
Returns:
ScanResult object.
"""
positioner = TimePositioner(interval, points)
result = scan(positioner, readables, before_read=before_read, after_read=after_read)
return result
def mscan(trigger, readables, points, timeout=None, async=True, take_initial=False, before_read=None, after_read=None,
title=None):
"""Monitor Scan: sensors are sampled when received change event of the trigger device.
Args:
trigger(Device): Source of the sampling triggering.
readables(list of Readable): Sensors to be sampled on each step.
If trigger has cache and is included in readables, it is not read
for each step, but the change event value is used.
points(int): number of samples.
timeout(float, optional): maximum scan time in seconds.
async(bool, optional): if True then records are sampled and stored on event change callback. Enforce
reading only cached values of sensors.
If False, the scan execution loop waits for trigger cache update. Do not make
cache only access, but may loose change events.
take_initial(bool, optional): if True include current values as first record (before first trigger).
before_read (function, optional): callback on each step, before each readout.
after_read (function, optional): callback on each step, after each readout.
title(str, optional): plotting window name.
Returns:
ScanResult object.
"""
raise NotImplementedError("Monitor scan not supported.")
def escan(name, title=None):
"""Epics Scan: execute an Epics Scan Record.
Args:
name(str): Name of scan record.
title(str, optional): plotting window name.
Returns:
ScanResult object.
"""
raise NotImplementedError("Epics scan not supported.")
def bsearch(writables, readable, start, end, steps, maximum=True, strategy="Normal", latency=0.0, relative=False,
before_read=None, after_read=None, title=None):
"""Binary search: searches writables in a binary search fashion to find a local maximum for the readable.
Args:
writables(list of Writable): Positioners set on each step.
readable(Readable): Sensor to be sampled.
start(list of float): start positions of writables.
end(list of float): final positions of writables.
steps(float or list of float): resolution of search for each writable.
maximum (bool , optional): if True (default) search maximum, otherwise minimum.
strategy (str , optional): "Normal": starts search midway to scan range and advance in the best direction.
Uses orthogonal neighborhood (4-neighborhood for 2d)
"Boundary": starts search on scan range.
"FullNeighborhood": Uses complete neighborhood (8-neighborhood for 2d)
latency(float, optional): settling time for each step before readout, defaults to 0.0.
relative (bool, optional): if true, start and end positions are relative to current at
start of the scan
before_read (function, optional): callback on each step, before each readout.
after_read (function, optional): callback on each step, after each readout.
title(str, optional): plotting window name.
Returns:
SearchResult object.
"""
raise NotImplementedError("Binary search scan not supported.")
def hsearch(writables, readable, range_min, range_max, initial_step, resolution, noise_filtering_steps=1, maximum=True,
latency=0.0, relative=False, before_read=None, after_read=None, title=None):
"""Hill Climbing search: searches writables in decreasing steps to find a local maximum for the readable.
Args:
writables(list of Writable): Positioners set on each step.
readable(Readable): Sensor to be sampled.
range_min(list of float): minimum positions of writables.
range_max(list of float): maximum positions of writables.
initial_step(float or list of float):initial step size for for each writable.
resolution(float or list of float): resolution of search for each writable (minimum step size).
noise_filtering_steps(int): number of aditional steps to filter noise
maximum (bool , optional): if True (default) search maximum, otherwise minimum.
latency(float, optional): settling time for each step before readout, defaults to 0.0.
relative (bool, optional): if true, range_min and range_max positions are relative to current at
start of the scan
before_read (function, optional): callback on each step, before each readout.
after_read (function, optional): callback on each step, after each readout.
title(str, optional): plotting window name.
Returns:
SearchResult object.
"""
raise NotImplementedError("Hill climbing scan not supported.")

View File

@@ -0,0 +1 @@
from .scan import *

View File

@@ -0,0 +1,713 @@
import traceback
from copy import deepcopy
from datetime import datetime
from time import sleep
import numpy as np
from pyscan.dal.epics_dal import PyEpicsDal
from pyscan.interface.pyScan.utils import PyScanDataProcessor
from pyscan.positioner.compound import CompoundPositioner
from pyscan.positioner.serial import SerialPositioner
from pyscan.positioner.vector import VectorPositioner
from pyscan.scan_parameters import scan_settings
from pyscan.scanner import Scanner
from pyscan.utils import convert_to_list, convert_to_position_list, compare_channel_value
READ_GROUP = "Measurements"
WRITE_GROUP = "Knobs"
MONITOR_GROUP = "Monitors"
class Scan(object):
def execute_scan(self):
after_executor = self.get_action_executor("In-loopPostAction")
# Wrap the post action executor to update the number of completed scans.
def progress_after_executor(scanner_instance, data):
# Execute other post actions.
after_executor(scanner_instance)
# Update progress.
self.n_done_measurements += 1
self.ProgDisp.Progress = 100.0 * (self.n_done_measurements /
self.n_total_positions)
def prepare_monitors(reader):
# If there are no monitors defined we have nothing to validate.
if not self.dimensions[-1]["Monitor"]:
return None
def validate_monitors(position, data):
monitor_values = reader.read()
combined_data = zip(self.dimensions[-1]['Monitor'],
self.dimensions[-1]['MonitorValue'],
self.dimensions[-1]['MonitorTolerance'],
self.dimensions[-1]['MonitorAction'],
self.dimensions[-1]['MonitorTimeout'],
monitor_values)
for pv, expected_value, tolerance, action, timeout, value in combined_data:
# Monitor value does not match.
if not compare_channel_value(value, expected_value, tolerance):
if action == "Abort":
raise ValueError("Monitor %s, expected value %s, tolerance %s, has value %s. Aborting."
% (pv, expected_value, tolerance, value))
elif action == "WaitAndAbort":
return False
else:
raise ValueError("MonitorAction %s, on PV %s, is not supported." % (pv, action))
return True
return validate_monitors
# Setup scan settings.
settings = scan_settings(settling_time=self.dimensions[-1]["KnobWaitingExtra"],
n_measurements=self.dimensions[-1]["NumberOfMeasurements"],
measurement_interval=self.dimensions[-1]["Waiting"])
data_processor = PyScanDataProcessor(self.outdict,
n_readbacks=self.n_readbacks,
n_validations=self.n_validations,
n_observables=self.n_observables,
n_measurements=settings.n_measurements)
self.scanner = Scanner(positioner=self.get_positioner(), data_processor=data_processor,
reader=self.epics_dal.get_group(READ_GROUP).read,
writer=self.epics_dal.get_group(WRITE_GROUP).set_and_match,
before_measurement_executor=self.get_action_executor("In-loopPreAction"),
after_measurement_executor=progress_after_executor,
initialization_executor=self.get_action_executor("PreAction"),
finalization_executor=self.get_action_executor("PostAction"),
data_validator=prepare_monitors(self.epics_dal.get_group(MONITOR_GROUP)),
settings=settings)
self.outdict.update(self.scanner.discrete_scan())
def get_positioner(self):
"""
Generate a positioner for the provided dimensions.
:return: Positioner object.
"""
# Read all the initial positions - in case we need to do an additive scan.
initial_positions = self.epics_dal.get_group(READ_GROUP).read()
positioners = []
knob_readback_offset = 0
for dimension in self.dimensions:
is_additive = bool(dimension.get("Additive", 0))
is_series = bool(dimension.get("Series", 0))
n_knob_readbacks = len(dimension["KnobReadback"])
# This dimension uses relative positions, read the PVs initial state.
# We also need initial positions for the series scan.
if is_additive or is_series:
offsets = convert_to_list(
initial_positions[knob_readback_offset:knob_readback_offset + n_knob_readbacks])
else:
offsets = None
# Series scan in this dimension, use StepByStepVectorPositioner.
if is_series:
# In the StepByStep positioner, the initial values need to be added to the steps.
positions = convert_to_list(dimension["ScanValues"])
positioners.append(SerialPositioner(positions, initial_positions=offsets,
offsets=offsets if is_additive else None))
# Line scan in this dimension, use VectorPositioner.
else:
positions = convert_to_position_list(convert_to_list(dimension["KnobExpanded"]))
positioners.append(VectorPositioner(positions, offsets=offsets))
# Increase the knob readback offset.
knob_readback_offset += n_knob_readbacks
# Assemble all individual positioners together.
positioner = CompoundPositioner(positioners)
return positioner
def get_action_executor(self, entry_name):
actions = []
max_waiting = 0
for dim_index, dim in enumerate(self.dimensions):
for action_index, action in enumerate(dim[entry_name]):
set_pv, read_pv, value, tolerance, timeout = action
if set_pv == "match":
raise NotImplementedError("match not yet implemented for PreAction.")
# Initialize the write group, to speed up in loop stuff.
group_name = "%s_%d_%d" % (entry_name, dim_index, action_index)
self.epics_dal.add_writer_group(group_name, set_pv, read_pv, tolerance, timeout)
actions.append((group_name, value))
if entry_name + "Waiting" in dim:
max_waiting = max(max_waiting, dim[entry_name + "Waiting"])
def execute(scanner):
for action in actions:
name = action[0]
value = action[1]
# Retrieve the epics group and write the value.
self.epics_dal.get_group(name).set_and_match(value)
sleep(max_waiting)
return execute
class DummyProgress(object):
def __init__(self):
# For Thomas?
self.Progress = 1
self.abortScan = 0
def __init__(self):
self.dimensions = None
self.epics_dal = None
self.scanner = None
self.outdict = None
self.all_read_pvs = None
self.n_readbacks = None
self.n_validations = None
self.n_observables = None
self.n_total_positions = None
self.n_measurements = None
# Accessed by some clients.
self.ProgDisp = Scan.DummyProgress()
self._pauseScan = 0
# Just to make old GUI work.
self._abortScan = 0
self.n_done_measurements = 0
@property
def abortScan(self):
return self._abort_scan
@abortScan.setter
def abortScan(self, value):
self._abortScan = value
if self._abortScan:
self.scanner.abort_scan()
@property
def pauseScan(self):
return self._pauseScan
@pauseScan.setter
def pauseScan(self, value):
self._pauseScan = value
if self._pauseScan:
self.scanner.pause_scan()
else:
self.scanner.resume_scan()
def initializeScan(self, inlist, dal=None):
"""
Initialize and verify the provided scan values.
:param inlist: List of dictionaries for each dimension.
:param dal: Which reader should be used to access the PVs. Default: PyEpicsDal.
:return: Dictionary with results.
"""
if not inlist:
raise ValueError("Provided inlist is empty.")
if dal is not None:
self.epics_dal = dal
else:
self.epics_dal = PyEpicsDal()
# Prepare the scan dimensions.
if isinstance(inlist, list):
self.dimensions = inlist
# In case it is a simple one dimensional scan.
else:
self.dimensions = [inlist]
try:
for index, dic in enumerate(self.dimensions):
# We read most of the PVs only if declared in the last dimension.
is_last_dimension = index == (len(self.dimensions) - 1)
# Just in case there are identical input dictionaries. (Normally, it may not happen.)
dic['ID'] = index
# Waiting time.
if is_last_dimension and ('Waiting' not in dic.keys()):
raise ValueError('Waiting for the scan was not given.')
# Validation channels - values just added to the results.
if 'Validation' in dic.keys():
if not isinstance(dic['Validation'], list):
raise ValueError('Validation should be a list of channels. Input dictionary %d.' % index)
else:
dic['Validation'] = []
# Relative scan.
if 'Additive' not in dic.keys():
dic['Additive'] = 0
# Step back when pause is invoked.
if is_last_dimension and ('StepbackOnPause' not in dic.keys()):
dic['StepbackOnPause'] = 1
# Number of measurments per position.
if is_last_dimension and ('NumberOfMeasurements' not in dic.keys()):
dic['NumberOfMeasurements'] = 1
# PVs to sample.
if is_last_dimension and ('Observable' not in dic.keys()):
raise ValueError('The observable is not given.')
elif is_last_dimension:
if not isinstance(dic['Observable'], list):
dic['Observable'] = [dic['Observable']]
self._setup_knobs(index, dic)
self._setup_knob_scan_values(index, dic)
self._setup_pre_actions(index, dic)
self._setup_inloop_pre_actions(index, dic)
self._setup_post_action(index, dic)
self._setup_inloop_post_action(index, dic)
# Total number of measurements
self.n_total_positions = 1
for dic in self.dimensions:
if not dic['Series']:
self.n_total_positions = self.n_total_positions * dic['Nstep']
else:
self.n_total_positions = self.n_total_positions * sum(dic['Nstep'])
self._setup_epics_dal()
# Monitors only in the last dimension.
self._setup_monitors(self.dimensions[-1])
# Prealocating the place for the output
self.outdict = {"ErrorMessage": None,
"KnobReadback": self.allocateOutput(),
"Validation": self.allocateOutput(),
"Observable": self.allocateOutput()}
except ValueError:
self.outdict = {"ErrorMessage": traceback.format_exc()}
# Backward compatibility.
self.ProgDisp.Progress = 0
self.ProgDisp.abortScan = 0
self._pauseScan = 0
self.abortScan = 0
self.n_done_measurements = 0
return self.outdict
def allocateOutput(self):
root_list = []
for dimension in reversed(self.dimensions):
n_steps = dimension['Nstep']
if dimension['Series']:
# For Series scan, each step of each knob represents another result.
current_dimension_list = []
for n_steps_in_knob in n_steps:
current_knob_list = []
for _ in range(n_steps_in_knob):
current_knob_list.append(deepcopy(root_list))
current_dimension_list.append(deepcopy(current_knob_list))
root_list = current_dimension_list
else:
# For line scan, each step represents another result.
current_dimension_list = []
for _ in range(n_steps):
current_dimension_list.append(deepcopy(root_list))
root_list = current_dimension_list
return root_list
def _setup_epics_dal(self):
# Collect all PVs that need to be read at each scan step.
self.all_read_pvs = []
all_write_pvs = []
all_readback_pvs = []
all_tolerances = []
max_knob_waiting = -1
self.n_readbacks = 0
for d in self.dimensions:
self.all_read_pvs.append(d['KnobReadback'])
self.n_readbacks += len(d['KnobReadback'])
# Collect all data need to write to PVs.
all_write_pvs.append(d["Knob"])
all_readback_pvs.append(d["KnobReadback"])
all_tolerances.append(d["KnobTolerance"])
max_knob_waiting = max(max_knob_waiting, max(d["KnobWaiting"]))
self.all_read_pvs.append(self.dimensions[-1]['Validation'])
self.n_validations = len(self.dimensions[-1]['Validation'])
self.all_read_pvs.append(self.dimensions[-1]['Observable'])
self.n_observables = len(self.dimensions[-1]['Observable'])
# Expand all read PVs
self.all_read_pvs = [item for sublist in self.all_read_pvs for item in sublist]
# Expand Knobs and readbacks PVs.
all_write_pvs = [item for sublist in all_write_pvs for item in sublist]
all_readback_pvs = [item for sublist in all_readback_pvs for item in sublist]
all_tolerances = [item for sublist in all_tolerances for item in sublist]
# Initialize PV connections and check if all PV names are valid.
self.epics_dal.add_reader_group(READ_GROUP, self.all_read_pvs)
self.epics_dal.add_writer_group(WRITE_GROUP, all_write_pvs, all_readback_pvs, all_tolerances, max_knob_waiting)
def _setup_knobs(self, index, dic):
"""
Setup the values for moving knobs in the scan.
:param index: Index in the dictionary.
:param dic: The dictionary.
"""
if 'Knob' not in dic.keys():
raise ValueError('Knob for the scan was not given for the input dictionary %d.' % index)
else:
if not isinstance(dic['Knob'], list):
dic['Knob'] = [dic['Knob']]
if 'KnobReadback' not in dic.keys():
dic['KnobReadback'] = dic['Knob']
if not isinstance(dic['KnobReadback'], list):
dic['KnobReadback'] = [dic['KnobReadback']]
if len(dic['KnobReadback']) != len(dic['Knob']):
raise ValueError('The number of KnobReadback does not meet to the number of Knobs.')
if 'KnobTolerance' not in dic.keys():
dic['KnobTolerance'] = [1.0] * len(dic['Knob'])
if not isinstance(dic['KnobTolerance'], list):
dic['KnobTolerance'] = [dic['KnobTolerance']]
if len(dic['KnobTolerance']) != len(dic['Knob']):
raise ValueError('The number of KnobTolerance does not meet to the number of Knobs.')
if 'KnobWaiting' not in dic.keys():
dic['KnobWaiting'] = [10.0] * len(dic['Knob'])
if not isinstance(dic['KnobWaiting'], list):
dic['KnobWaiting'] = [dic['KnobWaiting']]
if len(dic['KnobWaiting']) != len(dic['Knob']):
raise ValueError('The number of KnobWaiting does not meet to the number of Knobs.')
if 'KnobWaitingExtra' not in dic.keys():
dic['KnobWaitingExtra'] = 0.0
else:
try:
dic['KnobWaitingExtra'] = float(dic['KnobWaitingExtra'])
except:
raise ValueError('KnobWaitingExtra is not a number in the input dictionary %d.' % index)
# Originally dic["Knob"] values were saved. I'm supposing this was a bug - readback values needed to be saved.
# TODO: We can optimize this by moving the initialization in the epics_dal init
# but pre actions need to be moved after the epics_dal init than
self.epics_dal.add_reader_group("KnobReadback", dic['KnobReadback'])
dic['KnobSaved'] = self.epics_dal.get_group("KnobReadback").read()
self.epics_dal.close_group("KnobReadback")
def _setup_knob_scan_values(self, index, dic):
if 'Series' not in dic.keys():
dic['Series'] = 0
if not dic['Series']: # Setting up scan values for SKS and MKS
if 'ScanValues' not in dic.keys():
if 'ScanRange' not in dic.keys():
raise ValueError('Neither ScanRange nor ScanValues is given '
'in the input dictionary %d.' % index)
elif not isinstance(dic['ScanRange'], list):
raise ValueError('ScanRange is not given in the right format. '
'Input dictionary %d.' % index)
elif not isinstance(dic['ScanRange'][0], list):
dic['ScanRange'] = [dic['ScanRange']]
if ('Nstep' not in dic.keys()) and ('StepSize' not in dic.keys()):
raise ValueError('Neither Nstep nor StepSize is given.')
if 'Nstep' in dic.keys(): # StepSize is ignored when Nstep is given
if not isinstance(dic['Nstep'], int):
raise ValueError('Nstep should be an integer. Input dictionary %d.' % index)
ran = []
for r in dic['ScanRange']:
s = (r[1] - r[0]) / (dic['Nstep'] - 1)
f = np.arange(r[0], r[1], s)
f = np.append(f, np.array(r[1]))
ran.append(f.tolist())
dic['KnobExpanded'] = ran
else: # StepSize given
if len(dic['Knob']) > 1:
raise ValueError('Give Nstep instead of StepSize for MKS. '
'Input dictionary %d.' % index)
# StepSize is only valid for SKS
r = dic['ScanRange'][0]
# TODO: THIS IS RECONSTRUCTED AND MIGHT BE WRONG, CHECK!
s = dic['StepSize'][0]
f = np.arange(r[0], r[1], s)
f = np.append(f, np.array(r[1]))
dic['Nstep'] = len(f)
dic['KnobExpanded'] = [f.tolist()]
else:
# Scan values explicitly defined.
if not isinstance(dic['ScanValues'], list):
raise ValueError('ScanValues is not given in the right fromat. '
'Input dictionary %d.' % index)
if len(dic['ScanValues']) != len(dic['Knob']) and len(dic['Knob']) != 1:
raise ValueError('The length of ScanValues does not meet to the number of Knobs.')
if len(dic['Knob']) > 1:
minlen = 100000
for r in dic['ScanValues']:
if minlen > len(r):
minlen = len(r)
ran = []
for r in dic['ScanValues']:
ran.append(r[0:minlen]) # Cut at the length of the shortest list.
dic['KnobExpanded'] = ran
dic['Nstep'] = minlen
else:
dic['KnobExpanded'] = [dic['ScanValues']]
dic['Nstep'] = len(dic['ScanValues'])
else: # Setting up scan values for Series scan
if 'ScanValues' not in dic.keys():
raise ValueError('ScanValues should be given for Series '
'scan in the input dictionary %d.' % index)
if not isinstance(dic['ScanValues'], list):
raise ValueError('ScanValues should be given as a list (of lists) '
'for Series scan in the input dictionary %d.' % index)
if len(dic['Knob']) != len(dic['ScanValues']):
raise ValueError('Scan values length does not match to the '
'number of knobs in the input dictionary %d.' % index)
Nstep = []
for vl in dic['ScanValues']:
if not isinstance(vl, list):
raise ValueError('ScanValue element should be given as a list for '
'Series scan in the input dictionary %d.' % index)
Nstep.append(len(vl))
dic['Nstep'] = Nstep
def _setup_pre_actions(self, index, dic):
if 'PreAction' in dic.keys():
if not isinstance(dic['PreAction'], list):
raise ValueError('PreAction should be a list. Input dictionary %d.' % index)
for l in dic['PreAction']:
if not isinstance(l, list):
raise ValueError('Every PreAction should be a list. Input dictionary %d.' % index)
if len(l) != 5:
if not l[0] == 'SpecialAction':
raise ValueError('Every PreAction should be in a form of '
'[Ch-set, Ch-read, Value, Tolerance, Timeout]. '
'Input dictionary ' + str(index) + '.')
if 'PreActionWaiting' not in dic.keys():
dic['PreActionWaiting'] = 0.0
if not isinstance(dic['PreActionWaiting'], float) and not isinstance(dic['PreActionWaiting'], int):
raise ValueError('PreActionWating should be a float. Input dictionary %d.' % index)
if 'PreActionOrder' not in dic.keys():
dic['PreActionOrder'] = [0] * len(dic['PreAction'])
if not isinstance(dic['PreActionOrder'], list):
raise ValueError('PreActionOrder should be a list. Input dictionary %d.' % index)
else:
dic['PreAction'] = []
dic['PreActionWaiting'] = 0.0
dic['PreActionOrder'] = [0] * len(dic['PreAction'])
def _setup_inloop_pre_actions(self, index, dic):
if 'In-loopPreAction' in dic.keys():
if not isinstance(dic['In-loopPreAction'], list):
raise ValueError('In-loopPreAction should be a list. Input dictionary %d.' % index)
for l in dic['In-loopPreAction']:
if not isinstance(l, list):
raise ValueError('Every In-loopPreAction should be a list. '
'Input dictionary ' + str(index) + '.')
if len(l) != 5:
if not l[0] == 'SpecialAction':
raise ValueError('Every In-loopPreAction should be in a form of '
'[Ch-set, Ch-read, Value, Tolerance, Timeout]. '
'Input dictionary ' + str(index) + '.')
if 'In-loopPreActionWaiting' not in dic.keys():
dic['In-loopPreActionWaiting'] = 0.0
if not isinstance(dic['In-loopPreActionWaiting'], float) and not isinstance(
dic['In-loopPreActionWaiting'], int):
raise ValueError('In-loopPreActionWating should be a float. Input dictionary %d.' % index)
if 'In-loopPreActionOrder' not in dic.keys():
dic['In-loopPreActionOrder'] = [0] * len(dic['In-loopPreAction'])
if not isinstance(dic['In-loopPreActionOrder'], list):
raise ValueError('In-loopPreActionOrder should be a list. Input dictionary %d.' % index)
else:
dic['In-loopPreAction'] = []
dic['In-loopPreActionWaiting'] = 0.0
dic['In-loopPreActionOrder'] = [0] * len(dic['In-loopPreAction'])
def _setup_post_action(self, index, dic):
if 'PostAction' in dic.keys():
if dic['PostAction'] == 'Restore':
PA = []
for index in range(0, len(dic['Knob'])):
k = dic['Knob'][index]
v = dic['KnobSaved'][index]
PA.append([k, k, v, 1.0, 10])
dic['PostAction'] = PA
elif not isinstance(dic['PostAction'], list):
raise ValueError('PostAction should be a list. Input dictionary %d.' % index)
Restore = 0
for index in range(0, len(dic['PostAction'])):
l = dic['PostAction'][index]
if l == 'Restore':
Restore = 1
PA = []
for j in range(0, len(dic['Knob'])):
k = dic['Knob'][j]
v = dic['KnobSaved'][j]
PA.append([k, k, v, 1.0, 10])
elif not isinstance(l, list):
raise ValueError('Every PostAction should be a list. Input dictionary %d.' % index)
elif len(l) != 5:
if not l[0] == 'SpecialAction':
raise ValueError('Every PostAction should be in a form of '
'[Ch-set, Ch-read, Value, Tolerance, Timeout]. '
'Input dictionary %d.' % index)
if Restore:
dic['PostAction'].remove('Restore')
dic['PostAction'] = dic['PostAction'] + PA
else:
dic['PostAction'] = []
def _setup_inloop_post_action(self, index, dic):
if 'In-loopPostAction' in dic.keys():
if dic['In-loopPostAction'] == 'Restore':
PA = []
for index in range(0, len(dic['Knob'])):
k = dic['Knob'][index]
v = dic['KnobSaved'][index]
PA.append([k, k, v, 1.0, 10])
dic['In-loopPostAction'] = PA
elif not isinstance(dic['In-loopPostAction'], list):
raise ValueError('In-loopPostAction should be a list. Input dictionary %d.' % index)
Restore = 0
for index in range(0, len(dic['In-loopPostAction'])):
l = dic['In-loopPostAction'][index]
if l == 'Restore':
Restore = 1
PA = []
for j in range(0, len(dic['Knob'])):
k = dic['Knob'][j]
v = dic['KnobSaved'][j]
PA.append([k, k, v, 1.0, 10])
dic['In-loopPostAction'][index] = PA
elif not isinstance(l, list):
raise ValueError('Every In-loopPostAction should be a list. '
'Input dictionary %d.' % index)
elif len(l) != 5:
raise ValueError('Every In-loopPostAction should be in a form of '
'[Ch-set, Ch-read, Value, Tolerance, Timeout]. '
'Input dictionary %d.' % index)
if Restore:
dic['In-loopPostAction'].remove('Restore')
dic['In-loopPostAction'] = dic['In-loopPostAction'] + PA
else:
dic['In-loopPostAction'] = []
def _setup_monitors(self, dic):
if ('Monitor' in dic.keys()) and (dic['Monitor']):
if isinstance(dic['Monitor'], str):
dic['Monitor'] = [dic['Monitor']]
# Initialize monitor group and check if all monitor PVs are valid.
self.epics_dal.add_reader_group(MONITOR_GROUP, dic["Monitor"])
if 'MonitorValue' not in dic.keys():
dic["MonitorValue"] = self.epics_dal.get_group(MONITOR_GROUP).read()
elif not isinstance(dic['MonitorValue'], list):
dic['MonitorValue'] = [dic['MonitorValue']]
if len(dic['MonitorValue']) != len(dic['Monitor']):
raise ValueError('The length of MonitorValue does not meet to the length of Monitor.')
# Try to construct the monitor tolerance, if not given.
if 'MonitorTolerance' not in dic.keys():
dic['MonitorTolerance'] = []
for value in self.epics_dal.get_group(MONITOR_GROUP).read():
if isinstance(value, str):
# No tolerance for string values.
dic['MonitorTolerance'].append(None)
elif value == 0:
# Default tolerance for unknown values is 0.1.
dic['MonitorTolerance'].append(0.1)
else:
# 10% of the current value will be the torelance when not given
dic['MonitorTolerance'].append(abs(value * 0.1))
elif not isinstance(dic['MonitorTolerance'], list):
dic['MonitorTolerance'] = [dic['MonitorTolerance']]
if len(dic['MonitorTolerance']) != len(dic['Monitor']):
raise ValueError('The length of MonitorTolerance does not meet to the length of Monitor.')
if 'MonitorAction' not in dic.keys():
raise ValueError('MonitorAction is not give though Monitor is given.')
if not isinstance(dic['MonitorAction'], list):
dic['MonitorAction'] = [dic['MonitorAction']]
for m in dic['MonitorAction']:
if m != 'Abort' and m != 'Wait' and m != 'WaitAndAbort':
raise ValueError('MonitorAction shold be Wait, Abort, or WaitAndAbort.')
if 'MonitorTimeout' not in dic.keys():
dic['MonitorTimeout'] = [30.0] * len(dic['Monitor'])
elif not isinstance(dic['MonitorTimeout'], list):
dic['MonitorValue'] = [dic['MonitorValue']]
if len(dic['MonitorValue']) != len(dic['Monitor']):
raise ValueError('The length of MonitorValue does not meet to the length of Monitor.')
for m in dic['MonitorTimeout']:
try:
float(m)
except:
raise ValueError('MonitorTimeout should be a list of float(or int).')
else:
dic['Monitor'] = []
dic['MonitorValue'] = []
dic['MonitorTolerance'] = []
dic['MonitorAction'] = []
dic['MonitorTimeout'] = []
def startScan(self):
if self.outdict['ErrorMessage']:
if 'After the last scan,' not in self.outdict['ErrorMessage']:
self.outdict['ErrorMessage'] = 'It seems that the initialization was not successful... ' \
'No scan was performed.'
return self.outdict
# Execute the scan.
self.outdict['TimeStampStart'] = datetime.now()
self.execute_scan()
self.outdict['TimeStampEnd'] = datetime.now()
self.outdict['ErrorMessage'] = 'Measurement finalized (finished/aborted) normally. ' \
'Need initialisation before next measurement.'
# Cleanup after the scan.
self.epics_dal.close_all_groups()
return self.outdict

View File

@@ -0,0 +1,41 @@
from pyscan.utils import flat_list_generator
class PyScanDataProcessor(object):
def __init__(self, output, n_readbacks, n_validations, n_observables, n_measurements):
self.n_readbacks = n_readbacks
self.n_validations = n_validations
self.n_observables = n_observables
self.n_measurements = n_measurements
self.output = output
self.KnobReadback_output_position = flat_list_generator(self.output["KnobReadback"])
self.Validation_output_position = flat_list_generator(self.output["Validation"])
self.Observable_output_position = flat_list_generator(self.output["Observable"])
def process(self, position, data):
# Just we can always iterate over it.
if self.n_measurements == 1:
data = [data]
# Cells for each measurement are already prepared.
readback_result = [measurement[0:self.n_readbacks]
for measurement in data]
validation_result = [measurement[self.n_readbacks:self.n_readbacks + self.n_validations]
for measurement in data]
interval_start = self.n_readbacks + self.n_validations
interval_end = self.n_readbacks + self.n_validations + self.n_observables
observable_result = [measurement[interval_start:interval_end]
for measurement in data]
if self.n_measurements == 1:
next(self.KnobReadback_output_position).extend(readback_result[0])
next(self.Validation_output_position).extend(validation_result[0])
next(self.Observable_output_position).extend(observable_result[0])
else:
next(self.KnobReadback_output_position).extend(readback_result)
next(self.Validation_output_position).extend(validation_result)
next(self.Observable_output_position).extend(observable_result)
def get_data(self):
return self.output

View File

View File

@@ -0,0 +1,184 @@
import math
from copy import copy
from pyscan.utils import convert_to_list
class AreaPositioner(object):
def _validate_parameters(self):
if not len(self.start) == len(self.end):
raise ValueError("Number of start %s and end %s positions do not match." %
(self.start, self.end))
if (self.n_steps and self.step_size) or (not self.n_steps and not self.step_size):
raise ValueError("N_steps (%s) or step_sizes (%s) must be set, but not none "
"or both of them at the same time." % (self.step_size, self.n_steps))
if self.n_steps and (not len(self.n_steps) == len(self.start)):
raise ValueError("The number of n_steps %s does not match the number of start positions %s." %
(self.n_steps, self.start))
if self.n_steps and not all(isinstance(x, int) for x in self.n_steps):
raise ValueError("The n_steps %s must have only integers." % self.n_steps)
if self.step_size and (not len(self.step_size) == len(self.start)):
raise ValueError("The number of step sizes %s does not match the number of start positions %s." %
(self.step_size, self.start))
if not isinstance(self.passes, int) or self.passes < 1:
raise ValueError("Passes must be a positive integer value, but %s was given." % self.passes)
if self.offsets and (not len(self.offsets) == len(self.start)):
raise ValueError("Number of offsets %s does not match the number of start positions %s." %
(self.offsets, self.start))
def __init__(self, start, end, n_steps=None, step_size=None, passes=1, offsets=None):
self.start = convert_to_list(start)
self.end = convert_to_list(end)
self.n_steps = convert_to_list(n_steps)
self.step_size = convert_to_list(step_size)
self.passes = passes
self.offsets = convert_to_list(offsets)
self._validate_parameters()
# Get the number of axis to scan.
self.n_axis = len(self.start)
# Fix the offsets if provided.
if self.offsets:
self.start = [offset + original_value for original_value, offset in zip(self.start, self.offsets)]
self.end = [offset + original_value for original_value, offset in zip(self.end, self.offsets)]
# Number of steps case.
if self.n_steps:
self.step_size = [(end - start) / steps for start, end, steps
in zip(self.start, self.end, self.n_steps)]
# Step size case.
elif self.step_size:
self.n_steps = [math.floor((end - start) / step_size) for start, end, step_size
in zip(self.start, self.end, self.step_size)]
def get_generator(self):
for _ in range(self.passes):
positions = copy(self.start)
# Return the initial state.
yield copy(positions)
# Recursive call to print all axis values.
def scan_axis(axis_number):
# We should not scan axis that do not exist.
if not axis_number < self.n_axis:
return
# Output all position on the next axis while this axis is still at the start position.
yield from scan_axis(axis_number + 1)
# Move axis step by step.
for _ in range(self.n_steps[axis_number]):
positions[axis_number] = positions[axis_number] + self.step_size[axis_number]
yield copy(positions)
# Output all positions from the next axis for each value of this axis.
yield from scan_axis(axis_number + 1)
# Clean up after the loop - return the axis value back to the start value.
positions[axis_number] = self.start[axis_number]
yield from scan_axis(0)
class ZigZagAreaPositioner(AreaPositioner):
def get_generator(self):
for pass_number in range(self.passes):
# Directions (positive ascending, negative descending) for each axis.
directions = [1] * self.n_axis
positions = copy(self.start)
# Return the initial state.
yield copy(positions)
# Recursive call to print all axis values.
def scan_axis(axis_number):
# We should not scan axis that do not exist.
if not axis_number < self.n_axis:
return
# Output all position on the next axis while this axis is still at the start position.
yield from scan_axis(axis_number + 1)
# Move axis step by step.
for _ in range(self.n_steps[axis_number]):
positions[axis_number] = positions[axis_number] + (self.step_size[axis_number]
* directions[axis_number])
yield copy(positions)
# Output all positions from the next axis for each value of this axis.
yield from scan_axis(axis_number + 1)
# Invert the direction for the next iteration on this axis.
directions[axis_number] *= -1
yield from scan_axis(0)
class MultiAreaPositioner(object):
def __init__(self, start, end, steps, passes=1, offsets=None):
self.offsets = offsets
self.passes = passes
self.end = end
self.start = start
# Get the number of axis to scan.
self.n_axis = len(self.start)
# Fix the offsets if provided.
if self.offsets:
self.start = [[original_value + offset for original_value, offset in zip(original_values, offsets)]
for original_values, offsets in zip(self.start, self.offsets)]
self.end = [[original_value + offset for original_value, offset in zip(original_values, offsets)]
for original_values, offsets in zip(self.end, self.offsets)]
# Number of steps case.
if isinstance(steps[0][0], int):
# TODO: Verify that each axis has positive steps and that all are ints (all steps or step_size)
self.n_steps = steps
self.step_size = [[(end - start) / steps for start, end, steps in zip(starts, ends, line_steps)]
for starts, ends, line_steps in zip(self.start, self.end, steps)]
# Step size case.
elif isinstance(steps[0][0], float):
# TODO: Verify that each axis has the same number of steps and that the step_size is correct (positive etc.)
self.n_steps = [[math.floor((end - start) / step) for start, end, step in zip(starts, ends, line_steps)]
for starts, ends, line_steps in zip(self.start, self.end, steps)]
self.step_size = steps
# Something went wrong
else:
# TODO: Raise an exception.
pass
def get_generator(self):
for _ in range(self.passes):
positions = copy(self.start)
# Return the initial state.
yield copy(positions)
# Recursive call to print all axis values.
def scan_axis(axis_number):
# We should not scan axis that do not exist.
if not axis_number < self.n_axis:
return
# Output all position on the next axis while this axis is still at the start position.
yield from scan_axis(axis_number + 1)
# Move axis step by step.
# TODO: Figure out what to do with this steps.
for _ in range(self.n_steps[axis_number][0]):
positions[axis_number] = [position + step_size for position, step_size
in zip(positions[axis_number], self.step_size[axis_number])]
yield copy(positions)
# Output all positions from the next axis for each value of this axis.
yield from scan_axis(axis_number + 1)
# Clean up after the loop - return the axis value back to the start value.
positions[axis_number] = self.start[axis_number]
yield from scan_axis(0)

View File

@@ -0,0 +1,21 @@
class BsreadPositioner(object):
def __init__(self, n_messages):
"""
Acquire N consecutive messages from the stream.
:param n_messages: Number of messages to acquire.
"""
self.n_messages = n_messages
self.bs_reader = None
def set_bs_reader(self, bs_reader):
self.bs_reader = bs_reader
def get_generator(self):
if self.bs_reader is None:
raise RuntimeError("Set bs_reader before using this generator.")
for index in range(self.n_messages):
self.bs_reader.read(index)
yield index

View File

@@ -0,0 +1,21 @@
from copy import copy
from pyscan.utils import convert_to_list
class CompoundPositioner(object):
"""
Given a list of positioners, it compounds them in given order, getting values from each of them at every step.
"""
def __init__(self, positioners):
self.positioners = positioners
self.n_positioners = len(positioners)
def get_generator(self):
def walk_positioner(index, output_positions):
if index == self.n_positioners:
yield copy(output_positions)
else:
for current_positions in self.positioners[index].get_generator():
yield from walk_positioner(index+1, output_positions + convert_to_list(current_positions))
yield from walk_positioner(0, [])

View File

@@ -0,0 +1,91 @@
import math
from copy import copy
from pyscan.utils import convert_to_list
class LinePositioner(object):
def _validate_parameters(self):
if not len(self.start) == len(self.end):
raise ValueError("Number of start %s and end %s positions do not match." %
(self.start, self.end))
# Only 1 among n_steps and step_sizes must be set.
if (self.n_steps is not None and self.step_size) or (self.n_steps is None and not self.step_size):
raise ValueError("N_steps (%s) or step_sizes (%s) must be set, but not none "
"or both of them at the same time." % (self.step_size, self.n_steps))
# If n_steps is set, than it must be an integer greater than 0.
if (self.n_steps is not None) and (not isinstance(self.n_steps, int) or self.n_steps < 1):
raise ValueError("Steps must be a positive integer value, but %s was given." % self.n_steps)
if self.step_size and (not len(self.step_size) == len(self.start)):
raise ValueError("The number of step sizes %s does not match the number of start positions %s." %
(self.step_size, self.start))
if not isinstance(self.passes, int) or self.passes < 1:
raise ValueError("Passes must be a positive integer value, but %s was given." % self.passes)
if self.offsets and (not len(self.offsets) == len(self.start)):
raise ValueError("Number of offsets %s does not match the number of start positions %s." %
(self.offsets, self.start))
def __init__(self, start, end, n_steps=None, step_size=None, passes=1, offsets=None):
self.start = convert_to_list(start)
self.end = convert_to_list(end)
self.n_steps = n_steps
self.step_size = convert_to_list(step_size)
self.passes = passes
self.offsets = convert_to_list(offsets)
self._validate_parameters()
# Fix the offsets if provided.
if self.offsets:
self.start = [offset + original_value for original_value, offset in zip(self.start, self.offsets)]
self.end = [offset + original_value for original_value, offset in zip(self.end, self.offsets)]
# Number of steps case.
if self.n_steps:
self.step_size = [(end - start) / self.n_steps for start, end in zip(self.start, self.end)]
# Step size case.
elif self.step_size:
n_steps_per_axis = [math.floor((end - start) / step_size) for start, end, step_size
in zip(self.start, self.end, self.step_size)]
# Verify that all axis do the same number of steps.
if not all(x == n_steps_per_axis[0] for x in n_steps_per_axis):
raise ValueError("The step sizes %s must give the same number of steps for each start %s "
"and end % pair." % (self.step_size, self.start, self.end))
# All the elements in n_steps_per_axis must be the same anyway.
self.n_steps = n_steps_per_axis[0]
def get_generator(self):
for _ in range(self.passes):
# The initial position is always the start position.
current_positions = copy(self.start)
yield current_positions
for __ in range(self.n_steps):
current_positions = [position + step_size for position, step_size
in zip(current_positions, self.step_size)]
yield current_positions
class ZigZagLinePositioner(LinePositioner):
def get_generator(self):
# The initial position is always the start position.
current_positions = copy(self.start)
yield current_positions
for pass_number in range(self.passes):
# Positive direction means we increase the position each step, negative we decrease.
direction = 1 if pass_number % 2 == 0 else -1
for __ in range(self.n_steps):
current_positions = [position + (step_size * direction) for position, step_size
in zip(current_positions, self.step_size)]
yield current_positions

View File

@@ -0,0 +1,40 @@
from copy import copy
from pyscan.utils import convert_to_list
class SerialPositioner(object):
"""
Scan over all provided points, one by one, returning the previous to the initial state.
Each axis is treated as a separate line.
"""
def __init__(self, positions, initial_positions, passes=1, offsets=None):
self.positions = positions
self.passes = passes
self.offsets = offsets
if passes < 1:
raise ValueError("Number of passes cannot be less than 1, but %d was provided." % passes)
self.initial_positions = initial_positions
self.n_axis = len(self.initial_positions)
# In case only 1 axis is provided, still wrap it in a list, because it makes the generator code easier.
if self.n_axis == 1:
self.positions = [positions]
# Fix the offset if provided.
if self.offsets:
for axis_positions, offset in zip(self.positions, self.offsets):
axis_positions[:] = [original_position + offset for original_position in axis_positions]
def get_generator(self):
for _ in range(self.passes):
# For each axis.
for axis_index in range(self.n_axis):
current_state = copy(self.initial_positions)
n_steps_in_axis = len(self.positions[axis_index])
for axis_position_index in range(n_steps_in_axis):
current_state[axis_index] = convert_to_list(self.positions[axis_index])[axis_position_index]
yield copy(current_state)

View File

@@ -0,0 +1,12 @@
class StaticPositioner(object):
def __init__(self, n_images):
"""
Acquire N consecutive images in a static position.
:param n_images: Number of images to acquire.
"""
self.n_images = n_images
def get_generator(self):
for index in range(self.n_images):
yield index

View File

@@ -0,0 +1,52 @@
from time import time, sleep
from pyscan.config import max_time_tolerance
smoothing_factor = 0.95
class TimePositioner(object):
def __init__(self, time_interval, n_intervals, tolerance=None):
"""
Time interval at which to read data.
:param time_interval: Time interval in seconds.
:param n_intervals: How many intervals to measure.
"""
self.time_interval = time_interval
# Tolerance cannot be less than the min set tolerance.
if tolerance is None or tolerance < max_time_tolerance:
tolerance = max_time_tolerance
self.tolerance = tolerance
# Minimum one measurement.
if n_intervals < 1:
n_intervals = 1
self.n_intervals = n_intervals
def get_generator(self):
measurement_time_start = time()
last_time_to_sleep = 0
for _ in range(self.n_intervals):
measurement_time_stop = time()
# How much time did the measurement take.
measurement_time = measurement_time_stop - measurement_time_start
time_to_sleep = self.time_interval - measurement_time
# Use the smoothing factor to attenuate variations in the measurement time.
time_to_sleep = (smoothing_factor * time_to_sleep) + ((1-smoothing_factor) * last_time_to_sleep)
# Time to sleep is negative (more time has elapsed, we cannot achieve the requested time interval.
if time_to_sleep < (-1 * max_time_tolerance):
raise ValueError("The requested time interval cannot be achieved. Last iteration took %.2f seconds, "
"but a %.2f seconds time interval was set." % (measurement_time, self.time_interval))
# Sleep only if time to sleep is positive.
if time_to_sleep > 0:
sleep(time_to_sleep)
last_time_to_sleep = time_to_sleep
measurement_time_start = time()
# Return the timestamp at which the measurement should begin.
yield measurement_time_start

View File

@@ -0,0 +1,52 @@
from itertools import cycle, chain
from pyscan.utils import convert_to_list
class VectorPositioner(object):
"""
Moves over the provided positions.
"""
def _validate_parameters(self):
if not all(len(convert_to_list(x)) == len(convert_to_list(self.positions[0])) for x in self.positions):
raise ValueError("All positions %s must have the same number of axis." % self.positions)
if not isinstance(self.passes, int) or self.passes < 1:
raise ValueError("Passes must be a positive integer value, but %s was given." % self.passes)
if self.offsets and (not len(self.offsets) == len(self.positions[0])):
raise ValueError("Number of offsets %s does not match the number of positions %s." %
(self.offsets, self.positions[0]))
def __init__(self, positions, passes=1, offsets=None):
self.positions = convert_to_list(positions)
self.passes = passes
self.offsets = convert_to_list(offsets)
self._validate_parameters()
# Number of positions to move to.
self.n_positions = len(self.positions)
# Fix the offset if provided.
if self.offsets:
for step_positions in self.positions:
step_positions[:] = [original_position + offset
for original_position, offset in zip(step_positions, self.offsets)]
def get_generator(self):
for _ in range(self.passes):
for position in self.positions:
yield position
class ZigZagVectorPositioner(VectorPositioner):
def get_generator(self):
# This creates a generator for [0, 1, 2, 3... n, n-1, n-2.. 2, 1, 0.....]
indexes = cycle(chain(range(0, self.n_positions, 1), range(self.n_positions - 2, 0, -1)))
# First pass has the full number of items, each subsequent has one less (extreme sequence item).
n_indexes = self.n_positions + ((self.passes - 1) * (self.n_positions - 1))
for x in range(n_indexes):
yield self.positions[next(indexes)]

260
packages/pyscan/scan.py Normal file
View File

@@ -0,0 +1,260 @@
import logging
from pyscan.dal import epics_dal, bsread_dal, function_dal
from pyscan.dal.function_dal import FunctionProxy
from pyscan.positioner.bsread import BsreadPositioner
from pyscan.scanner import Scanner
from pyscan.scan_parameters import EPICS_PV, EPICS_CONDITION, BS_PROPERTY, BS_CONDITION, scan_settings, convert_input, \
FUNCTION_VALUE, FUNCTION_CONDITION, convert_conditions, ConditionAction, ConditionComparison
from pyscan.utils import convert_to_list, SimpleDataProcessor, ActionExecutor, compare_channel_value
# Instances to use.
EPICS_WRITER = epics_dal.WriteGroupInterface
EPICS_READER = epics_dal.ReadGroupInterface
BS_READER = bsread_dal.ReadGroupInterface
FUNCTION_PROXY = function_dal.FunctionProxy
DATA_PROCESSOR = SimpleDataProcessor
ACTION_EXECUTOR = ActionExecutor
_logger = logging.getLogger(__name__)
def scan(positioner, readables, writables=None, conditions=None, before_read=None, after_read=None, initialization=None,
finalization=None, settings=None, data_processor=None, before_move=None, after_move=None):
# Initialize the scanner instance.
scanner_instance = scanner(positioner, readables, writables, conditions, before_read, after_read, initialization,
finalization, settings, data_processor, before_move, after_move)
return scanner_instance.discrete_scan()
def scanner(positioner, readables, writables=None, conditions=None, before_read=None, after_read=None,
initialization=None, finalization=None, settings=None, data_processor=None,
before_move=None, after_move=None):
# Allow a list or a single value to be passed. Initialize None values.
writables = convert_input(convert_to_list(writables) or [])
readables = convert_input(convert_to_list(readables) or [])
conditions = convert_conditions(convert_to_list(conditions) or [])
before_read = convert_to_list(before_read) or []
after_read = convert_to_list(after_read) or []
before_move = convert_to_list(before_move) or []
after_move = convert_to_list(after_move) or []
initialization = convert_to_list(initialization) or []
finalization = convert_to_list(finalization) or []
settings = settings or scan_settings()
# TODO: Ugly. The scanner should not depend on a particular positioner implementation.
if isinstance(positioner, BsreadPositioner) and settings.n_measurements > 1:
raise ValueError("When using BsreadPositioner the maximum number of n_measurements = 1.")
bs_reader = _initialize_bs_dal(readables, conditions, settings.bs_read_filter, positioner)
epics_writer, epics_pv_reader, epics_condition_reader = _initialize_epics_dal(writables,
readables,
conditions,
settings)
function_writer, function_reader, function_condition = _initialize_function_dal(writables,
readables,
conditions)
writables_order = [type(writable) for writable in writables]
# Write function needs to merge PV and function proxy data.
def write_data(positions):
positions = convert_to_list(positions)
pv_values = [x for x, source in zip(positions, writables_order) if source == EPICS_PV]
function_values = [x for x, source in zip(positions, writables_order) if source == FUNCTION_VALUE]
if epics_writer:
epics_writer.set_and_match(pv_values)
if function_writer:
function_writer.write(function_values)
# Order of value sources, needed to reconstruct the correct order of the result.
readables_order = [type(readable) for readable in readables]
# Read function needs to merge BS, PV, and function proxy data.
def read_data(current_position_index, retry=False):
_logger.debug("Reading data for position index %s." % current_position_index)
bs_values = iter(bs_reader.read(current_position_index, retry) if bs_reader else [])
epics_values = iter(epics_pv_reader.read(current_position_index) if epics_pv_reader else [])
function_values = iter(function_reader.read(current_position_index) if function_reader else [])
# Interleave the values correctly.
result = []
for source in readables_order:
if source == BS_PROPERTY:
next_result = next(bs_values)
elif source == EPICS_PV:
next_result = next(epics_values)
elif source == FUNCTION_VALUE:
next_result = next(function_values)
else:
raise ValueError("Unknown type of readable %s used." % source)
# We flatten the result, whenever possible.
if isinstance(next_result, list) and source != FUNCTION_VALUE:
result.extend(next_result)
else:
result.append(next_result)
return result
# Order of value sources, needed to reconstruct the correct order of the result.
conditions_order = [type(condition) for condition in conditions]
# Validate function needs to validate both BS, PV, and function proxy data.
def validate_data(current_position_index, data):
_logger.debug("Reading data for position index %s." % current_position_index)
bs_values = iter(bs_reader.read_cached_conditions() if bs_reader else [])
epics_values = iter(epics_condition_reader.read(current_position_index) if epics_condition_reader else [])
function_values = iter(function_condition.read(current_position_index) if function_condition else [])
for index, source in enumerate(conditions_order):
if source == BS_CONDITION:
value = next(bs_values)
elif source == EPICS_CONDITION:
value = next(epics_values)
elif source == FUNCTION_CONDITION:
value = next(function_values)
else:
raise ValueError("Unknown type of condition %s used." % source)
value_valid = False
# Function conditions are self contained.
if source == FUNCTION_CONDITION:
if value:
value_valid = True
else:
expected_value = conditions[index].value
tolerance = conditions[index].tolerance
operation = conditions[index].operation
if compare_channel_value(value, expected_value, tolerance, operation):
value_valid = True
if not value_valid:
if conditions[index].action == ConditionAction.Retry:
return False
if source == FUNCTION_CONDITION:
raise ValueError("Function condition %s returned False." % conditions[index].identifier)
else:
raise ValueError("Condition %s failed, expected value %s, actual value %s, "
"tolerance %s, operation %s." %
(conditions[index].identifier,
conditions[index].value,
value,
conditions[index].tolerance,
conditions[index].operation))
return True
if not data_processor:
data_processor = DATA_PROCESSOR()
# Before acquisition hook.
before_measurement_executor = None
if before_read:
before_measurement_executor = ACTION_EXECUTOR(before_read).execute
# After acquisition hook.
after_measurement_executor = None
if after_read:
after_measurement_executor = ACTION_EXECUTOR(after_read).execute
# Executor before each move.
before_move_executor = None
if before_move:
before_move_executor = ACTION_EXECUTOR(before_move).execute
# Executor after each move.
after_move_executor = None
if after_move:
after_move_executor = ACTION_EXECUTOR(after_move).execute
# Initialization (before move to first position) hook.
initialization_executor = None
if initialization:
initialization_executor = ACTION_EXECUTOR(initialization).execute
# Finalization (after last acquisition AND on error) hook.
finalization_executor = None
if finalization:
finalization_executor = ACTION_EXECUTOR(finalization).execute
scanner = Scanner(positioner=positioner, data_processor=data_processor, reader=read_data,
writer=write_data, before_measurement_executor=before_measurement_executor,
after_measurement_executor=after_measurement_executor,
initialization_executor=initialization_executor,
finalization_executor=finalization_executor, data_validator=validate_data, settings=settings,
before_move_executor=before_move_executor, after_move_executor=after_move_executor)
return scanner
def _initialize_epics_dal(writables, readables, conditions, settings):
epics_writer = None
if writables:
epics_writables = [x for x in writables if isinstance(x, EPICS_PV)]
if epics_writables:
# Instantiate the PVs to move the motors.
epics_writer = EPICS_WRITER(pv_names=[pv.pv_name for pv in epics_writables],
readback_pv_names=[pv.readback_pv_name for pv in epics_writables],
tolerances=[pv.tolerance for pv in epics_writables],
timeout=settings.write_timeout)
epics_readables_pv_names = [x.pv_name for x in filter(lambda x: isinstance(x, EPICS_PV), readables)]
epics_conditions_pv_names = [x.pv_name for x in filter(lambda x: isinstance(x, EPICS_CONDITION), conditions)]
# Reading epics PV values.
epics_pv_reader = None
if epics_readables_pv_names:
epics_pv_reader = EPICS_READER(pv_names=epics_readables_pv_names)
# Reading epics condition values.
epics_condition_reader = None
if epics_conditions_pv_names:
epics_condition_reader = EPICS_READER(pv_names=epics_conditions_pv_names)
return epics_writer, epics_pv_reader, epics_condition_reader
def _initialize_bs_dal(readables, conditions, filter_function, positioner):
bs_readables = [x for x in filter(lambda x: isinstance(x, BS_PROPERTY), readables)]
bs_conditions = [x for x in filter(lambda x: isinstance(x, BS_CONDITION), conditions)]
bs_reader = None
if bs_readables or bs_conditions:
# TODO: The scanner should not depend on a particular positioner. Refactor.
if isinstance(positioner, BsreadPositioner):
bs_reader = bsread_dal.ImmediateReadGroupInterface(properties=bs_readables,
conditions=bs_conditions,
filter_function=filter_function)
positioner.set_bs_reader(bs_reader)
return bs_reader
else:
bs_reader = BS_READER(properties=bs_readables, conditions=bs_conditions, filter_function=filter_function)
return bs_reader
def _initialize_function_dal(writables, readables, conditions):
function_writer = FunctionProxy([x for x in writables if isinstance(x, FUNCTION_VALUE)])
function_reader = FunctionProxy([x for x in readables if isinstance(x, FUNCTION_VALUE)])
function_condition = FunctionProxy([x for x in conditions if isinstance(x, FUNCTION_CONDITION)])
return function_writer, function_reader, function_condition

View File

@@ -0,0 +1,58 @@
from collections import namedtuple
from pyscan import config, convert_input
from pyscan.scan import EPICS_WRITER, EPICS_READER
from pyscan.scan_parameters import epics_pv
from pyscan.utils import convert_to_list
SET_EPICS_PV = namedtuple("SET_EPICS_PV", ["pv_name", "value", "readback_pv_name", "tolerance", "timeout"])
RESTORE_WRITABLE_PVS = namedtuple("RESTORE_WRITABLE_PVS", [])
def action_set_epics_pv(pv_name, value, readback_pv_name=None, tolerance=None, timeout=None):
"""
Construct a tuple for set PV representation.
:param pv_name: Name of the PV.
:param value: Value to set the PV to.
:param readback_pv_name: Name of the readback PV.
:param tolerance: Tolerance if the PV is writable.
:param timeout: Timeout for setting the pv value.
:return: Tuple of (pv_name, pv_readback, tolerance)
"""
_, pv_name, readback_pv_name, tolerance, readback_pv_value = epics_pv(pv_name, readback_pv_name, tolerance)
if value is None:
raise ValueError("pv value not specified.")
if not timeout or timeout < 0:
timeout = config.epics_default_set_and_match_timeout
def execute():
writer = EPICS_WRITER(pv_name, readback_pv_name, tolerance, timeout)
writer.set_and_match(value)
writer.close()
return execute
def action_restore(writables):
"""
Restore the initial state of the writable PVs.
:return: Empty tuple, to be replaced with the initial values.
"""
writables = convert_input(convert_to_list(writables))
pv_names = [pv.pv_name for pv in writables]
readback_pv_names = [pv.readback_pv_name for pv in writables]
tolerances = [pv.tolerance for pv in writables]
# Get the initial values.
reader = EPICS_READER(pv_names)
initial_values = reader.read()
reader.close()
def execute():
writer = EPICS_WRITER(pv_names, readback_pv_names, tolerances)
writer.set_and_match(initial_values)
writer.close()
return execute

View File

@@ -0,0 +1,280 @@
from collections import namedtuple
from enum import Enum
from pyscan import config
EPICS_PV = namedtuple("EPICS_PV", ["identifier", "pv_name", "readback_pv_name", "tolerance", "readback_pv_value"])
EPICS_CONDITION = namedtuple("EPICS_CONDITION", ["identifier", "pv_name", "value", "action", "tolerance", "operation"])
BS_PROPERTY = namedtuple("BS_PROPERTY", ["identifier", "property", "default_value"])
BS_CONDITION = namedtuple("BS_CONDITION", ["identifier", "property", "value", "action", "tolerance", "operation",
"default_value"])
SCAN_SETTINGS = namedtuple("SCAN_SETTINGS", ["measurement_interval", "n_measurements",
"write_timeout", "settling_time", "progress_callback", "bs_read_filter"])
FUNCTION_VALUE = namedtuple("FUNCTION_VALUE", ["identifier", "call_function"])
FUNCTION_CONDITION = namedtuple("FUNCTION_CONDITION", ["identifier", "call_function", "action"])
class ConditionComparison(Enum):
EQUAL = 0
NOT_EQUAL = 1
LOWER = 2
LOWER_OR_EQUAL = 3
HIGHER = 4
HIGHER_OR_EQUAL = 5
class ConditionAction(Enum):
Abort = 1
Retry = 2
# Used to determine if a parameter was passed or the default value is used.
_default_value_placeholder = object()
def function_value(call_function, name=None):
"""
Construct a tuple for function representation.
:param call_function: Function to invoke.
:param name: Name to assign to this function.
:return: Tuple of ("identifier", "call_function")
"""
# If the name is not specified, use a counter to set the function name.
if not name:
name = "function_%d" % function_value.function_count
function_value.function_count += 1
identifier = name
return FUNCTION_VALUE(identifier, call_function)
function_value.function_count = 0
def function_condition(call_function, name=None, action=None):
"""
Construct a tuple for condition checking function representation.
:param call_function: Function to invoke.
:param name: Name to assign to this function.
:param action: What to do then the return value is False.
('ConditionAction.Abort' and 'ConditionAction.Retry' supported)
:return: Tuple of ("identifier", "call_function", "action")
"""
# If the name is not specified, use a counter to set the function name.
if not name:
name = "function_condition_%d" % function_condition.function_count
function_condition.function_count += 1
identifier = name
# The default action is Abort - used for conditions.
if not action:
action = ConditionAction.Abort
return FUNCTION_CONDITION(identifier, call_function, action)
function_condition.function_count = 0
def epics_pv(pv_name, readback_pv_name=None, tolerance=None, readback_pv_value=None):
"""
Construct a tuple for PV representation
:param pv_name: Name of the PV.
:param readback_pv_name: Name of the readback PV.
:param tolerance: Tolerance if the PV is writable.
:param readback_pv_value: If the readback_pv_value is set, the readback is compared against this instead of
comparing it to the setpoint.
:return: Tuple of (identifier, pv_name, pv_readback, tolerance)
"""
identifier = pv_name
if not pv_name:
raise ValueError("pv_name not specified.")
if not readback_pv_name:
readback_pv_name = pv_name
if not tolerance or tolerance < config.max_float_tolerance:
tolerance = config.max_float_tolerance
return EPICS_PV(identifier, pv_name, readback_pv_name, tolerance, readback_pv_value)
def epics_condition(pv_name, value, action=None, tolerance=None, operation=ConditionComparison.EQUAL):
"""
Construct a tuple for an epics condition representation.
:param pv_name: Name of the PV to monitor.
:param value: Value we expect the PV to be in.
:param action: What to do when the condition fails.
('ConditionAction.Abort' and 'ConditionAction.Retry' supported)
:param tolerance: Tolerance within which the condition needs to be.
:param operation: How to compare the received value with the expected value.
Allowed values: ConditionComparison.[EQUAL,NOT_EQUAL, LOWER, LOWER_OR_EQUAL, HIGHER, HIGHER_OR_EQUAL]
:return: Tuple of ("pv_name", "value", "action", "tolerance", "timeout", "operation")
"""
identifier = pv_name
if not pv_name:
raise ValueError("pv_name not specified.")
if value is None:
raise ValueError("pv value not specified.")
# the default action is Abort.
if not action:
action = ConditionAction.Abort
if not tolerance or tolerance < config.max_float_tolerance:
tolerance = config.max_float_tolerance
return EPICS_CONDITION(identifier, pv_name, value, action, tolerance, operation)
def bs_property(name, default_value=_default_value_placeholder):
"""
Construct a tuple for bs read property representation.
:param name: Complete property name.
:param default_value: The default value that is assigned to the property if it is missing.
:return: Tuple of ("identifier", "property", "default_value")
"""
identifier = name
if not name:
raise ValueError("name not specified.")
# We need this to allow the user to change the config at runtime.
if default_value is _default_value_placeholder:
default_value = config.bs_default_missing_property_value
return BS_PROPERTY(identifier, name, default_value)
def bs_condition(name, value, action=None, tolerance=None, operation=ConditionComparison.EQUAL,
default_value=_default_value_placeholder):
"""
Construct a tuple for bs condition property representation.
:param name: Complete property name.
:param value: Expected value.
:param action: What to do when the condition fails.
('ConditionAction.Abort' and 'ConditionAction.Retry' supported)
:param tolerance: Tolerance within which the condition needs to be.
:param operation: How to compare the received value with the expected value.
Allowed values: ConditionComparison.[EQUAL,NOT_EQUAL, LOWER, LOWER_OR_EQUAL, HIGHER, HIGHER_OR_EQUAL]
:param default_value: Default value of a condition, if not present in the bs stream.
:return: Tuple of ("identifier", "property", "value", "action", "tolerance", "operation", "default_value")
"""
identifier = name
if not name:
raise ValueError("name not specified.")
if value is None:
raise ValueError("value not specified.")
if not tolerance or tolerance < config.max_float_tolerance:
tolerance = config.max_float_tolerance
if not action:
action = ConditionAction.Abort
# We need this to allow the user to change the config at runtime.
if default_value is _default_value_placeholder:
default_value = config.bs_default_missing_property_value
return BS_CONDITION(identifier, name, value, action, tolerance, operation, default_value)
def scan_settings(measurement_interval=None, n_measurements=None, write_timeout=None, settling_time=None,
progress_callback=None, bs_read_filter=None):
"""
Set the scan settings.
:param measurement_interval: Default 0. Interval between each measurement, in case n_measurements is more than 1.
:param n_measurements: Default 1. How many measurements to make at each position.
:param write_timeout: How much time to wait in seconds for set_and_match operations on epics PVs.
:param settling_time: How much time to wait in seconds after the motors have reached the desired destination.
:param progress_callback: Function to call after each scan step is completed.
Signature: def callback(current_position, total_positions)
:param bs_read_filter: Filter to apply to the bs read receive function, to filter incoming messages.
Signature: def callback(message)
:return: Scan settings named tuple.
"""
if not measurement_interval or measurement_interval < 0:
measurement_interval = config.scan_default_measurement_interval
if not n_measurements or n_measurements < 1:
n_measurements = config.scan_default_n_measurements
if not write_timeout or write_timeout < 0:
write_timeout = config.epics_default_set_and_match_timeout
if not settling_time or settling_time < 0:
settling_time = config.epics_default_settling_time
if not progress_callback:
def default_progress_callback(current_position, total_positions):
completed_percentage = 100.0 * (current_position / total_positions)
print("Scan: %.2f %% completed (%d/%d)" % (completed_percentage, current_position, total_positions))
progress_callback = default_progress_callback
return SCAN_SETTINGS(measurement_interval, n_measurements, write_timeout, settling_time, progress_callback,
bs_read_filter)
def convert_input(input_parameters):
"""
Convert any type of input parameter into appropriate named tuples.
:param input_parameters: Parameter input from the user.
:return: Inputs converted into named tuples.
"""
converted_inputs = []
for input in input_parameters:
# Input already of correct type.
if isinstance(input, (EPICS_PV, BS_PROPERTY, FUNCTION_VALUE)):
converted_inputs.append(input)
# We need to convert it.
elif isinstance(input, str):
# Check if the string is valid.
if not input:
raise ValueError("Input cannot be an empty string.")
if "://" in input:
# Epics PV!
if input.lower().startswith("ca://"):
converted_inputs.append(epics_pv(input[5:]))
# bs_read property.
elif input.lower().startswith("bs://"):
converted_inputs.append(bs_property(input[5:]))
# A new protocol we don't know about?
else:
raise ValueError("Readable %s uses an unexpected protocol. "
"'ca://' and 'bs://' are supported." % input)
# No protocol specified, default is epics.
else:
converted_inputs.append(epics_pv(input))
elif callable(input):
converted_inputs.append(function_value(input))
# Supported named tuples or string, we cannot interpret the rest.
else:
raise ValueError("Input of unexpected type %s. Value: '%s'." % (type(input), input))
return converted_inputs
def convert_conditions(input_conditions):
"""
Convert any type type of condition input parameter into appropriate named tuples.
:param input_conditions: Condition input from the used.
:return: Input conditions converted into named tuples.
"""
converted_inputs = []
for input in input_conditions:
# Input already of correct type.
if isinstance(input, (EPICS_CONDITION, BS_CONDITION, FUNCTION_CONDITION)):
converted_inputs.append(input)
# Function call.
elif callable(input):
converted_inputs.append(function_condition(input))
# Unknown.
else:
raise ValueError("Condition of unexpected type %s. Value: '%s'." % (type(input), input))
return converted_inputs

202
packages/pyscan/scanner.py Normal file
View File

@@ -0,0 +1,202 @@
from itertools import count
from time import sleep
from pyscan import config
from pyscan.scan_parameters import scan_settings
STATUS_INITIALIZED = "INITIALIZED"
STATUS_RUNNING = "RUNNING"
STATUS_FINISHED = "FINISHED"
STATUS_PAUSED = "PAUSED"
STATUS_ABORTED = "ABORTED"
class Scanner(object):
"""
Perform discrete and continues scans.
"""
def __init__(self, positioner, data_processor, reader, writer=None, before_measurement_executor=None,
after_measurement_executor=None, initialization_executor=None, finalization_executor=None,
data_validator=None, settings=None, before_move_executor=None, after_move_executor=None):
"""
Initialize scanner.
:param positioner: Positioner should provide a generator to get the positions to move to.
:param writer: Object that implements the write(position) method and sets the positions.
:param data_processor: How to store and handle the data.
:param reader: Object that implements the read() method to return data to the data_processor.
:param before_measurement_executor: Callbacks executor that executed before measurements.
:param after_measurement_executor: Callbacks executor that executed after measurements.
:param before_move_executor: Callbacks executor that executes before each move.
:param after_move_executor: Callbacks executor that executes after each move.
"""
self.positioner = positioner
self.writer = writer
self.data_processor = data_processor
self.reader = reader
self.before_measurement_executor = before_measurement_executor
self.after_measurement_executor = after_measurement_executor
self.initialization_executor = initialization_executor
self.finalization_executor = finalization_executor
self.settings = settings or scan_settings()
self.before_move_executor = before_move_executor
self.after_move_executor = after_move_executor
# If no data validator is provided, data is always valid.
self.data_validator = data_validator or (lambda position, data: True)
self._user_abort_scan_flag = False
self._user_pause_scan_flag = False
self._status = STATUS_INITIALIZED
def abort_scan(self):
"""
Abort the scan after the next measurement.
"""
self._user_abort_scan_flag = True
def pause_scan(self):
"""
Pause the scan after the next measurement.
"""
self._user_pause_scan_flag = True
def get_status(self):
return self._status
def resume_scan(self):
"""
Resume the scan.
"""
self._user_pause_scan_flag = False
def _verify_scan_status(self):
"""
Check if the conditions to pause or abort the scan are met.
:raise Exception in case the conditions are met.
"""
# Check if the abort flag is set.
if self._user_abort_scan_flag:
self._status = STATUS_ABORTED
raise Exception("User aborted scan.")
# If the scan is in pause, wait until it is resumed or the user aborts the scan.
if self._user_pause_scan_flag:
self._status = STATUS_PAUSED
while self._user_pause_scan_flag:
if self._user_abort_scan_flag:
self._status = STATUS_ABORTED
raise Exception("User aborted scan in pause.")
sleep(config.scan_pause_sleep_interval)
# Once the pause flag is cleared, the scanning continues.
self._status = STATUS_RUNNING
def _perform_single_read(self, current_position_index):
"""
Read a single result from the channel.
:param current_position_index: Current position, passed to the validator.
:return: Single result (all channels).
"""
n_current_acquisition = 0
# Collect data until acquired data is valid or retry limit reached.
while n_current_acquisition < config.scan_acquisition_retry_limit:
retry_acquisition = n_current_acquisition != 0
single_measurement = self.reader(current_position_index, retry=retry_acquisition)
# If the data is valid, break out of the loop.
if self.data_validator(current_position_index, single_measurement):
return single_measurement
n_current_acquisition += 1
sleep(config.scan_acquisition_retry_delay)
# Could not read the data within the retry limit.
else:
raise Exception("Number of maximum read attempts (%d) exceeded. Cannot read valid data at position %s."
% (config.scan_acquisition_retry_limit, current_position_index))
def _read_and_process_data(self, current_position):
"""
Read the data and pass it on only if valid.
:param current_position: Current position reached by the scan.
:return: Current position scan data.
"""
# We do a single acquisition per position.
if self.settings.n_measurements == 1:
result = self._perform_single_read(current_position)
# Multiple acquisitions.
else:
result = []
for n_measurement in range(self.settings.n_measurements):
result.append(self._perform_single_read(current_position))
sleep(self.settings.measurement_interval)
# Process only valid data.
self.data_processor.process(current_position, result)
return result
def discrete_scan(self):
"""
Perform a discrete scan - set a position, read, continue. Return value at the end.
"""
try:
self._status = STATUS_RUNNING
# Get how many positions we have in total.
n_of_positions = sum(1 for _ in self.positioner.get_generator())
# Report the 0% completed.
self.settings.progress_callback(0, n_of_positions)
# Set up the experiment.
if self.initialization_executor:
self.initialization_executor(self)
for position_index, next_positions in zip(count(1), self.positioner.get_generator()):
# Execute before moving to the next position.
if self.before_move_executor:
self.before_move_executor(next_positions)
# Position yourself before reading.
if self.writer:
self.writer(next_positions)
# Settling time, wait after positions has been reached.
sleep(self.settings.settling_time)
# Execute the after move executor.
if self.after_move_executor:
self.after_move_executor(next_positions)
# Pre reading callbacks.
if self.before_measurement_executor:
self.before_measurement_executor(next_positions)
# Read and process the data in the current position.
position_data = self._read_and_process_data(next_positions)
# Post reading callbacks.
if self.after_measurement_executor:
self.after_measurement_executor(next_positions, position_data)
# Report about the progress.
self.settings.progress_callback(position_index, n_of_positions)
# Verify is the scan should continue.
self._verify_scan_status()
finally:
# Clean up after yourself.
if self.finalization_executor:
self.finalization_executor(self)
# If the scan was aborted we do not change the status to finished.
if self._status != STATUS_ABORTED:
self._status = STATUS_FINISHED
return self.data_processor.get_data()
def continuous_scan(self):
# TODO: Needs implementation.
pass

216
packages/pyscan/utils.py Normal file
View File

@@ -0,0 +1,216 @@
import inspect
from collections import OrderedDict
from time import sleep
from epics.pv import PV
from pyscan import config
from pyscan.scan_parameters import convert_input, ConditionComparison
def compare_channel_value(current_value, expected_value, tolerance=0.0, operation=ConditionComparison.EQUAL):
"""
Check if the pv value is the same as the expected value, within tolerance for int and float.
:param current_value: Current value to compare it to.
:param expected_value: Expected value of the PV.
:param tolerance: Tolerance for number comparison. Cannot be less than the minimum tolerance.
:param operation: Operation to perform on the current and expected value - works for int and floats.
:return: True if the value matches.
"""
# Minimum tolerance allowed.
tolerance = max(tolerance, config.max_float_tolerance)
def compare_value(value):
# For numbers we compare them within tolerance.
if isinstance(current_value, (float, int)):
if operation == ConditionComparison.EQUAL:
return abs(current_value - expected_value) <= tolerance
elif operation == ConditionComparison.HIGHER:
return (current_value - expected_value) > tolerance
elif operation == ConditionComparison.HIGHER_OR_EQUAL:
return (current_value - expected_value) >= tolerance
elif operation == ConditionComparison.LOWER:
return (current_value - expected_value) < 0 or abs(current_value - expected_value) < tolerance
elif operation == ConditionComparison.LOWER_OR_EQUAL:
return (current_value - expected_value) <= 0 or abs(current_value - expected_value) <= tolerance
elif operation == ConditionComparison.NOT_EQUAL:
return abs(current_value - expected_value) > tolerance
# Otherwise use the object comparison.
else:
try:
if operation == ConditionComparison.EQUAL:
return current_value == expected_value
elif operation == ConditionComparison.HIGHER:
return current_value > expected_value
elif operation == ConditionComparison.HIGHER_OR_EQUAL:
return current_value >= expected_value
elif operation == ConditionComparison.LOWER:
return current_value < expected_value
elif operation == ConditionComparison.LOWER_OR_EQUAL:
return current_value <= expected_value
elif operation == ConditionComparison.NOT_EQUAL:
return current_value != expected_value
except:
raise ValueError("Do not know how to compare current_value %s with expected_value %s and action %s."
% (current_value, expected_value, operation))
return False
if isinstance(current_value, list):
# In case of a list, any of the provided values will do.
return any((compare_value(value) for value in expected_value))
else:
return compare_value(current_value)
def connect_to_pv(pv_name, n_connection_attempts=3):
"""
Start a connection to a PV.
:param pv_name: PV name to connect to.
:param n_connection_attempts: How many times you should try to connect before raising an exception.
:return: PV object.
:raises ValueError if cannot connect to PV.
"""
pv = PV(pv_name, auto_monitor=False)
for i in range(n_connection_attempts):
if pv.connect():
return pv
sleep(0.1)
raise ValueError("Cannot connect to PV '%s'." % pv_name)
def validate_lists_length(*args):
"""
Check if all the provided lists are of the same length.
:param args: Lists.
:raise ValueError if they are not of the same length.
"""
if not args:
raise ValueError("Cannot compare lengths of None.")
initial_length = len(args[0])
if not all([len(element) == initial_length for element in args]):
error = "The provided lists must be of same length.\n"
for element in args:
error += "%s\n" % element
raise ValueError(error)
def convert_to_list(value):
"""
If the input parameter is not a list, convert to one.
:return: The value in a list, or None.
"""
# If None or a list, just return the value as it is.
if (value is None) or isinstance(value, list):
return value
# Otherwise treat the value as the first element in a list.
return [value]
def convert_to_position_list(axis_list):
"""
# Change the PER KNOB to PER INDEX of positions.
:param axis_list: PER KNOB list of positions.
:return: PER INDEX list of positions.
"""
return [list(positions) for positions in zip(*axis_list)]
def flat_list_generator(list_to_flatten):
# Just return the most inner list.
if (len(list_to_flatten) == 0) or (not isinstance(list_to_flatten[0], list)):
yield list_to_flatten
# Otherwise we have to go deeper.
else:
for inner_list in list_to_flatten:
yield from flat_list_generator(inner_list)
class ActionExecutor(object):
"""
Execute all callbacks in the same thread.
Each callback method should accept 2 parameters: position, sampled values.
"""
def __init__(self, actions):
"""
Initialize the action executor.
:param actions: Actions to execute. Single action or list of.
"""
self.actions = convert_to_list(actions)
def execute(self, position, position_data=None):
for action in self.actions:
n_parameters = len(inspect.signature(action).parameters)
if n_parameters == 2:
action(position, position_data)
elif n_parameters == 1:
action(position)
else:
action()
class SimpleDataProcessor(object):
"""
Save the position and the received data at this position.
"""
def __init__(self, positions=None, data=None):
"""
Initialize the simple data processor.
:param positions: List to store the visited positions. Default: internal list.
:param data: List to store the data at each position. Default: internal list.
"""
self.positions = positions if positions is not None else []
self.data = data if data is not None else []
def process(self, position, data):
self.positions.append(position)
self.data.append(data)
def get_data(self):
return self.data
def get_positions(self):
return self.positions
class DictionaryDataProcessor(SimpleDataProcessor):
"""
Save the positions and the received data for each position in a dictionary.
"""
def __init__(self, readables, positions=None, data=None):
"""
Readables specified in the scan.
:param readables: Same readables that were passed to the scan function.
"""
super(DictionaryDataProcessor, self).__init__(positions=positions, data=data)
readables = convert_input(readables)
self.readable_ids = [x.identifier for x in readables]
def process(self, position, data):
self.positions.append(position)
# Create a dictionary with the results.
values = OrderedDict(zip(self.readable_ids, data))
self.data.append(values)

View File

@@ -0,0 +1 @@
./pyscan-2.8.0-py3.7.egg