setting up xtreme plugins

This commit is contained in:
2023-09-01 10:41:40 +02:00
parent 07aa7697eb
commit d2e5f31bfa
19 changed files with 764 additions and 91 deletions
+1
View File
@@ -0,0 +1 @@
from .bec_client import *
+1
View File
@@ -0,0 +1 @@
from .plugins import *
+245
View File
@@ -0,0 +1,245 @@
from bec_client.scan_manager import ScanReport
from bec_utils.devicemanager import Device
# pylint:disable=undefined-variable
# pylint: disable=too-many-arguments
def dscan(
motor1: Device, m1_from: float, m1_to: float, steps: int, exp_time: float, **kwargs
) -> ScanReport:
"""Relative line scan with one device.
Args:
motor1 (Device): Device that should be scanned.
m1_from (float): Start position relative to the current position.
m1_to (float): End position relative to the current position.
steps (int): Number of steps.
exp_time (float): Exposure time.
Returns:
ScanReport: Status object.
Examples:
>>> dscan(dev.motor1, -5, 5, 10, 0.1)
"""
return scans.line_scan(
motor1, m1_from, m1_to, steps=steps, exp_time=exp_time, relative=True, **kwargs
)
def d2scan(
motor1: Device,
m1_from: float,
m1_to: float,
motor2: Device,
m2_from: float,
m2_to: float,
steps: int,
exp_time: float,
**kwargs
) -> ScanReport:
"""Relative line scan with two devices.
Args:
motor1 (Device): First device that should be scanned.
m1_from (float): Start position of the first device relative to its current position.
m1_to (float): End position of the first device relative to its current position.
motor2 (Device): Second device that should be scanned.
m2_from (float): Start position of the second device relative to its current position.
m2_to (float): End position of the second device relative to its current position.
steps (int): Number of steps.
exp_time (float): Exposure time
Returns:
ScanReport: Status object.
Examples:
>>> d2scan(dev.motor1, -5, 5, dev.motor2, -8, 8, 10, 0.1)
"""
return scans.line_scan(
motor1,
m1_from,
m1_to,
motor2,
m2_from,
m2_to,
steps=steps,
exp_time=exp_time,
relative=True,
**kwargs
)
def ascan(motor1, m1_from, m1_to, steps, exp_time, **kwargs):
"""Absolute line scan with one device.
Args:
motor1 (Device): Device that should be scanned.
m1_from (float): Start position.
m1_to (float): End position.
steps (int): Number of steps.
exp_time (float): Exposure time.
Returns:
ScanReport: Status object.
Examples:
>>> ascan(dev.motor1, -5, 5, 10, 0.1)
"""
return scans.line_scan(
motor1, m1_from, m1_to, steps=steps, exp_time=exp_time, relative=False, **kwargs
)
def a2scan(motor1, m1_from, m1_to, motor2, m2_from, m2_to, steps, exp_time, **kwargs):
"""Absolute line scan with two devices.
Args:
motor1 (Device): First device that should be scanned.
m1_from (float): Start position of the first device.
m1_to (float): End position of the first device.
motor2 (Device): Second device that should be scanned.
m2_from (float): Start position of the second device.
m2_to (float): End position of the second device.
steps (int): Number of steps.
exp_time (float): Exposure time
Returns:
ScanReport: Status object.
Examples:
>>> a2scan(dev.motor1, -5, 5, dev.motor2, -8, 8, 10, 0.1)
"""
return scans.line_scan(
motor1,
m1_from,
m1_to,
motor2,
m2_from,
m2_to,
steps=steps,
exp_time=exp_time,
relative=False,
**kwargs
)
def dmesh(motor1, m1_from, m1_to, m1_steps, motor2, m2_from, m2_to, m2_steps, exp_time, **kwargs):
"""Relative mesh scan (grid scan) with two devices.
Args:
motor1 (Device): First device that should be scanned.
m1_from (float): Start position of the first device relative to its current position.
m1_to (float): End position of the first device relative to its current position.
m1_steps (int): Number of steps for motor1.
motor2 (Device): Second device that should be scanned.
m2_from (float): Start position of the second device relative to its current position.
m2_to (float): End position of the second device relative to its current position.
m2_steps (int): Number of steps for motor2.
exp_time (float): Exposure time
Returns:
ScanReport: Status object.
Examples:
>>> dmesh(dev.motor1, -5, 5, 10, dev.motor2, -8, 8, 10, 0.1)
"""
return scans.grid_scan(
motor1,
m1_from,
m1_to,
m1_steps,
motor2,
m2_from,
m2_to,
m2_steps,
exp_time=exp_time,
relative=True,
)
def amesh(motor1, m1_from, m1_to, m1_steps, motor2, m2_from, m2_to, m2_steps, exp_time, **kwargs):
"""Absolute mesh scan (grid scan) with two devices.
Args:
motor1 (Device): First device that should be scanned.
m1_from (float): Start position of the first device.
m1_to (float): End position of the first device.
m1_steps (int): Number of steps for motor1.
motor2 (Device): Second device that should be scanned.
m2_from (float): Start position of the second device.
m2_to (float): End position of the second device.
m2_steps (int): Number of steps for motor2.
exp_time (float): Exposure time
Returns:
ScanReport: Status object.
Examples:
>>> amesh(dev.motor1, -5, 5, 10, dev.motor2, -8, 8, 10, 0.1)
"""
return scans.grid_scan(
motor1,
m1_from,
m1_to,
m1_steps,
motor2,
m2_from,
m2_to,
m2_steps,
exp_time=exp_time,
relative=False,
)
def umv(*args) -> ScanReport:
"""Updated absolute move (i.e. blocking) for one or more devices.
Returns:
ScanReport: Status object.
Examples:
>>> umv(dev.samx, 1)
>>> umv(dev.samx, 1, dev.samy, 2)
"""
return scans.umv(*args, relative=False)
def umvr(*args) -> ScanReport:
"""Updated relative move (i.e. blocking) for one or more devices.
Returns:
ScanReport: Status object.
Examples:
>>> umvr(dev.samx, 1)
>>> umvr(dev.samx, 1, dev.samy, 2)
"""
return scans.umv(*args, relative=True)
def mv(*args) -> ScanReport:
"""Absolute move for one or more devices.
Returns:
ScanReport: Status object.
Examples:
>>> mv(dev.samx, 1)
>>> mv(dev.samx, 1, dev.samy, 2)
"""
return scans.mv(*args, relative=False)
def mvr(*args) -> ScanReport:
"""Relative move for one or more devices.
Returns:
ScanReport: Status object.
Examples:
>>> mvr(dev.samx, 1)
>>> mvr(dev.samx, 1, dev.samy, 2)
"""
return scans.mv(*args, relative=True)
@@ -0,0 +1,42 @@
"""
Post startup script for the BEC client. This script is executed after the
IPython shell is started. It is used to load the beamline specific
information and to setup the prompts.
The script is executed in the global namespace of the IPython shell. This
means that all variables defined here are available in the shell.
If needed, bec command-line arguments can be parsed here. For example, to
parse the --session argument, add the following lines to the script:
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--session", help="Session name", type=str, default="my_default_session")
args = parser.parse_args()
if args.session == "my_session":
print("Loading my_session session")
from bec_plugins.bec_client.plugins.my_session import *
else:
print("Loading default session")
from bec_plugins.bec_client.plugins.default_session import *
"""
# pylint: disable=invalid-name, unused-import, import-error, undefined-variable, unused-variable, unused-argument, no-name-in-module
from bec_lib.core import bec_logger
logger = bec_logger.logger
logger.info("Using the XTreme startup script.")
# SETUP BEAMLINE INFO
from bec_client.plugins.SLS.sls_info import OperatorInfo, SLSInfo
bec._beamline_mixin._bl_info_register(SLSInfo)
bec._beamline_mixin._bl_info_register(OperatorInfo)
# SETUP PROMPTS
bec._ip.prompts.username = "XTreme"
bec._ip.prompts.status = 1
@@ -0,0 +1,24 @@
"""
Pre-startup script for BEC client. This script is executed before the BEC client
is started. It can be used to set up the BEC client configuration. The script is
executed in the global namespace of the BEC client. This means that all
variables defined here are available in the BEC client.
To set up the BEC client configuration, use the ServiceConfig class. For example,
to set the configuration file path, add the following lines to the script:
import pathlib
from bec_lib.core import ServiceConfig
current_path = pathlib.Path(__file__).parent.resolve()
CONFIG_PATH = f"{current_path}/<path_to_my_config_file.yaml>"
config = ServiceConfig(CONFIG_PATH)
If this startup script defined a ServiceConfig object, the BEC client will use
it to configure itself. Otherwise, the BEC client will use the default config.
"""
CONFIG_PATH = "/sls/X07MA/data/x07maop/SW/bec/bec/bec_client_config.yaml"
config = ServiceConfig(CONFIG_PATH)
+11
View File
@@ -0,0 +1,11 @@
import os
def setup_epics_ca():
os.environ["EPICS_CA_AUTO_ADDR_LIST"] = "NO"
os.environ["EPICS_CA_ADDR_LIST"] = "129.129.112.255 sls-x07ma-cagw.psi.ch:5836"
os.environ["PYTHONIOENCODING"] = "latin1"
def run():
setup_epics_ca()
View File
@@ -0,0 +1,176 @@
import time
import numpy as np
from bec_lib.core import bec_logger
from scan_server.scans import FlyScanBase, ScanArgType, ScanBase
logger = bec_logger.logger
class OTFScan(FlyScanBase):
scan_name = "otf_scan"
scan_report_hint = "table"
required_kwargs = ["e1", "e2", "time"]
arg_input = []
arg_bundle_size = len(arg_input)
def __init__(self, *args, parameter=None, **kwargs):
"""Scans the energy from e1 to e2 in <time> minutes.
Examples:
>>> scans.otf_scan(e1=700, e2=740, time=4)
"""
super().__init__(parameter=parameter, **kwargs)
self.axis = []
self.scan_motors = []
self.num_pos = 0
self.mono = self.caller_kwargs.get("mono", "mono")
self.otf_device = self.caller_kwargs.get("otf", "otf")
def scan_core(self):
yield from self.stubs.set(
device=self.mono, value=self.caller_kwargs["e1"], wait_group="flyer"
)
yield from self.stubs.wait(device=[self.mono], wait_group="flyer", wait_type="move")
yield from self.stubs.kickoff(
device=self.otf_device,
parameter={
key: val for key, val in self.caller_kwargs.items() if key in ["e1", "e2", "time"]
},
)
yield from self.stubs.wait(device=[self.otf_device], wait_group="kickoff", wait_type="move")
yield from self.stubs.complete(device=self.otf_device)
target_diid = self.DIID - 1
while True:
yield from self.stubs.read_and_wait(group="primary", wait_group="readout_primary")
status = self.stubs.get_req_status(
device=self.otf_device, RID=self.metadata["RID"], DIID=target_diid
)
progress = self.stubs.get_device_progress(
device=self.otf_device, RID=self.metadata["RID"]
)
if progress:
self.num_pos = progress
if status:
break
time.sleep(1)
class HystScan(ScanBase):
scan_name = "hyst_scan"
scan_report_hint = "table"
required_kwargs = []
arg_input = [
ScanArgType.DEVICE,
ScanArgType.FLOAT,
ScanArgType.FLOAT,
ScanArgType.DEVICE,
ScanArgType.FLOAT,
ScanArgType.FLOAT,
]
arg_bundle_size = 3
scan_type = "step"
default_ramp_rate = 2
def __init__(self, *args, parameter=None, **kwargs):
"""
A hysteresis scan.
scans.hyst_scan(field_motor, start_field, end_field, mono, energy1, energy2)
Examples:
>>> scans.hyst_scan(dev.field_x, 0, 0.5, dev.mono, 600, 640, ramp_rate=2)
"""
super().__init__(parameter=parameter, **kwargs)
self.axis = []
self.flyer = list(self.caller_args.keys())[0]
self.energy_motor = list(self.caller_args.keys())[1]
self.scan_motors = [self.energy_motor, self.flyer]
self.flyer_positions = self.caller_args[self.flyer]
self._current_scan_motor_index = 0
self._scan_motor_direction = 1
self.ramp_rate = self.caller_kwargs.get("ramp_rate", self.default_ramp_rate)
def _calculate_positions(self) -> None:
self.positions = [[pos] for pos in self.caller_args[self.energy_motor]]
def prepare_positions(self):
self._calculate_positions()
self.num_pos = 0
yield None
self._check_limits()
def _at_each_point(self):
yield from self.stubs.read(group="primary", wait_group="primary", pointID=self.pointID)
self.pointID += 1
def _get_next_scan_motor_position(self):
while True:
yield self.positions[self._current_scan_motor_index][0]
if len(self.positions) - 1 == self._current_scan_motor_index or (
self._current_scan_motor_index == 0 and self._scan_motor_direction < 0
):
self._scan_motor_direction *= -1
self._current_scan_motor_index += self._scan_motor_direction
def scan_core(self):
# yield from self._move_and_wait(self.positions[0])
status = yield from self.stubs.send_rpc_and_wait(
"field_x", "ramprate.set", self.default_ramp_rate
)
status.wait()
yield from self.stubs.set(
device=self.flyer, value=self.flyer_positions[0], wait_group="flyer"
)
yield from self.stubs.wait(device=[self.flyer], wait_group="flyer", wait_type="move")
status = yield from self.stubs.send_rpc_and_wait("field_x", "ramprate.set", self.ramp_rate)
status.wait()
# send the slow motor on its way
yield from self.stubs.set(
device=self.flyer,
value=self.flyer_positions[1],
wait_group="flyer",
)
flyer_done = False
pos_generator = self._get_next_scan_motor_position()
target_DIID = self.DIID - 1
dev = self.device_manager.devices
while not flyer_done:
flyer_done = bool(
self.stubs.get_req_status(
device=self.flyer, DIID=target_DIID, RID=self.metadata["RID"]
)
)
val = next(pos_generator)
logger.info(f"Moving mono to {val}.")
yield from self.stubs.set(device=self.energy_motor, value=val, wait_group="mono")
yield from self.stubs.wait(
device=[self.energy_motor], wait_group="mono", wait_type="move"
)
monitored_devices = [dev.name for dev in dev.monitored_devices([])]
yield from self.stubs.read_and_wait(
device=[self.flyer, self.scan_motors[0], *monitored_devices],
wait_group="readout_primary",
pointID=self.pointID,
)
# time.sleep(1)
self.pointID += 1
self.scan_motors[0]
self.num_pos += 1
status = yield from self.stubs.send_rpc_and_wait(
"field_x", "ramprate.set", self.default_ramp_rate
)
status.wait()
def return_to_start(self):
yield None