0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-14 11:41:49 +02:00

fix: adapt code to BEC 1.0

This commit is contained in:
2024-02-07 17:14:33 +01:00
parent a7bfcc12b9
commit b36131eed5
10 changed files with 1001 additions and 24 deletions

View File

@ -102,7 +102,7 @@ class StreamApp(QWidget):
@staticmethod
def _streamer_cb(msg, *, parent, **_kwargs) -> None:
msgMCS = messages.DeviceMessage.loads(msg.value)
msgMCS = msg.value
print(msgMCS)
row = msgMCS.content["signals"][parent.sub_device]
metadata = msgMCS.metadata
@ -123,7 +123,7 @@ class StreamApp(QWidget):
def _device_cv(msg, *, parent, **_kwargs) -> None:
print("Getting ScanID")
msgDEV = messages.ScanStatusMessage.loads(msg.value)
msgDEV = msg.value
current_scanID = msgDEV.content["scanID"]

View File

@ -1297,7 +1297,7 @@ class MotorControl(QThread):
@staticmethod
def _device_status_callback_motors(msg, *, parent, **_kwargs) -> None:
deviceMSG = messages.DeviceMessage.loads(msg.value)
deviceMSG = msg.value
if parent.motor_x.name in deviceMSG.content["signals"]:
parent.current_x = deviceMSG.content["signals"][parent.motor_x.name]["value"]
elif parent.motor_y.name in deviceMSG.content["signals"]:

View File

@ -271,7 +271,7 @@ class StreamPlot(QtWidgets.QWidget):
continue
endpoint = f"px_stream/projection_{self._current_proj}/data"
msgs = self.client.producer.lrange(topic=endpoint, start=-1, end=-1)
data = [messages.DeviceMessage.loads(msg) for msg in msgs]
data = msgs
if not data:
continue
with np.errstate(divide="ignore", invalid="ignore"):

View File

@ -10,7 +10,7 @@ import os
from collections.abc import Callable
from typing import Union
from bec_lib import BECClient, messages, ServiceConfig
from bec_lib import BECClient, ServiceConfig
from bec_lib.redis_connector import RedisConsumerThreaded
from qtpy.QtCore import QObject, Signal as pyqtSignal
@ -76,7 +76,7 @@ class _BECDispatcher(QObject):
"""Creates a new connection for given topics."""
def cb(msg):
msg = messages.MessageReader.loads(msg.value)
msg = msg.value
if not isinstance(msg, list):
msg = [msg]
for msg_i in msg:

View File

@ -119,8 +119,7 @@ class ScanControl(QWidget):
def populate_scans(self):
"""Populates the scan selection combo box with available scans"""
msg = self.client.producer.get(MessageEndpoints.available_scans())
self.available_scans = msgpack.loads(msg)
self.available_scans = self.client.producer.get(MessageEndpoints.available_scans()).resource
if self.allowed_scans is None:
allowed_scans = self.available_scans.keys()
else:

0
tests/__init__.py Normal file
View File

View File

View File

@ -0,0 +1,989 @@
from bec_lib.messages import AvailableResourceMessage
available_scans_message = AvailableResourceMessage(
resource={
"acquire": {
"class": "Acquire",
"base_class": "ScanBase",
"arg_input": {},
"required_kwargs": [],
"arg_bundle_size": {"bundle": 0, "min": None, "max": None},
"scan_report_hint": "table",
"doc": "\n A simple acquisition at the current position.\n\n Args:\n burst: number of acquisition per point\n\n Returns:\n ScanReport\n\n Examples:\n >>> scans.acquire(exp_time=0.1, relative=True)\n\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{"name": "exp_time", "kind": "KEYWORD_ONLY", "default": 0, "annotation": "float"},
{
"name": "burst_at_each_point",
"kind": "KEYWORD_ONLY",
"default": 1,
"annotation": "int",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"interactive_scan_trigger": {
"class": "AddInteractiveScanPoint",
"base_class": "ScanComponent",
"arg_input": {"device": "device"},
"required_kwargs": ["required"],
"arg_bundle_size": {"bundle": 1, "min": 1, "max": None},
"scan_report_hint": "",
"doc": "\n An interactive scan for one or more motors.\n\n Args:\n *args: devices\n exp_time: exposure time in s\n steps: number of steps (please note: 5 steps == 6 positions)\n relative: Start from an absolute or relative position\n burst: number of acquisition per point\n\n Returns:\n ScanReport\n\n Examples:\n >>> scans.interactive_scan_trigger()\n\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"close_interactive_scan": {
"class": "CloseInteractiveScan",
"base_class": "ScanComponent",
"arg_input": {},
"required_kwargs": ["required"],
"arg_bundle_size": {"bundle": 0, "min": None, "max": None},
"scan_report_hint": "",
"doc": "\n An interactive scan for one or more motors.\n\n Args:\n *args: devices\n exp_time: exposure time in s\n steps: number of steps (please note: 5 steps == 6 positions)\n relative: Start from an absolute or relative position\n burst: number of acquisition per point\n\n Returns:\n ScanReport\n\n Examples:\n >>> scans.close_interactive_scan(dev.motor1, dev.motor2, exp_time=0.1)\n\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"close_scan_def": {
"class": "CloseScanDef",
"base_class": "RequestBase",
"arg_input": {},
"required_kwargs": [],
"arg_bundle_size": {"bundle": 0, "min": 0, "max": 0},
"scan_report_hint": "table",
"doc": None,
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "device_manager",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "DeviceManagerBase",
},
{
"name": "monitored",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "list",
},
{
"name": "parameter",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "dict",
},
{"name": "metadata", "kind": "KEYWORD_ONLY", "default": None, "annotation": "dict"},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"close_scan_group": {
"class": "CloseScanGroup",
"base_class": "RequestBase",
"arg_input": {},
"required_kwargs": [],
"arg_bundle_size": {"bundle": 0, "min": 0, "max": 0},
"scan_report_hint": None,
"doc": None,
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "device_manager",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "DeviceManagerBase",
},
{
"name": "monitored",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "list",
},
{
"name": "parameter",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "dict",
},
{"name": "metadata", "kind": "KEYWORD_ONLY", "default": None, "annotation": "dict"},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"cont_line_scan": {
"class": "ContLineScan",
"base_class": "ScanBase",
"arg_input": {"device": "device", "start": "float", "stop": "float"},
"required_kwargs": ["steps", "relative"],
"arg_bundle_size": {"bundle": 3, "min": 1, "max": 1},
"scan_report_hint": "table",
"doc": "\n A line scan for one or more motors.\n\n Args:\n *args (Device, float, float): pairs of device / start position / end position\n exp_time (float): exposure time in seconds. Default is 0.\n steps (int): number of steps. Default is 10.\n relative (bool): if True, the motors will be moved relative to their current position. Default is False.\n burst_at_each_point (int): number of exposures at each point. Default is 1.\n offset (float): offset in motor units. Default is 100.\n\n Returns:\n ScanReport\n\n Examples:\n >>> scans.cont_line_scan(dev.motor1, -5, 5, steps=10, exp_time=0.1, relative=True)\n\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{"name": "exp_time", "kind": "KEYWORD_ONLY", "default": 0, "annotation": "float"},
{"name": "steps", "kind": "KEYWORD_ONLY", "default": 10, "annotation": "int"},
{
"name": "relative",
"kind": "KEYWORD_ONLY",
"default": False,
"annotation": "bool",
},
{"name": "offset", "kind": "KEYWORD_ONLY", "default": 100, "annotation": "float"},
{
"name": "burst_at_each_point",
"kind": "KEYWORD_ONLY",
"default": 1,
"annotation": "int",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"device_rpc": {
"class": "DeviceRPC",
"base_class": "RequestBase",
"arg_input": ["device", "str", "list", "dict"],
"required_kwargs": [],
"arg_bundle_size": {"bundle": 4, "min": 1, "max": 1},
"scan_report_hint": None,
"doc": None,
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "device_manager",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "DeviceManagerBase",
},
{
"name": "monitored",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "list",
},
{
"name": "parameter",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "dict",
},
{"name": "metadata", "kind": "KEYWORD_ONLY", "default": None, "annotation": "dict"},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"fermat_scan": {
"class": "FermatSpiralScan",
"base_class": "ScanBase",
"arg_input": {"device": "device", "start": "float", "stop": "float"},
"required_kwargs": ["step", "relative"],
"arg_bundle_size": {"bundle": 3, "min": 2, "max": 2},
"scan_report_hint": "table",
"doc": '\n A scan following Fermat\'s spiral.\n\n Args:\n *args: pairs of device / start position / end position arguments\n step (float): step size in motor units. Default is 0.1.\n exp_time (float): exposure time in seconds. Default is 0.\n settling_time (float): settling time in seconds. Default is 0.\n relative (bool): if True, the motors will be moved relative to their current position. Default is False.\n burst_at_each_point (int): number of exposures at each point. Default is 1.\n spiral_type (float): type of spiral to use. Default is 0.\n optim_trajectory (str): trajectory optimization method. Default is None. Options are "corridor" and "none".\n\n Returns:\n ScanReport\n\n Examples:\n >>> scans.fermat_scan(dev.motor1, -5, 5, dev.motor2, -5, 5, step=0.5, exp_time=0.1, relative=True, optim_trajectory="corridor")\n\n ',
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{"name": "step", "kind": "KEYWORD_ONLY", "default": 0.1, "annotation": "float"},
{"name": "exp_time", "kind": "KEYWORD_ONLY", "default": 0, "annotation": "float"},
{
"name": "settling_time",
"kind": "KEYWORD_ONLY",
"default": 0,
"annotation": "float",
},
{
"name": "relative",
"kind": "KEYWORD_ONLY",
"default": False,
"annotation": "bool",
},
{
"name": "burst_at_each_point",
"kind": "KEYWORD_ONLY",
"default": 1,
"annotation": "int",
},
{
"name": "spiral_type",
"kind": "KEYWORD_ONLY",
"default": 0,
"annotation": "float",
},
{
"name": "optim_trajectory",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": {"Literal": ["corridor", None]},
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"line_scan": {
"class": "LineScan",
"base_class": "ScanBase",
"arg_input": {"device": "device", "start": "float", "stop": "float"},
"required_kwargs": ["steps", "relative"],
"arg_bundle_size": {"bundle": 3, "min": 1, "max": None},
"scan_report_hint": "table",
"doc": "\n A line scan for one or more motors.\n\n Args:\n *args (Device, float, float): pairs of device / start position / end position\n exp_time (float): exposure time in s. Default: 0\n steps (int): number of steps. Default: 10\n relative (bool): if True, the start and end positions are relative to the current position. Default: False\n burst_at_each_point (int): number of acquisition per point. Default: 1\n\n Returns:\n ScanReport\n\n Examples:\n >>> scans.line_scan(dev.motor1, -5, 5, dev.motor2, -5, 5, steps=10, exp_time=0.1, relative=True)\n\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{"name": "exp_time", "kind": "KEYWORD_ONLY", "default": 0, "annotation": "float"},
{"name": "steps", "kind": "KEYWORD_ONLY", "default": None, "annotation": "int"},
{
"name": "relative",
"kind": "KEYWORD_ONLY",
"default": False,
"annotation": "bool",
},
{
"name": "burst_at_each_point",
"kind": "KEYWORD_ONLY",
"default": 1,
"annotation": "int",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"list_scan": {
"class": "ListScan",
"base_class": "ScanBase",
"arg_input": {"device": "device", "positions": "list"},
"required_kwargs": ["relative"],
"arg_bundle_size": {"bundle": 2, "min": 1, "max": None},
"scan_report_hint": "table",
"doc": "\n A scan following the positions specified in a list.\n Please note that all lists must be of equal length.\n\n Args:\n *args: pairs of motors and position lists\n relative: Start from an absolute or relative position\n burst: number of acquisition per point\n\n Returns:\n ScanReport\n\n Examples:\n >>> scans.list_scan(dev.motor1, [0,1,2,3,4], dev.motor2, [4,3,2,1,0], exp_time=0.1, relative=True)\n\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "parameter",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "dict",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"monitor_scan": {
"class": "MonitorScan",
"base_class": "ScanBase",
"arg_input": {"device": "device", "start": "float", "stop": "float"},
"required_kwargs": ["relative"],
"arg_bundle_size": {"bundle": 3, "min": 1, "max": 1},
"scan_report_hint": "table",
"doc": "\n Readout all primary devices at each update of the monitored device.\n\n Args:\n device (Device): monitored device\n start (float): start position of the monitored device\n stop (float): stop position of the monitored device\n\n Returns:\n ScanReport\n\n Examples:\n >>> scans.monitor_scan(dev.motor1, -5, 5, exp_time=0.1, relative=True)\n\n ",
"signature": [
{
"name": "device",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "start",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "float",
},
{
"name": "stop",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "float",
},
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "relative",
"kind": "KEYWORD_ONLY",
"default": False,
"annotation": "bool",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"mv": {
"class": "Move",
"base_class": "RequestBase",
"arg_input": {"device": "device", "target": "float"},
"required_kwargs": ["relative"],
"arg_bundle_size": {"bundle": 2, "min": 1, "max": None},
"scan_report_hint": None,
"doc": "\n Move device(s) to an absolute position\n Args:\n *args (Device, float): pairs of device / position arguments\n relative (bool): if True, move relative to current position\n\n Returns:\n ScanReport\n\n Examples:\n >>> scans.mv(dev.samx, 1, dev.samy,2)\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "relative",
"kind": "KEYWORD_ONLY",
"default": False,
"annotation": "_empty",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"open_interactive_scan": {
"class": "OpenInteractiveScan",
"base_class": "ScanComponent",
"arg_input": {"device": "device"},
"required_kwargs": [],
"arg_bundle_size": {"bundle": 1, "min": 1, "max": None},
"scan_report_hint": "",
"doc": "\n An interactive scan for one or more motors.\n\n Args:\n *args: devices\n exp_time: exposure time in s\n steps: number of steps (please note: 5 steps == 6 positions)\n relative: Start from an absolute or relative position\n burst: number of acquisition per point\n\n Returns:\n ScanReport\n\n Examples:\n >>> scans.open_interactive_scan(dev.motor1, dev.motor2, exp_time=0.1)\n\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"open_scan_def": {
"class": "OpenScanDef",
"base_class": "RequestBase",
"arg_input": {},
"required_kwargs": [],
"arg_bundle_size": {"bundle": 0, "min": 0, "max": 0},
"scan_report_hint": None,
"doc": None,
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "device_manager",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "DeviceManagerBase",
},
{
"name": "monitored",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "list",
},
{
"name": "parameter",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "dict",
},
{"name": "metadata", "kind": "KEYWORD_ONLY", "default": None, "annotation": "dict"},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"round_roi_scan": {
"class": "RoundROIScan",
"base_class": "ScanBase",
"arg_input": {
"motor_1": "device",
"motor_2": "device",
"width_1": "float",
"width_2": "float",
},
"required_kwargs": ["dr", "nth", "relative"],
"arg_bundle_size": {"bundle": 4, "min": 1, "max": 1},
"scan_report_hint": "table",
"doc": "\n A scan following a round-roi-like pattern.\n\n Args:\n *args: motor1, width for motor1, motor2, width for motor2,\n dr (float): shell width. Default is 1.\n nth (int): number of points in the first shell. Default is 5.\n exp_time (float): exposure time in seconds. Default is 0.\n relative (bool): Start from an absolute or relative position. Default is False.\n burst_at_each_point (int): number of acquisition per point. Default is 1.\n\n Returns:\n ScanReport\n\n Examples:\n >>> scans.round_roi_scan(dev.motor1, 20, dev.motor2, 20, dr=2, nth=3, exp_time=0.1, relative=True)\n\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{"name": "dr", "kind": "KEYWORD_ONLY", "default": 1, "annotation": "float"},
{"name": "nth", "kind": "KEYWORD_ONLY", "default": 5, "annotation": "int"},
{"name": "exp_time", "kind": "KEYWORD_ONLY", "default": 0, "annotation": "float"},
{
"name": "relative",
"kind": "KEYWORD_ONLY",
"default": False,
"annotation": "bool",
},
{
"name": "burst_at_each_point",
"kind": "KEYWORD_ONLY",
"default": 1,
"annotation": "int",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"round_scan": {
"class": "RoundScan",
"base_class": "ScanBase",
"arg_input": {
"motor_1": "device",
"motor_2": "device",
"inner_ring": "float",
"outer_ring": "float",
"number_of_rings": "int",
"number_of_positions_in_first_ring": "int",
},
"required_kwargs": ["relative"],
"arg_bundle_size": {"bundle": 6, "min": 1, "max": 1},
"scan_report_hint": "table",
"doc": "\n A scan following a round shell-like pattern.\n\n Args:\n *args: motor1, motor2, inner ring, outer ring, number of rings, number of positions in the first ring\n relative (bool): if True, the motors will be moved relative to their current position. Default is False.\n burst_at_each_point (int): number of exposures at each point. Default is 1.\n\n Returns:\n ScanReport\n\n Examples:\n >>> scans.round_scan(dev.motor1, dev.motor2, 0, 25, 5, 3, exp_time=0.1, relative=True)\n\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "relative",
"kind": "KEYWORD_ONLY",
"default": False,
"annotation": "bool",
},
{
"name": "burst_at_each_point",
"kind": "KEYWORD_ONLY",
"default": 1,
"annotation": "int",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"round_scan_fly": {
"class": "RoundScanFlySim",
"base_class": "ScanBase",
"arg_input": {
"flyer": "device",
"inner_ring": "float",
"outer_ring": "float",
"number_of_rings": "int",
"number_of_positions_in_first_ring": "int",
},
"required_kwargs": ["relative"],
"arg_bundle_size": {"bundle": 5, "min": 1, "max": 1},
"scan_report_hint": "table",
"doc": "\n A fly scan following a round shell-like pattern.\n\n Args:\n *args: motor1, motor2, inner ring, outer ring, number of rings, number of positions in the first ring\n relative: Start from an absolute or relative position\n burst: number of acquisition per point\n\n Returns:\n ScanReport\n\n Examples:\n >>> scans.round_scan_fly(dev.flyer_sim, 0, 50, 5, 3, exp_time=0.1, relative=True)\n\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"grid_scan": {
"class": "Scan",
"base_class": "ScanBase",
"arg_input": {"device": "device", "start": "float", "stop": "float", "steps": "int"},
"required_kwargs": ["relative"],
"arg_bundle_size": {"bundle": 4, "min": 2, "max": None},
"scan_report_hint": "table",
"doc": "\n Scan two motors in a grid.\n\n Args:\n *args (Device, float, float, int): pairs of device / start / stop / steps arguments\n exp_time (float): exposure time in seconds. Default is 0.\n settling_time (float): settling time in seconds. Default is 0.\n relative (bool): if True, the motors will be moved relative to their current position. Default is False.\n burst_at_each_point (int): number of exposures at each point. Default is 1.\n\n Returns:\n ScanReport\n\n Examples:\n >>> scans.grid_scan(dev.motor1, -5, 5, 10, dev.motor2, -5, 5, 10, exp_time=0.1, relative=True)\n\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{"name": "exp_time", "kind": "KEYWORD_ONLY", "default": 0, "annotation": "float"},
{
"name": "settling_time",
"kind": "KEYWORD_ONLY",
"default": 0,
"annotation": "float",
},
{
"name": "relative",
"kind": "KEYWORD_ONLY",
"default": False,
"annotation": "bool",
},
{
"name": "burst_at_each_point",
"kind": "KEYWORD_ONLY",
"default": 1,
"annotation": "int",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"time_scan": {
"class": "TimeScan",
"base_class": "ScanBase",
"arg_input": {},
"required_kwargs": ["points", "interval"],
"arg_bundle_size": {"bundle": 0, "min": None, "max": None},
"scan_report_hint": "table",
"doc": '\n Trigger and readout devices at a fixed interval.\n Note that the interval time cannot be less than the exposure time.\n The effective "sleep" time between points is\n sleep_time = interval - exp_time\n\n Args:\n points: number of points\n interval: time interval between points\n exp_time: exposure time in s\n burst: number of acquisition per point\n\n Returns:\n ScanReport\n\n Examples:\n >>> scans.time_scan(points=10, interval=1.5, exp_time=0.1, relative=True)\n\n ',
"signature": [
{
"name": "points",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "int",
},
{
"name": "interval",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "float",
},
{
"name": "exp_time",
"kind": "POSITIONAL_OR_KEYWORD",
"default": 0,
"annotation": "float",
},
{
"name": "burst_at_each_point",
"kind": "POSITIONAL_OR_KEYWORD",
"default": 1,
"annotation": "int",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"umv": {
"class": "UpdatedMove",
"base_class": "RequestBase",
"arg_input": {"device": "device", "target": "float"},
"required_kwargs": ["relative"],
"arg_bundle_size": {"bundle": 2, "min": 1, "max": None},
"scan_report_hint": "readback",
"doc": "\n Move device(s) to an absolute position and show live updates. This is a blocking call. For non-blocking use Move.\n Args:\n *args (Device, float): pairs of device / position arguments\n relative (bool): if True, move relative to current position\n\n Returns:\n ScanReport\n\n Examples:\n >>> scans.umv(dev.samx, 1, dev.samy,2)\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "relative",
"kind": "KEYWORD_ONLY",
"default": False,
"annotation": "_empty",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"lamni_fermat_scan": {
"class": "LamNIFermatScan",
"base_class": "ScanBase",
"arg_input": {},
"required_kwargs": ["fov_size", "exp_time", "step", "angle"],
"arg_bundle_size": {"bundle": 0, "min": None, "max": None},
"scan_report_hint": "table",
"doc": "\n A LamNI scan following Fermat's spiral.\n\n Kwargs:\n fov_size [um]: Fov in the piezo plane (i.e. piezo range). Max 80 um\n step [um]: stepsize\n shift_x/y [mm]: extra shift in x/y. The shift is directly applied to the scan. It will not be auto rotated. (default 0).\n center_x/center_y [mm]: center position in x/y at 0 deg. This shift is rotated\n using the geometry of LamNI\n It is determined by the first 'click' in the x-ray eye alignemnt procedure\n angle [deg]: rotation angle (will rotate first)\n scan_type: fly (i.e. HW triggered step in case of LamNI) or step\n stitch_x/y: shift scan to adjacent stitch region\n fov_circular [um]: generate a circular field of view in the sample plane. This is an additional cropping to fov_size.\n stitch_overlap [um]: overlap of the stitched regions\n Returns:\n\n Examples:\n >>> scans.lamni_fermat_scan(fov_size=[20], step=0.5, exp_time=0.1)\n >>> scans.lamni_fermat_scan(fov_size=[20, 25], center_x=0.02, center_y=0, shift_x=0, shift_y=0, angle=0, step=0.5, fov_circular=0, exp_time=0.1)\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "parameter",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "dict",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"lamni_move_to_scan_center": {
"class": "LamNIMoveToScanCenter",
"base_class": "RequestBase",
"arg_input": {"shift_x": "float", "shift_y": "float", "angle": "float"},
"required_kwargs": [],
"arg_bundle_size": {"bundle": 3, "min": 1, "max": 1},
"scan_report_hint": None,
"doc": "\n Move LamNI to a new scan center.\n\n Args:\n *args: shift x, shift y, tomo angle in deg\n\n Examples:\n >>> scans.lamni_move_to_scan_center(1.2, 2.8, 12.5)\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "parameter",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "_empty",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"owis_grid": {
"class": "OwisGrid",
"base_class": "FlyScanBase",
"arg_input": {},
"required_kwargs": [],
"arg_bundle_size": {"bundle": 0, "min": None, "max": None},
"scan_report_hint": "scan_progress",
"doc": "Owis-based grid scan.",
"signature": [
{
"name": "start_y",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "float",
},
{
"name": "end_y",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "float",
},
{
"name": "interval_y",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "int",
},
{
"name": "start_x",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "float",
},
{
"name": "end_x",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "float",
},
{
"name": "interval_x",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "int",
},
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{"name": "exp_time", "kind": "KEYWORD_ONLY", "default": 0.1, "annotation": "float"},
{
"name": "readout_time",
"kind": "KEYWORD_ONLY",
"default": 0.003,
"annotation": "float",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"sgalil_grid": {
"class": "SgalilGrid",
"base_class": "FlyScanBase",
"arg_input": {},
"required_kwargs": [],
"arg_bundle_size": {"bundle": 0, "min": None, "max": None},
"scan_report_hint": "scan_progress",
"doc": "\n SGalil-based grid scan.\n\n Args:\n start_y (float): start position of y axis (fast axis)\n end_y (float): end position of y axis (fast axis)\n interval_y (int): number of points in y axis\n start_x (float): start position of x axis (slow axis)\n end_x (float): end position of x axis (slow axis)\n interval_x (int): number of points in x axis\n exp_time (float): exposure time in seconds. Default is 0.1s\n readout_time (float): readout time in seconds, minimum of 3e-3s (3ms)\n\n Exp:\n scans.sgalil_grid(start_y = val1, end_y= val1, interval_y = val1, start_x = val1, end_x = val1, interval_x = val1, exp_time = 0.02, readout_time = 3e-3)\n\n\n ",
"signature": [
{
"name": "start_y",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "float",
},
{
"name": "end_y",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "float",
},
{
"name": "interval_y",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "int",
},
{
"name": "start_x",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "float",
},
{
"name": "end_x",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "float",
},
{
"name": "interval_x",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "int",
},
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{"name": "exp_time", "kind": "KEYWORD_ONLY", "default": 0.1, "annotation": "float"},
{
"name": "readout_time",
"kind": "KEYWORD_ONLY",
"default": 0.1,
"annotation": "float",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"hyst_scan": {
"class": "HystScan",
"base_class": "ScanBase",
"arg_input": {
"field_motor": "device",
"start_field": "float",
"end_field": "float",
"mono": "device",
"energy1": "float",
"energy2": "float",
},
"required_kwargs": [],
"arg_bundle_size": {"bundle": 3, "min": 1, "max": 1},
"scan_report_hint": "table",
"doc": "\n A hysteresis scan.\n\n scans.hyst_scan(field_motor, start_field, end_field, mono, energy1, energy2)\n\n Examples:\n >>> scans.hyst_scan(dev.field_x, 0, 0.5, dev.mono, 600, 640, ramp_rate=2)\n\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "parameter",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "dict",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"otf_scan": {
"class": "OTFScan",
"base_class": "FlyScanBase",
"arg_input": {},
"required_kwargs": ["e1", "e2", "time"],
"arg_bundle_size": {"bundle": 0, "min": None, "max": None},
"scan_report_hint": "table",
"doc": "Scans the energy from e1 to e2 in <time> minutes.\n\n Examples:\n >>> scans.otf_scan(e1=700, e2=740, time=4)\n\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "parameter",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "dict",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
}
)

Binary file not shown.

View File

@ -2,24 +2,13 @@ import os
import pickle
from unittest.mock import MagicMock
import msgpack
import pytest
from qtpy.QtWidgets import QLineEdit
from bec_widgets.widgets import ScanControl
from bec_widgets.utils.widget_io import WidgetIO
# TODO there has to be a better way to mock messages than this, in this case I just took the msg from bec
def load_test_msg(msg_name):
"""Helper function to load msg from pickle file."""
msg_path = os.path.join(os.path.dirname(__file__), "test_msgs", f"{msg_name}.pkl")
with open(msg_path, "rb") as f:
msg = pickle.load(f)
return msg
packed_message = load_test_msg("msg_dict")["available_scans"]
from .test_msgs.available_scans_message import available_scans_message
class FakePositioner:
@ -45,7 +34,7 @@ def mocked_client():
client = MagicMock()
# Mock the producer.get method to return the packed message
client.producer.get.return_value = packed_message
client.producer.get.return_value = available_scans_message
# # Mock the device_manager.devices attribute to return a mock object for samx
client.device_manager.devices = MagicMock()
@ -66,7 +55,7 @@ def scan_control(qtbot, mocked_client): # , mock_dev):
def test_populate_scans(scan_control, mocked_client):
# The comboBox should be populated with all scan from the message right after initialization
expected_scans = msgpack.loads(packed_message).keys()
expected_scans = available_scans_message.resource.keys()
assert scan_control.comboBox_scan_selection.count() == len(expected_scans)
for scan in expected_scans: # Each scan should be in the comboBox
assert scan_control.comboBox_scan_selection.findText(scan) != -1
@ -77,7 +66,7 @@ def test_populate_scans(scan_control, mocked_client):
) # TODO now only for line_scan and grid_scan, later for all loaded scans
def test_on_scan_selected(scan_control, scan_name):
# Expected scan info from the message signature
expected_scan_info = msgpack.loads(packed_message)[scan_name]
expected_scan_info = available_scans_message.resource[scan_name]
# Select a scan from the comboBox
scan_control.comboBox_scan_selection.setCurrentText(scan_name)
@ -110,7 +99,7 @@ def test_on_scan_selected(scan_control, scan_name):
@pytest.mark.parametrize("scan_name", ["line_scan", "grid_scan"])
def test_add_remove_bundle(scan_control, scan_name):
# Expected scan info from the message signature
expected_scan_info = msgpack.loads(packed_message)[scan_name]
expected_scan_info = available_scans_message.resource[scan_name]
# Select a scan from the comboBox
scan_control.comboBox_scan_selection.setCurrentText(scan_name)