1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-04-09 18:20:55 +02:00

Compare commits

..

1 Commits

27 changed files with 183 additions and 3120 deletions

View File

@@ -6,14 +6,10 @@ image: $CI_DOCKER_REGISTRY/python:3.9
variables:
DOCKER_TLS_CERTDIR: ""
include:
- template: Security/Secret-Detection.gitlab-ci.yml
# different stages in the pipeline
stages:
- Formatter
- test
- Unittests
- Deploy
formatter:
@@ -39,7 +35,7 @@ pylint:
expire_in: 1 week
tests:
stage: test
stage: Unittests
needs: []
variables:
QT_QPA_PLATFORM: "offscreen"
@@ -72,7 +68,7 @@ semver:
- git tag
# build
- pip install python-semantic-release==7.* wheel
- pip install python-semantic-release wheel
- export GL_TOKEN=$CI_UPDATES
- export REPOSITORY_USERNAME=__token__
- export REPOSITORY_PASSWORD=$CI_PYPI_TOKEN
@@ -98,4 +94,4 @@ semver:
# - curl -X POST -d "branches=$CI_COMMIT_REF_NAME" -d "token=$RTD_TOKEN" https://readthedocs.org/api/v2/webhook/beamline-experiment-control/221870/
# rules:
# - if: '$CI_COMMIT_REF_NAME == "master"'
# - if: '$CI_COMMIT_REF_NAME == "production"'
# - if: '$CI_COMMIT_REF_NAME == "production"'

581
.pylintrc
View File

@@ -1,581 +0,0 @@
[MASTER]
# A comma-separated list of package or module names from where C extensions may
# be loaded. Extensions are loading into the active Python interpreter and may
# run arbitrary code.
extension-pkg-allow-list=PyQt5, pyqtgraph
# A comma-separated list of package or module names from where C extensions may
# be loaded. Extensions are loading into the active Python interpreter and may
# run arbitrary code. (This is an alternative name to extension-pkg-allow-list
# for backward compatibility.)
extension-pkg-whitelist=
# Return non-zero exit code if any of these messages/categories are detected,
# even if score is above --fail-under value. Syntax same as enable. Messages
# specified are enabled, while categories only check already-enabled messages.
fail-on=
# Specify a score threshold to be exceeded before program exits with error.
fail-under=8.0
# Files or directories to be skipped. They should be base names, not paths.
ignore=CVS
# Add files or directories matching the regex patterns to the ignore-list. The
# regex matches against paths and can be in Posix or Windows format.
ignore-paths=
# Files or directories matching the regex patterns are skipped. The regex
# matches against base names, not paths.
ignore-patterns=
# Python code to execute, usually for sys.path manipulation such as
# pygtk.require().
#init-hook=
# Use multiple processes to speed up Pylint. Specifying 0 will auto-detect the
# number of processors available to use.
jobs=1
# Control the amount of potential inferred values when inferring a single
# object. This can help the performance when dealing with large functions or
# complex, nested conditions.
limit-inference-results=100
# List of plugins (as comma separated values of python module names) to load,
# usually to register additional checkers.
load-plugins=
# Pickle collected data for later comparisons.
persistent=yes
# Minimum Python version to use for version dependent checks. Will default to
# the version used to run pylint.
py-version=3.9
# When enabled, pylint would attempt to guess common misconfiguration and emit
# user-friendly hints instead of false-positive error messages.
suggestion-mode=yes
# Allow loading of arbitrary C extensions. Extensions are imported into the
# active Python interpreter and may run arbitrary code.
unsafe-load-any-extension=no
[MESSAGES CONTROL]
# Only show warnings with the listed confidence levels. Leave empty to show
# all. Valid levels: HIGH, INFERENCE, INFERENCE_FAILURE, UNDEFINED.
confidence=
# Disable the message, report, category or checker with the given id(s). You
# can either give multiple identifiers separated by comma (,) or put this
# option multiple times (only on the command line, not in the configuration
# file where it should appear only once). You can also use "--disable=all" to
# disable everything first and then reenable specific checks. For example, if
# you want to run only the similarities checker, you can use "--disable=all
# --enable=similarities". If you want to run only the classes checker, but have
# no Warning level messages displayed, use "--disable=all --enable=classes
# --disable=W".
disable=missing-module-docstring,
missing-class-docstring,
import-error,
no-name-in-module,
raw-checker-failed,
bad-inline-option,
locally-disabled,
file-ignored,
suppressed-message,
deprecated-pragma,
use-symbolic-message-instead,
unused-wildcard-import,
logging-fstring-interpolation,
line-too-long,
too-many-instance-attributes,
wrong-import-order
# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option
# multiple time (only on the command line, not in the configuration file where
# it should appear only once). See also the "--disable" option for examples.
enable=c-extension-no-member
[REPORTS]
# Python expression which should return a score less than or equal to 10. You
# have access to the variables 'error', 'warning', 'refactor', and 'convention'
# which contain the number of messages in each category, as well as 'statement'
# which is the total number of statements analyzed. This score is used by the
# global evaluation report (RP0004).
evaluation=10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10)
# Template used to display messages. This is a python new-style format string
# used to format the message information. See doc for all details.
#msg-template=
# Set the output format. Available formats are text, parseable, colorized, json
# and msvs (visual studio). You can also give a reporter class, e.g.
# mypackage.mymodule.MyReporterClass.
output-format=text
# Tells whether to display a full report or only the messages.
reports=no
# Activate the evaluation score.
score=yes
[REFACTORING]
# Maximum number of nested blocks for function / method body
max-nested-blocks=5
# Complete name of functions that never returns. When checking for
# inconsistent-return-statements if a never returning function is called then
# it will be considered as an explicit return statement and no message will be
# printed.
never-returning-functions=sys.exit,argparse.parse_error
[LOGGING]
# The type of string formatting that logging methods do. `old` means using %
# formatting, `new` is for `{}` formatting.
logging-format-style=old
# Logging modules to check that the string format arguments are in logging
# function parameter format.
logging-modules=logging
[SPELLING]
# Limits count of emitted suggestions for spelling mistakes.
max-spelling-suggestions=4
# Spelling dictionary name. Available dictionaries: none. To make it work,
# install the 'python-enchant' package.
spelling-dict=
# List of comma separated words that should be considered directives if they
# appear and the beginning of a comment and should not be checked.
spelling-ignore-comment-directives=fmt: on,fmt: off,noqa:,noqa,nosec,isort:skip,mypy:
# List of comma separated words that should not be checked.
spelling-ignore-words=
# A path to a file that contains the private dictionary; one word per line.
spelling-private-dict-file=
# Tells whether to store unknown words to the private dictionary (see the
# --spelling-private-dict-file option) instead of raising a message.
spelling-store-unknown-words=no
[MISCELLANEOUS]
# List of note tags to take in consideration, separated by a comma.
notes=FIXME,
XXX,
TODO
# Regular expression of note tags to take in consideration.
#notes-rgx=
[TYPECHECK]
# List of decorators that produce context managers, such as
# contextlib.contextmanager. Add to this list to register other decorators that
# produce valid context managers.
contextmanager-decorators=contextlib.contextmanager
# List of members which are set dynamically and missed by pylint inference
# system, and so shouldn't trigger E1101 when accessed. Python regular
# expressions are accepted.
generated-members=
# Tells whether missing members accessed in mixin class should be ignored. A
# class is considered mixin if its name matches the mixin-class-rgx option.
ignore-mixin-members=yes
# Tells whether to warn about missing members when the owner of the attribute
# is inferred to be None.
ignore-none=yes
# This flag controls whether pylint should warn about no-member and similar
# checks whenever an opaque object is returned when inferring. The inference
# can return multiple potential results while evaluating a Python object, but
# some branches might not be evaluated, which results in partial inference. In
# that case, it might be useful to still emit no-member and other checks for
# the rest of the inferred objects.
ignore-on-opaque-inference=yes
# List of class names for which member attributes should not be checked (useful
# for classes with dynamically set attributes). This supports the use of
# qualified names.
ignored-classes=optparse.Values,thread._local,_thread._local
# List of module names for which member attributes should not be checked
# (useful for modules/projects where namespaces are manipulated during runtime
# and thus existing member attributes cannot be deduced by static analysis). It
# supports qualified module names, as well as Unix pattern matching.
ignored-modules=
# Show a hint with possible names when a member name was not found. The aspect
# of finding the hint is based on edit distance.
missing-member-hint=yes
# The minimum edit distance a name should have in order to be considered a
# similar match for a missing member name.
missing-member-hint-distance=1
# The total number of similar names that should be taken in consideration when
# showing a hint for a missing member.
missing-member-max-choices=1
# Regex pattern to define which classes are considered mixins ignore-mixin-
# members is set to 'yes'
mixin-class-rgx=.*[Mm]ixin
# List of decorators that change the signature of a decorated function.
signature-mutators=
[VARIABLES]
# List of additional names supposed to be defined in builtins. Remember that
# you should avoid defining new builtins when possible.
additional-builtins=
# Tells whether unused global variables should be treated as a violation.
allow-global-unused-variables=yes
# List of names allowed to shadow builtins
allowed-redefined-builtins=
# List of strings which can identify a callback function by name. A callback
# name must start or end with one of those strings.
callbacks=cb_,
_cb
# A regular expression matching the name of dummy variables (i.e. expected to
# not be used).
dummy-variables-rgx=_+$|(_[a-zA-Z0-9_]*[a-zA-Z0-9]+?$)|dummy|^ignored_|^unused_
# Argument names that match this expression will be ignored. Default to name
# with leading underscore.
ignored-argument-names=_.*|^ignored_|^unused_
# Tells whether we should check for unused import in __init__ files.
init-import=no
# List of qualified module names which can have objects that can redefine
# builtins.
redefining-builtins-modules=six.moves,past.builtins,future.builtins,builtins,io
[FORMAT]
# Expected format of line ending, e.g. empty (any line ending), LF or CRLF.
expected-line-ending-format=
# Regexp for a line that is allowed to be longer than the limit.
ignore-long-lines=^\s*(# )?<?https?://\S+>?$
# Number of spaces of indent required inside a hanging or continued line.
indent-after-paren=4
# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1
# tab).
indent-string=' '
# Maximum number of characters on a single line.
max-line-length=100
# Maximum number of lines in a module.
max-module-lines=1000
# Allow the body of a class to be on the same line as the declaration if body
# contains single statement.
single-line-class-stmt=no
# Allow the body of an if to be on the same line as the test if there is no
# else.
single-line-if-stmt=no
[SIMILARITIES]
# Comments are removed from the similarity computation
ignore-comments=yes
# Docstrings are removed from the similarity computation
ignore-docstrings=yes
# Imports are removed from the similarity computation
ignore-imports=no
# Signatures are removed from the similarity computation
ignore-signatures=no
# Minimum lines number of a similarity.
min-similarity-lines=4
[BASIC]
# Naming style matching correct argument names.
argument-naming-style=snake_case
# Regular expression matching correct argument names. Overrides argument-
# naming-style.
#argument-rgx=
# Naming style matching correct attribute names.
attr-naming-style=snake_case
# Regular expression matching correct attribute names. Overrides attr-naming-
# style.
#attr-rgx=
# Bad variable names which should always be refused, separated by a comma.
bad-names=foo,
bar,
baz,
toto,
tutu,
tata
# Bad variable names regexes, separated by a comma. If names match any regex,
# they will always be refused
bad-names-rgxs=
# Naming style matching correct class attribute names.
class-attribute-naming-style=any
# Regular expression matching correct class attribute names. Overrides class-
# attribute-naming-style.
#class-attribute-rgx=
# Naming style matching correct class constant names.
class-const-naming-style=UPPER_CASE
# Regular expression matching correct class constant names. Overrides class-
# const-naming-style.
#class-const-rgx=
# Naming style matching correct class names.
class-naming-style=PascalCase
# Regular expression matching correct class names. Overrides class-naming-
# style.
#class-rgx=
# Naming style matching correct constant names.
const-naming-style=UPPER_CASE
# Regular expression matching correct constant names. Overrides const-naming-
# style.
#const-rgx=
# Minimum line length for functions/classes that require docstrings, shorter
# ones are exempt.
docstring-min-length=-1
# Naming style matching correct function names.
function-naming-style=snake_case
# Regular expression matching correct function names. Overrides function-
# naming-style.
#function-rgx=
# Good variable names which should always be accepted, separated by a comma.
good-names=i,
ii,
jj,
kk,
dr,
j,
k,
ex,
Run,
cb,
_
# Good variable names regexes, separated by a comma. If names match any regex,
# they will always be accepted
good-names-rgxs=.*scanID.*,.*RID.*,.*pointID.*,.*ID.*,.*_2D.*,.*_1D.*
# Include a hint for the correct naming format with invalid-name.
include-naming-hint=no
# Naming style matching correct inline iteration names.
inlinevar-naming-style=any
# Regular expression matching correct inline iteration names. Overrides
# inlinevar-naming-style.
#inlinevar-rgx=
# Naming style matching correct method names.
method-naming-style=snake_case
# Regular expression matching correct method names. Overrides method-naming-
# style.
#method-rgx=
# Naming style matching correct module names.
module-naming-style=snake_case
# Regular expression matching correct module names. Overrides module-naming-
# style.
#module-rgx=
# Colon-delimited sets of names that determine each other's naming style when
# the name regexes allow several styles.
name-group=
# Regular expression which should only match function or class names that do
# not require a docstring.
no-docstring-rgx=^_
# List of decorators that produce properties, such as abc.abstractproperty. Add
# to this list to register other decorators that produce valid properties.
# These decorators are taken in consideration only for invalid-name.
property-classes=abc.abstractproperty
# Naming style matching correct variable names.
variable-naming-style=snake_case
# Regular expression matching correct variable names. Overrides variable-
# naming-style.
#variable-rgx=
[STRING]
# This flag controls whether inconsistent-quotes generates a warning when the
# character used as a quote delimiter is used inconsistently within a module.
check-quote-consistency=no
# This flag controls whether the implicit-str-concat should generate a warning
# on implicit string concatenation in sequences defined over several lines.
check-str-concat-over-line-jumps=no
[IMPORTS]
# List of modules that can be imported at any level, not just the top level
# one.
allow-any-import-level=
# Allow wildcard imports from modules that define __all__.
allow-wildcard-with-all=no
# Analyse import fallback blocks. This can be used to support both Python 2 and
# 3 compatible code, which means that the block might have code that exists
# only in one or another interpreter, leading to false positives when analysed.
analyse-fallback-blocks=no
# Deprecated modules which should not be used, separated by a comma.
deprecated-modules=
# Output a graph (.gv or any supported image format) of external dependencies
# to the given file (report RP0402 must not be disabled).
ext-import-graph=
# Output a graph (.gv or any supported image format) of all (i.e. internal and
# external) dependencies to the given file (report RP0402 must not be
# disabled).
import-graph=
# Output a graph (.gv or any supported image format) of internal dependencies
# to the given file (report RP0402 must not be disabled).
int-import-graph=
# Force import order to recognize a module as part of the standard
# compatibility libraries.
known-standard-library=
# Force import order to recognize a module as part of a third party library.
known-third-party=enchant
# Couples of modules and preferred modules, separated by a comma.
preferred-modules=
[CLASSES]
# Warn about protected attribute access inside special methods
check-protected-access-in-special-methods=no
# List of method names used to declare (i.e. assign) instance attributes.
defining-attr-methods=__init__,
__new__,
setUp,
__post_init__
# List of member names, which should be excluded from the protected access
# warning.
exclude-protected=_asdict,
_fields,
_replace,
_source,
_make
# List of valid names for the first argument in a class method.
valid-classmethod-first-arg=cls
# List of valid names for the first argument in a metaclass class method.
valid-metaclass-classmethod-first-arg=cls
[DESIGN]
# List of regular expressions of class ancestor names to ignore when counting
# public methods (see R0903)
exclude-too-few-public-methods=
# List of qualified class names to ignore when counting class parents (see
# R0901)
ignored-parents=
# Maximum number of arguments for function / method.
max-args=5
# Maximum number of attributes for a class (see R0902).
max-attributes=7
# Maximum number of boolean expressions in an if statement (see R0916).
max-bool-expr=5
# Maximum number of branch for function / method body.
max-branches=12
# Maximum number of locals for function / method body.
max-locals=15
# Maximum number of parents for a class (see R0901).
max-parents=7
# Maximum number of public methods for a class (see R0904).
max-public-methods=20
# Maximum number of return / yield for function / method body.
max-returns=6
# Maximum number of statements in function / method body.
max-statements=50
# Minimum number of public methods for a class (see R0903).
min-public-methods=2
[EXCEPTIONS]
# Exceptions that will emit a warning when being caught. Defaults to
# "BaseException, Exception".
overgeneral-exceptions=BaseException,
Exception

View File

@@ -2,79 +2,6 @@
<!--next-version-placeholder-->
## v0.6.0 (2023-08-11)
### Feature
* New GUI for line_plot.py ([`b57b3bb`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/b57b3bb1afc7c85acc7ed328ac8a219f392869f1))
* Cursor universal signals ([`20e9516`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/20e951659558b7fc023e357bfe07d812c5fd020a))
## v0.5.0 (2023-08-11)
### Feature
* Add generic connect function for slots ([`6a3df34`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/6a3df34cdfbec2434153362ded630305e5dc5e28))
* Add possibility to provide service config ([`8c9a9c9`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/8c9a9c93535ee77c0622b483a3157af367ebce1f))
### Fix
* Dispatcher argparse and scan_plot tests ([`67f619e`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/67f619ee897e0040c6310e67d69fbb2e0685293d))
* Gui event removing bugs ([`a9dd191`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/a9dd191629295ca476e2f9a1b9944ff355216583))
## v0.4.0 (2023-08-11)
### Feature
* Cursor universal for 1D and 2D ([`f75554b`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/f75554bd7b072207847956a8720b9a62c20ba2c8))
* Added qt_utils package with general Crosshair function ([`5353fed`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/5353fed7bfe1819819fa3348ec93d2d0ba540628))
* 2D plot updating ([`d32088b`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/d32088b643a4d0613c32fb464a0a55a3b6b684d6))
* Metadata available on_dap_update ([`18b5d46`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/18b5d46678619a972815532629ce96c121f5fcc9))
* Plotting from streamer ([`bb806c1`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/bb806c149dee88023ecb647b523cbd5189ea9001))
* Added Legend to plot ([`0feca4b`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/0feca4b1578820ec1f5f3ead3073e4d45c23798b))
* Cursor coordinate as a QTable ([`a999f76`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/a999f7669a12910ad66e10a6d2e75197b2dce1c2))
* Changed from PlotItem to GraphicsLayoutWidget, added LabelItem ([`075cc79`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/075cc79d6fa011803cf4a06fbff8faa951c1b59f))
* Add display_ui_file.py ([`91d8ffa`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/91d8ffacffcbeebdf7623caf62e07244c4dcee16))
* Add disconnect_dap_slot ([`1325704`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/1325704750ebab897e3dcae80c9d455bfbbf886f))
* Inherit from GraphicsView for consistency with 2D plot ([`d8c101c`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/d8c101cdd7f960a152a1f318911cac6eecf6bad4))
* Add BECScanPlot2D ([`67905e8`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/67905e896c81383f57c268db544b3314104bda38))
* Emit the full bec message to slots ([`1bb3020`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/1bb30207038f3a54c0e96dbbbcd1ea7f6c70eca2))
### Fix
* Q selection for gui_event signal ([`0bf452a`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/0bf452ad1b7d9ad941e2ef4b8d61ec4ed5266415))
* Fixed logic in data subscription ([`c2d469b`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/c2d469b4543fcf237b274399b83969cc2213b61b))
* Scan_plot to accept metadata from dap signal ([`7bec0b5`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/7bec0b5e6c1663670f8fcc2fc6aa6c8b6df28b61))
* Plotting latest 1d curves ([`378be81`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/378be81bf6dd5e9239f8f1fb908cafc97161c79d))
* Testing the data structure of plotting ([`4fb0a3b`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/4fb0a3b058957f5b37227ff7c8e9bdf5259a1cde))
* Fix examples when run directly as a script ([`cd11ee5`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/cd11ee51c1c725255e748a32b89a74487e84a631))
* Module paths ([`e7f644c`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/e7f644c5079a8665d7d872eb0b27ed7da6cbd078))
## v0.3.0 (2023-07-19)
### Feature
* Add auto-computed color_list from colormaps ([`3e1708b`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/3e1708bf48bc15a25c0d01242fff28d6db868e02))
* Add functionality for plotting multiple signals ([`10e2906`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/10e29064455f50bc3b66c55b4361575957db1489))
* Added lineplot widget ([`989a3f0`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/989a3f080839b98f1e1c2118600cddf449120124))
* Added ctrl_c from grum ([`8fee13a`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/8fee13a67bef3ed6ed6de9d47438f04687f548d8))
### Fix
* Add warning for non-existing signalz ([`48075e4`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/48075e4fe3187f6ac8d0b61f94f8df73b8fd6daf))
* Documentation and bugfix for mouse_moved ([`a460f3c`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/a460f3c0bd7b9e106a758bc330f361868407b1e3))
### Documentation
* Add notes about qt designer install via conda-forge ([`d8038a8`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/d8038a8cd0efa3a16df403390164603e4e8afdd8))
* Added license ([`db2d33e`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/db2d33e8912dc493cce9ee7f09d8336155110079))
## v0.2.1 (2023-07-13)
### Fix
* Fixed setup config (wrong name) ([`947db1e`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/947db1e0f32b067e67f94a7c8321da5194b1547b))
* Fixed bec_lib dependency ([`86f4def`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/86f4deffd899111e8997010487ec54c6c62c43ab))
## v0.2.0 (2023-07-13)
### Feature

29
LICENSE
View File

@@ -1,29 +0,0 @@
BSD 3-Clause License
Copyright (c) 2023, bec
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@@ -1,457 +0,0 @@
import os
import threading
import time
import warnings
from typing import Any
import numpy as np
import pyqtgraph
import pyqtgraph as pg
from bec_lib.core import BECMessage
from PyQt5.QtCore import pyqtSlot
from PyQt5.QtWidgets import QCheckBox, QTableWidgetItem
from pyqtgraph import mkBrush, mkColor, mkPen
from pyqtgraph.Qt import QtCore, QtWidgets, uic
from pyqtgraph.Qt.QtCore import pyqtSignal
from bec_widgets.bec_dispatcher import bec_dispatcher
from bec_lib.core.redis_connector import MessageObject, RedisConnector
client = bec_dispatcher.client
class BasicPlot(QtWidgets.QWidget):
update_signal = pyqtSignal()
roi_signal = pyqtSignal(tuple)
def __init__(self, name="", y_value_list=["gauss_bpm"]) -> None:
"""
Basic plot widget for displaying scan data.
Args:
name (str, optional): Name of the plot. Defaults to "".
y_value_list (list, optional): List of signals to be plotted. Defaults to ["gauss_bpm"].
"""
super(BasicPlot, self).__init__()
# Set style for pyqtgraph plots
pg.setConfigOption("background", "w")
pg.setConfigOption("foreground", "k")
current_path = os.path.dirname(__file__)
uic.loadUi(os.path.join(current_path, "basic_plot.ui"), self)
# Set splitter distribution of widgets
self.splitter.setSizes([3, 1])
self._idle_time = 100
self.title = ""
self.label_bottom = ""
self.label_left = ""
self.producer = RedisConnector(["localhost:6379"]).producer()
self.scan_motors = []
self.y_value_list = y_value_list
self.previous_y_value_list = None
self.plotter_data_x = []
self.plotter_data_y = []
self.curves = []
self.pens = []
self.brushs = []
self.plotter_scan_id = None
# TODO to be moved to utils function
plotstyles = {
"symbol": "o",
"symbolSize": 10,
}
color_list = BasicPlot.golden_angle_color(colormap="CET-R2", num=len(self.y_value_list))
# setup plots - GraphicsLayoutWidget
# LabelItem
self.label = pg.LabelItem(justify="center")
self.glw.addItem(self.label)
self.label.setText("ROI region")
# PlotItem - main window
self.glw.nextRow()
self.plot = pg.PlotItem()
self.plot.setLogMode(True, True)
self.glw.addItem(self.plot)
self.plot.addLegend()
# ImageItem - 2D view #TODO add 2D plot for ROI and 1D plot for mouse click
self.glw.nextRow()
self.plot_roi = pg.PlotItem()
self.img = pg.ImageItem()
self.glw.addItem(self.plot_roi)
self.plot_roi.addItem(self.img)
# ROI selector - so far from [-1,1] #TODO update to scale with xrange
self.roi_selector = pg.LinearRegionItem([-1, 1])
for ii, y_value in enumerate(self.y_value_list):
pen = mkPen(color=color_list[ii], width=2, style=QtCore.Qt.DashLine)
brush = mkBrush(color=color_list[ii])
curve = pg.PlotDataItem(
**plotstyles, symbolBrush=brush, pen=pen, skipFiniteCheck=True, name=y_value
)
self.plot.addItem(curve)
self.curves.append(curve)
self.pens.append(pen)
self.brushs.append(brush)
self.add_crosshair(self.plot)
self.add_crosshair(self.plot_roi)
self.crosshair_v = pg.InfiniteLine(angle=90, movable=False)
self.crosshair_h = pg.InfiniteLine(angle=0, movable=False)
#
# for plot in (self.plot_roi, self.plot):
# plot.addItem(self.crosshair_v, ignoreBounds=True)
# plot.addItem(self.crosshair_h, ignoreBounds=True)
# self.plot.addItem(self.crosshair_v, ignoreBounds=True)
# self.plot.addItem(self.crosshair_h, ignoreBounds=True)
# self.plot_roi.addItem(self.crosshair_v, ignoreBounds=True)
# self.plot_roi.addItem(self.crosshair_h, ignoreBounds=True)
# Add textItems
self.add_text_items()
# Manage signals
self.proxy = pg.SignalProxy(
self.plot.scene().sigMouseMoved, rateLimit=60, slot=self.mouse_moved
)
self.proxy_update = pg.SignalProxy(self.update_signal, rateLimit=25, slot=self.update)
self.roi_selector.sigRegionChangeFinished.connect(self.get_roi_region)
# Debug functions
self.pushButton_debug.clicked.connect(self.generate_2D_data_update)
# self.generate_2D_data()
self._current_proj = None
self._current_metadata_ep = "px_stream/projection_{}/metadata"
self.data_retriever = threading.Thread(target=self.on_projection, daemon=True)
self.data_retriever.start()
def debug(self):
"""
Debug button just for quick testing
"""
def generate_2D_data(self):
data = np.random.normal(size=(1, 100))
self.img.setImage(data)
def generate_2D_data_update(self):
data = np.random.normal(size=(200, 300))
self.img.setImage(data, levels=(0.2, 0.5))
def add_crosshair(self, plot):
crosshair_v = pg.InfiniteLine(angle=90, movable=False)
crosshair_h = pg.InfiniteLine(angle=0, movable=False)
plot.addItem(crosshair_v)
plot.addItem(crosshair_h)
def get_roi_region(self):
"""For testing purpose now, get roi region and print it to self.label as tuple"""
region = self.roi_selector.getRegion()
self.label.setText(f"x = {(10**region[0]):.4f}, y ={(10**region[1]):.4f}")
return_dict = {
"horiz_roi": [
np.where(self.plotter_data_x[0] > 10 ** region[0])[0][0],
np.where(self.plotter_data_x[0] < 10 ** region[1])[0][-1],
]
}
msg = BECMessage.DeviceMessage(signals=return_dict).dumps()
self.producer.set_and_publish("px_stream/gui_event", msg=msg)
self.roi_signal.emit(region)
def add_text_items(self): # TODO probably can be removed
"""Add text items to the plot"""
# self.mouse_box_data.setText("Mouse cursor")
# TODO Via StyleSheet, one may set the color of the full QLabel
# self.mouse_box_data.setStyleSheet(f"QLabel {{color : rgba{self.pens[0].color().getRgb()}}}")
def mouse_moved(self, event: tuple) -> None:
"""
Update the mouse table with the current mouse position and the corresponding data.
Args:
event (tuple): Mouse event containing the position of the mouse cursor.
The position is stored in first entry as horizontal, vertical pixel.
"""
pos = event[0]
if not self.plot.sceneBoundingRect().contains(pos):
return
mousePoint = self.plot.vb.mapSceneToView(pos)
self.crosshair_v.setPos(mousePoint.x())
self.crosshair_h.setPos(mousePoint.y())
if not self.plotter_data_x:
return
closest_point = self.closest_x_y_value(
mousePoint.x(), self.plotter_data_x[0], self.plotter_data_y[0]
)
# self.precision = 3
# ii = 0
# y_value = self.y_value_list[ii]
# x_data = f"{10**closest_point[0]:.{self.precision}f}"
# y_data = f"{10**closest_point[1]:.{self.precision}f}"
#
# # Write coordinate to QTable
# self.mouse_table.setItem(ii, 1, QTableWidgetItem(str(y_value)))
# self.mouse_table.setItem(ii, 2, QTableWidgetItem(str(x_data)))
# self.mouse_table.setItem(ii, 3, QTableWidgetItem(str(y_data)))
#
# self.mouse_table.resizeColumnsToContents()
def closest_x_y_value(self, input_value, list_x, list_y) -> tuple:
"""
Find the closest x and y value to the input value.
Args:
input_value (float): Input value
list_x (list): List of x values
list_y (list): List of y values
Returns:
tuple: Closest x and y value
"""
arr = np.asarray(list_x)
i = (np.abs(arr - input_value)).argmin()
return list_x[i], list_y[i]
def update(self):
"""Update the plot with the new data."""
# check if roi selector is in the plot
if self.roi_selector not in self.plot.items:
self.plot.addItem(self.roi_selector)
# check if QTable was initialised and if list of devices was changed
if self.y_value_list != self.previous_y_value_list:
self.setup_cursor_table()
self.previous_y_value_list = self.y_value_list.copy() if self.y_value_list else None
self.curves[0].setData(self.plotter_data_x[0], self.plotter_data_y[0])
# if len(self.plotter_data_x[0]) <= 1:
# return
# self.plot.setLabel("bottom", self.label_bottom)
# self.plot.setLabel("left", self.label_left)
# for ii in range(len(self.y_value_list)):
# self.curves[0].setData(self.plotter_data_x[0], self.plotter_data_y[0])
@pyqtSlot(dict, dict)
def on_scan_segment(self, data: dict, metadata: dict) -> None:
"""Update function that is called during the scan callback. To avoid
too many renderings, the GUI is only processing events every <_idle_time> ms.
Args:
data (dict): Dictionary containing a new scan segment
metadata (dict): Scan metadata
"""
if metadata["scanID"] != self.plotter_scan_id:
self.plotter_scan_id = metadata["scanID"]
self._reset_plot_data()
self.title = f"Scan {metadata['scan_number']}"
self.scan_motors = scan_motors = metadata.get("scan_report_devices")
# client = BECClient()
remove_y_value_index = [
index
for index, y_value in enumerate(self.y_value_list)
if y_value not in client.device_manager.devices
]
if remove_y_value_index:
for ii in sorted(remove_y_value_index, reverse=True):
# TODO Use bec warning message??? to be discussed with Klaus
warnings.warn(
f"Warning: no matching signal for {self.y_value_list[ii]} found in list of devices. Removing from plot."
)
self.remove_curve_by_name(self.plot, self.y_value_list[ii])
self.y_value_list.pop(ii)
self.precision = client.device_manager.devices[scan_motors[0]]._info["describe"][
scan_motors[0]
]["precision"]
# TODO after update of bec_lib, this will be new way to access data
# self.precision = client.device_manager.devices[scan_motors[0]].precision
x = data["data"][scan_motors[0]][scan_motors[0]]["value"]
self.plotter_data_x.append(x)
for ii, y_value in enumerate(self.y_value_list):
y = data["data"][y_value][y_value]["value"]
self.plotter_data_y[ii].append(y)
self.label_bottom = scan_motors[0]
self.label_left = f"{', '.join(self.y_value_list)}"
# print(f'metadata scan N{metadata["scan_number"]}') #TODO put as label on top of plot
# print(f'Data point = {data["point_id"]}') #TODO can be used for progress bar
if len(self.plotter_data_x) <= 1:
return
self.update_signal.emit()
def _reset_plot_data(self):
"""Reset the plot data."""
self.plotter_data_x = []
self.plotter_data_y = []
for ii in range(len(self.y_value_list)):
self.curves[ii].setData([], [])
self.plotter_data_y.append([])
def setup_cursor_table(self):
"""QTable formatting according to N of devices displayed in plot."""
# Init number of rows in table according to n of devices
self.mouse_table.setRowCount(len(self.y_value_list))
for ii, y_value in enumerate(self.y_value_list):
checkbox = QCheckBox()
checkbox.setChecked(True)
# TODO just for testing, will be replaced by removing/adding curve
checkbox.stateChanged.connect(lambda: print("status Changed"))
# checkbox.stateChanged.connect(lambda: self.remove_curve_by_name(plot=self.plot, checkbox=checkbox, name=y_value))
self.mouse_table.setCellWidget(ii, 0, checkbox)
self.mouse_table.setItem(ii, 1, QTableWidgetItem(str(y_value)))
self.mouse_table.resizeColumnsToContents()
@staticmethod
def remove_curve_by_name(plot: pyqtgraph.PlotItem, name: str) -> None:
# def remove_curve_by_name(plot: pyqtgraph.PlotItem, checkbox: QtWidgets.QCheckBox, name: str) -> None:
"""Removes a curve from the given plot by the specified name.
Args:
plot (pyqtgraph.PlotItem): The plot from which to remove the curve.
name (str): The name of the curve to remove.
"""
# if checkbox.isChecked():
for item in plot.items:
if isinstance(item, pg.PlotDataItem) and getattr(item, "opts", {}).get("name") == name:
plot.removeItem(item)
return
# else:
# return
@staticmethod
def golden_ratio(num: int) -> list:
"""Calculate the golden ratio for a given number of angles.
Args:
num (int): Number of angles
"""
phi = 2 * np.pi * ((1 + np.sqrt(5)) / 2)
angles = []
for ii in range(num):
x = np.cos(ii * phi)
y = np.sin(ii * phi)
angle = np.arctan2(y, x)
angles.append(angle)
return angles
@staticmethod
def golden_angle_color(colormap: str, num: int) -> list:
"""
Extract num colors for from the specified colormap following golden angle distribution.
Args:
colormap (str): Name of the colormap
num (int): Number of requested colors
Returns:
list: List of colors with length <num>
Raises:
ValueError: If the number of requested colors is greater than the number of colors in the colormap.
"""
cmap = pg.colormap.get(colormap)
cmap_colors = cmap.color
if num > len(cmap_colors):
raise ValueError(
f"Number of colors requested ({num}) is greater than the number of colors in the colormap ({len(cmap_colors)})"
)
angles = BasicPlot.golden_ratio(len(cmap_colors))
color_selection = np.round(np.interp(angles, (-np.pi, np.pi), (0, len(cmap_colors))))
colors = [
mkColor(tuple((cmap_colors[int(ii)] * 255).astype(int))) for ii in color_selection[:num]
]
return colors
def on_projection(self):
while True:
if self._current_proj is None:
time.sleep(0.1)
continue
endpoint = f"px_stream/projection_{self._current_proj}/data"
msgs = client.producer.lrange(topic=endpoint, start=-1, end=-1)
data = [BECMessage.DeviceMessage.loads(msg) for msg in msgs]
if not data:
continue
with np.errstate(divide="ignore", invalid="ignore"):
self.plotter_data_y = [
np.sum(
np.sum(data[-1].content["signals"]["data"] * self._current_norm, axis=1)
/ np.sum(self._current_norm, axis=0),
axis=0,
).squeeze()
]
self.update_signal.emit()
@pyqtSlot(dict, dict)
def on_dap_update(self, data: dict, metadata: dict):
self.img.setImage(data["z"].T)
# time.sleep(0,1)
@pyqtSlot(dict)
def new_proj(self, data):
proj_nr = data["proj_nr"]
endpoint = f"px_stream/projection_{proj_nr}/metadata"
msg_raw = client.producer.get(topic=endpoint)
msg = BECMessage.DeviceMessage.loads(msg_raw)
self._current_q = msg.content["signals"]["q"]
self._current_norm = msg.content["signals"]["norm_sum"]
self._current_metadata = msg.content["signals"]["metadata"]
self.plotter_data_x = [self._current_q]
self._current_proj = proj_nr
if __name__ == "__main__":
import argparse
from bec_widgets import ctrl_c
from bec_widgets.bec_dispatcher import bec_dispatcher
parser = argparse.ArgumentParser()
parser.add_argument(
"--signals",
help="specify recorded signals",
nargs="+",
default=["gauss_bpm"],
)
# default = ["gauss_bpm", "bpm4i", "bpm5i", "bpm6i", "xert"],
value = parser.parse_args()
print(f"Plotting signals for: {', '.join(value.signals)}")
client = bec_dispatcher.client
# client.start()
app = QtWidgets.QApplication([])
ctrl_c.setup(app)
plot = BasicPlot(y_value_list=value.signals)
# bec_dispatcher.connect(plot)
bec_dispatcher.connect_proj_id(plot.new_proj)
bec_dispatcher.connect_dap_slot(plot.on_dap_update, "px_dap_worker")
plot.show()
# client.callbacks.register("scan_segment", plot, sync=False)
app.exec_()

View File

@@ -1,77 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>Form</class>
<widget class="QWidget" name="Form">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>845</width>
<height>635</height>
</rect>
</property>
<property name="windowTitle">
<string>Line Plot</string>
</property>
<layout class="QHBoxLayout" name="horizontalLayout">
<item>
<widget class="QSplitter" name="splitter">
<property name="orientation">
<enum>Qt::Horizontal</enum>
</property>
<property name="opaqueResize">
<bool>false</bool>
</property>
<widget class="QWidget" name="verticalLayoutWidget">
<layout class="QVBoxLayout" name="verticalLayout">
<item>
<widget class="QPushButton" name="pushButton_debug">
<property name="text">
<string>Debug</string>
</property>
</widget>
</item>
<item>
<widget class="GraphicsLayoutWidget" name="glw"/>
</item>
</layout>
</widget>
<widget class="QTableWidget" name="mouse_table">
<property name="textElideMode">
<enum>Qt::ElideMiddle</enum>
</property>
<column>
<property name="text">
<string>Display</string>
</property>
</column>
<column>
<property name="text">
<string>Device</string>
</property>
</column>
<column>
<property name="text">
<string>X</string>
</property>
</column>
<column>
<property name="text">
<string>Y</string>
</property>
</column>
</widget>
</widget>
</item>
</layout>
</widget>
<customwidgets>
<customwidget>
<class>GraphicsLayoutWidget</class>
<extends>QGraphicsView</extends>
<header>pyqtgraph.h</header>
</customwidget>
</customwidgets>
<resources/>
<connections/>
</ui>

View File

@@ -1,227 +0,0 @@
import argparse
import itertools
import os
from dataclasses import dataclass
from threading import RLock
from bec_lib import BECClient
from bec_lib.core import BECMessage, MessageEndpoints, ServiceConfig
from bec_lib.core.redis_connector import RedisConsumerThreaded
from PyQt5.QtCore import QObject, pyqtSignal
@dataclass
class _BECDap:
"""Utility class to keep track of slots associated with a particular dap redis consumer"""
consumer: RedisConsumerThreaded
slots = set()
# Adding a new pyqt signal requres a class factory, as they must be part of the class definition
# and cannot be dynamically added as class attributes after the class has been defined.
_signal_class_factory = (
type(f"Signal{i}", (QObject,), dict(signal=pyqtSignal("PyQt_PyObject")))
for i in itertools.count()
)
@dataclass
class _Connection:
"""Utility class to keep track of slots connected to a particular redis consumer"""
consumer: RedisConsumerThreaded
slots = set()
# keep a reference to a new signal class, so it is not gc'ed
_signal_container = next(_signal_class_factory)()
def __post_init__(self):
self.signal = self._signal_container.signal
class _BECDispatcher(QObject):
new_scan = pyqtSignal(dict, dict)
scan_segment = pyqtSignal(dict, dict)
new_dap_data = pyqtSignal(dict, dict)
new_projection_id = pyqtSignal(dict)
new_projection_data = pyqtSignal(dict)
def __init__(self, bec_config=None):
super().__init__()
self.client = BECClient()
# TODO: this is a workaround for now to provide service config within qtdesigner, but is
# it possible to provide config via a cli arg?
if bec_config is None and os.path.isfile("bec_config.yaml"):
bec_config = "bec_config.yaml"
self.client.initialize(config=ServiceConfig(config_path=bec_config))
self._slot_signal_map = {
"on_scan_segment": self.scan_segment,
"on_new_scan": self.new_scan,
}
self._daps = {}
self._connections = {}
self._scan_id = None
scan_lock = RLock()
# self.new_projection_id.connect(self.new_projection_data)
def _scan_segment_cb(msg):
msg = BECMessage.ScanMessage.loads(msg.value)[0]
with scan_lock:
# TODO: use ScanStatusMessage instead?
scan_id = msg.content["scanID"]
if self._scan_id != scan_id:
self._scan_id = scan_id
self.new_scan.emit(msg.content, msg.metadata)
self.scan_segment.emit(msg.content, msg.metadata)
scan_segment_topic = MessageEndpoints.scan_segment()
self._scan_segment_thread = self.client.connector.consumer(
topics=scan_segment_topic,
cb=_scan_segment_cb,
)
self._scan_segment_thread.start()
def connect(self, widget):
for slot_name, signal in self._slot_signal_map.items():
slot = getattr(widget, slot_name, None)
if callable(slot):
signal.connect(slot)
def connect_slot(self, slot, topic):
# create new connection for topic if it doesn't exist
if topic not in self._connections:
def cb(msg):
msg = BECMessage.MessageReader.loads(msg.value)
self._connections[topic].signal.emit(msg)
consumer = self.client.connector.consumer(topics=topic, cb=cb)
consumer.start()
self._connections[topic] = _Connection(consumer)
# connect slot if it's not connected
if slot not in self._connections[topic].slots:
self._connections[topic].signal.connect(slot)
self._connections[topic].slots.add(slot)
def disconnect_slot(self, slot, topic):
if topic not in self._connections:
return
if slot not in self._connections[topic].slots:
return
self._connections[topic].signal.disconnect(slot)
self._connections[topic].slots.remove(slot)
if not self._connections[topic].slots:
# shutdown consumer if there are no more connected slots
self._connections[topic].consumer.shutdown()
del self._connections[topic]
def connect_dap_slot(self, slot, dap_name):
if dap_name not in self._daps:
# create a new consumer and connect slot
def _dap_cb(msg):
msg = BECMessage.ProcessedDataMessage.loads(msg.value)
self.new_dap_data.emit(msg.content["data"], msg.metadata)
dap_ep = MessageEndpoints.processed_data(dap_name)
consumer = self.client.connector.consumer(topics=dap_ep, cb=_dap_cb)
consumer.start()
self.new_dap_data.connect(slot)
self._daps[dap_name] = _BECDap(consumer)
self._daps[dap_name].slots.add(slot)
else:
# connect slot if it's not yet connected
if slot not in self._daps[dap_name].slots:
self.new_dap_data.connect(slot)
self._daps[dap_name].slots.add(slot)
def disconnect_dap_slot(self, slot, dap_name):
if dap_name not in self._daps:
return
if slot not in self._daps[dap_name].slots:
return
self.new_dap_data.disconnect(slot)
self._daps[dap_name].slots.remove(slot)
if not self._daps[dap_name].slots:
# shutdown consumer if there are no more connected slots
self._daps[dap_name].consumer.shutdown()
del self._daps[dap_name]
# def connect_proj_data(self, slot):
# keys = self.client.producer.keys("px_stream/projection_*")
# keys = keys or []
#
# def _dap_cb(msg):
# msg = BECMessage.DeviceMessage.loads(msg.value)
# self.new_projection_data.emit(msg.content["data"])
#
# proj_numbers = set(key.decode().split("px_stream/projection_")[1].split("/")[0] for key in keys)
# last_proj_id = sorted(proj_numbers)[-1]
# dap_ep = MessageEndpoints.processed_data(f"px_stream/projection_{last_proj_id}/")
#
# consumer = self.client.connector.consumer(topics=dap_ep, cb=_dap_cb)
# consumer.start()
#
# self.new_projection_data.connect(slot)
def connect_proj_id(self, slot):
def _dap_cb(msg):
msg = BECMessage.DeviceMessage.loads(msg.value)
self.new_projection_id.emit(msg.content["signals"])
dap_ep = "px_stream/proj_nr"
consumer = self.client.connector.consumer(topics=dap_ep, cb=_dap_cb)
consumer.start()
self.new_projection_id.connect(slot)
def connect_proj_data(self, slot: object, data_ep: str) -> object:
def _dap_cb(msg):
msg = BECMessage.DeviceMessage.loads(msg.value)
self.new_projection_data.emit(msg.content["signals"])
consumer = self.client.connector.consumer(topics=data_ep, cb=_dap_cb)
consumer.start()
self._daps[data_ep] = _BECDap(consumer)
self._daps[data_ep].slots.add(slot)
self.new_projection_data.connect(slot)
def disconnect_proj_data(self, slot, data_ep):
if data_ep not in self._daps:
return
if slot not in self._daps[data_ep].slots:
return
self.new_projection_data.disconnect(slot)
self._daps[data_ep].slots.remove(slot)
if not self._daps[data_ep].slots:
# shutdown consumer if there are no more connected slots
self._daps[data_ep].consumer.shutdown()
del self._daps[data_ep]
parser = argparse.ArgumentParser()
parser.add_argument("--bec-config", default=None)
args, _ = parser.parse_known_args()
bec_dispatcher = _BECDispatcher(args.bec_config)

98
bec_widgets/cli.py Normal file
View File

@@ -0,0 +1,98 @@
import argparse
import os
from threading import RLock
from PyQt5 import uic
from PyQt5.QtCore import pyqtSignal
from PyQt5.QtWidgets import QApplication, QMainWindow
from scan_plot import BECScanPlot
from bec_lib.core import BECMessage, MessageEndpoints, RedisConnector
class BEC_UI(QMainWindow):
new_scan_data = pyqtSignal(dict)
new_dap_data = pyqtSignal(dict) # signal per proc instance?
new_scan = pyqtSignal()
def __init__(self, uipath):
super().__init__()
self._scan_channels = set()
self._dap_channels = set()
self._scan_thread = None
self._dap_threads = []
ui = uic.loadUi(uipath, self)
_, fname = os.path.split(uipath)
self.setWindowTitle(fname)
for sp in ui.findChildren(BECScanPlot):
for chan in (sp.x_channel, *sp.y_channel_list):
if chan.startswith("dap."):
chan = chan.partition("dap.")[-1]
self._dap_channels.add(chan)
else:
self._scan_channels.add(chan)
sp.initialize() # TODO: move this elsewhere?
self.new_scan_data.connect(sp.redraw_scan) # TODO: merge
self.new_dap_data.connect(sp.redraw_dap)
self.new_scan.connect(sp.clearData)
# Scan setup
self._scan_id = None
scan_lock = RLock()
def _scan_cb(msg):
msg = BECMessage.ScanMessage.loads(msg.value)
with scan_lock:
scan_id = msg[0].content["scanID"]
if self._scan_id != scan_id:
self._scan_id = scan_id
self.new_scan.emit()
self.new_scan_data.emit(msg[0].content["data"])
bec_connector = RedisConnector("localhost:6379")
if self._scan_channels:
scan_readback = MessageEndpoints.scan_segment()
self._scan_thread = bec_connector.consumer(
topics=scan_readback,
cb=_scan_cb,
)
self._scan_thread.start()
# DAP setup
def _proc_cb(msg):
msg = BECMessage.ProcessedDataMessage.loads(msg.value)
self.new_dap_data.emit(msg.content["data"])
if self._dap_channels:
for chan in self._dap_channels:
proc_ep = MessageEndpoints.processed_data(chan)
dap_thread = bec_connector.consumer(topics=proc_ep, cb=_proc_cb)
dap_thread.start()
self._dap_threads.append(dap_thread)
self.show()
def main():
parser = argparse.ArgumentParser(
prog="bec-pyqt", formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument("uipath", type=str, help="Path to a BEC ui file")
args, rem = parser.parse_known_args()
app = QApplication(rem)
BEC_UI(args.uipath)
app.exec_()
if __name__ == "__main__":
main()

View File

@@ -2,11 +2,12 @@ from typing import List
import numpy as np
import pyqtgraph as pg
from PyQt5.QtWidgets import QApplication, QGridLayout, QSizePolicy, QWidget
from pyqtgraph import mkPen
from pyqtgraph.Qt import QtCore, QtWidgets
from pyqtgraph.Qt import QtCore
class ConfigPlotter(pg.GraphicsWidget):
class ConfigPlotter(QWidget):
"""
ConfigPlotter is a widget that can be used to plot data from multiple channels
in a grid layout. The layout is specified by a list of dicts, where each dict
@@ -28,7 +29,7 @@ class ConfigPlotter(pg.GraphicsWidget):
"""
def __init__(self, configs: List[dict], parent=None):
super().__init__(parent)
super(ConfigPlotter, self).__init__()
self.configs = configs
self.plots = {}
self._init_ui()
@@ -37,65 +38,73 @@ class ConfigPlotter(pg.GraphicsWidget):
def _init_ui(self):
pg.setConfigOption("background", "w")
pg.setConfigOption("foreground", "k")
# pylint: disable=no-member
self.layout = QGridLayout()
self.setLayout(self.layout)
self.pen = mkPen(color=(56, 76, 107), width=4, style=QtCore.Qt.SolidLine)
self.view = pg.GraphicsView()
self.view.setAntialiasing(True)
self.view.show()
self.layout = pg.GraphicsLayout()
self.view.setCentralWidget(self.layout)
self.show()
def _init_plots(self):
for config in self.configs:
channels = config["config"]["channels"]
for channel in channels:
item = pg.PlotItem()
self.layout.addItem(
item,
row=config["y"],
col=config["x"],
rowspan=config["rows"],
colspan=config["cols"],
)
# call the corresponding init function, e.g. init_plotitem
init_func = getattr(self, f"init_{config['config']['item']}")
init_func(channel, config["config"], item)
init_func(channel, config)
# self.init_ImageItem(channel, config["config"], item)
def init_PlotItem(self, channel: str, config: dict, item: pg.GraphicsItem):
def init_PlotItem(self, channel: str, config: dict):
"""
Initialize a PlotItem
Args:
channel(str): channel to plot
config(dict): config dict for the channel
item(pg.GraphicsItem): PlotItem to plot the data
"""
# pylint: disable=invalid-name
plot_data = item.plot(np.random.rand(100), pen=self.pen)
item.setLabel("left", channel)
self.plots[channel] = {"item": item, "plot_data": plot_data}
plot_widget = pg.PlotWidget()
plot_widget.setSizePolicy(QSizePolicy.Fixed, QSizePolicy.Fixed)
self.layout.addWidget(plot_widget, config["y"], config["x"], config["rows"], config["cols"])
plot_data = plot_widget.plot(np.random.rand(100), pen=self.pen)
# item.setLabel("left", channel)
# self.plots[channel] = {"item": item, "plot_data": plot_data}
def init_ImageItem(self, channel: str, config: dict, item: pg.GraphicsItem):
def init_ImageItem(self, channel: str, config: dict):
"""
Initialize an ImageItem
Args:
channel(str): channel to plot
config(dict): config dict for the channel
item(pg.GraphicsItem): ImageItem to plot the data
"""
# pylint: disable=invalid-name
item = pg.PlotItem()
self.layout.addItem(
item,
row=config["y"],
col=config["x"],
rowspan=config["rows"],
colspan=config["cols"],
)
img = pg.ImageItem()
item.addItem(img)
img.setImage(np.random.rand(100, 100))
self.plots[channel] = {"item": item, "plot_data": img}
def init_ImageView(self, channel: str, config: dict):
"""
Initialize an ImageView
Args:
channel(str): channel to plot
config(dict): config dict for the channel
"""
# pylint: disable=invalid-name
img = pg.ImageView()
img.setImage(np.random.rand(100, 100))
self.layout.addWidget(img, config["y"], config["x"], config["rows"], config["cols"])
self.plots[channel] = {"item": img, "plot_data": img}
if __name__ == "__main__":
import sys
@@ -120,10 +129,10 @@ if __name__ == "__main__":
"rows": 2,
"y": 0,
"x": 1,
"config": {"channels": ["c"], "label_xy": ["", "c"], "item": "ImageItem"},
"config": {"channels": ["c"], "label_xy": ["", "c"], "item": "ImageView"},
},
]
app = QtWidgets.QApplication(sys.argv)
app = QApplication(sys.argv)
win = ConfigPlotter(CONFIG)
pg.exec()

View File

@@ -1,38 +0,0 @@
import signal
import socket
from PyQt5.QtNetwork import QAbstractSocket
def setup(app):
app.signalwatchdog = SignalWatchdog() # need to store to keep socket pair alive
signal.signal(signal.SIGINT, make_quit_handler(app))
def make_quit_handler(app):
def handler(*args):
print() # make ^C appear on its own line
app.quit()
return handler
class SignalWatchdog(QAbstractSocket):
def __init__(self):
"""
Propagates system signals from Python to QEventLoop
adapted from https://stackoverflow.com/a/65802260/655404
"""
super().__init__(QAbstractSocket.SctpSocket, None)
self.writer, self.reader = writer, reader = socket.socketpair()
writer.setblocking(False)
fd_writer = writer.fileno()
fd_reader = reader.fileno()
signal.set_wakeup_fd(fd_writer) # Python hook
self.setSocketDescriptor(fd_reader) # Qt hook
self.readyRead.connect(
lambda: None
) # dummy function call that lets the Python interpreter run

View File

@@ -1,33 +0,0 @@
import os
import sys
from PyQt5 import QtWidgets, uic
class UI(QtWidgets.QWidget):
def __init__(self, uipath):
super().__init__()
self.ui = uic.loadUi(uipath, self)
_, fname = os.path.split(uipath)
self.setWindowTitle(fname)
self.show()
def main():
"""A basic script to display UI file
Run the script, passing UI file path as an argument, e.g.
$ python bec_widgets/display_ui_file.py bec_widgets/line_plot.ui
"""
app = QtWidgets.QApplication(sys.argv)
UI(sys.argv[1])
sys.exit(app.exec_())
if __name__ == "__main__":
main()

View File

@@ -1,352 +0,0 @@
import os
import threading
import time
import warnings
from typing import Any
import numpy as np
import pyqtgraph
import pyqtgraph as pg
from bec_lib.core import BECMessage
from PyQt5.QtCore import pyqtSlot
from PyQt5.QtWidgets import QCheckBox, QTableWidgetItem
from pyqtgraph import mkBrush, mkColor, mkPen
from pyqtgraph.Qt import QtCore, QtWidgets, uic
from pyqtgraph.Qt.QtCore import pyqtSignal
from bec_widgets.bec_dispatcher import bec_dispatcher
from bec_lib.core.redis_connector import MessageObject, RedisConnector
from qt_utils import Crosshair
client = bec_dispatcher.client
class BasicPlot(QtWidgets.QWidget):
update_signal = pyqtSignal()
roi_signal = pyqtSignal(tuple)
def __init__(self, name="", y_value_list=["gauss_bpm"]) -> None:
"""
Basic plot widget for displaying scan data.
Args:
name (str, optional): Name of the plot. Defaults to "".
y_value_list (list, optional): List of signals to be plotted. Defaults to ["gauss_bpm"].
"""
super(BasicPlot, self).__init__()
# Set style for pyqtgraph plots
pg.setConfigOption("background", "w")
pg.setConfigOption("foreground", "k")
current_path = os.path.dirname(__file__)
uic.loadUi(os.path.join(current_path, "line_plot.ui"), self)
self.splitter_H.setSizes([1, 1])
self._idle_time = 100
self.title = ""
self.label_bottom = ""
self.label_left = ""
self.producer = RedisConnector(["localhost:6379"]).producer()
self.scan_motors = []
self.y_value_list = y_value_list
self.previous_y_value_list = None
self.plotter_data_x = []
self.plotter_data_y = []
self.plotter_scan_id = None
##########################
# Buttons
##########################
self.init_ui()
self.hook_crosshair()
self.pushButton_generate.clicked.connect(self.generate_data)
self._current_proj = None
self._current_metadata_ep = "px_stream/projection_{}/metadata"
self.proxy_update = pg.SignalProxy(self.update_signal, rateLimit=25, slot=self.update)
self.data_retriever = threading.Thread(target=self.on_projection, daemon=True)
self.data_retriever.start()
def init_ui(self):
"""Setup all ui elements"""
##########################
# 1D Plot
##########################
# LabelItem for ROI
self.label_plot = pg.LabelItem(justify="center")
self.glw_plot.addItem(self.label_plot)
self.label_plot.setText("ROI region")
# ROI selector - so far from [-1,1] #TODO update to scale with xrange
self.roi_selector = pg.LinearRegionItem([-1, 1])
# self.glw_plot.nextRow() #TODO update of cursor
# self.label_plot_moved = pg.LabelItem(justify="center")
# self.glw_plot.addItem(self.label_plot_moved)
# self.label_plot_moved.setText("Actual coordinates (X, Y)")
#
# # Label for coordinates clicked
# self.glw_plot.nextRow()
# self.label_plot_clicked = pg.LabelItem(justify="center")
# self.glw_plot.addItem(self.label_plot_clicked)
# self.label_plot_clicked.setText("Clicked coordinates (X, Y)")
# 1D PlotItem
self.glw_plot.nextRow()
self.plot = pg.PlotItem()
self.plot.setLogMode(True, True)
self.glw_plot.addItem(self.plot)
self.plot.addLegend()
# check if roi selector is in the plot
if self.roi_selector not in self.plot.items:
self.plot.addItem(self.roi_selector)
self.curves = []
self.pens = []
self.brushs = []
self.color_list = BasicPlot.golden_angle_color(
colormap="CET-R2", num=len(self.y_value_list)
)
for ii, y_value in enumerate(self.y_value_list):
pen = mkPen(color=self.color_list[ii], width=2, style=QtCore.Qt.DashLine)
brush = mkBrush(color=self.color_list[ii])
curve = pg.PlotDataItem(symbolBrush=brush, pen=pen, skipFiniteCheck=True, name=y_value)
self.plot.addItem(curve)
self.curves.append(curve)
self.pens.append(pen)
self.brushs.append(brush)
##########################
# 2D Plot
##########################
# Label for coordinates moved
self.label_image_moved = pg.LabelItem(justify="center")
self.glw_image.addItem(self.label_image_moved)
self.label_image_moved.setText("Actual coordinates (X, Y)")
# Label for coordinates clicked
self.glw_image.nextRow()
self.label_image_clicked = pg.LabelItem(justify="center")
self.glw_image.addItem(self.label_image_clicked)
self.label_image_clicked.setText("Clicked coordinates (X, Y)")
# 2D ImageItem
self.glw_image.nextRow()
self.plot_image = pg.PlotItem()
self.img = pg.ImageItem()
self.glw_image.addItem(self.plot_image)
self.plot_image.addItem(self.img)
def hook_crosshair(self):
...
# self.crosshair_plot = Crosshair(self.plot, precision=2)
self.crosshair_2D = Crosshair(self.plot_image)
self.crosshair_2D.coordinatesChanged2D.connect(
lambda x, y: self.label_image_moved.setText(f"Moved : ({x}, {y})")
)
self.crosshair_2D.coordinatesClicked2D.connect(
lambda x, y: self.label_image_clicked.setText(f"Moved : ({x}, {y})")
)
# ROI
self.roi_selector.sigRegionChangeFinished.connect(self.get_roi_region)
def generate_data(self):
def gauss(x, mu, sigma):
return (1 / (sigma * np.sqrt(2 * np.pi))) * np.exp(-0.5 * ((x - mu) / sigma) ** 2)
mu = 0 # mean
sigma = 1 # standard deviation
self.plotter_data_x = np.linspace(0, 10, 1000)
self.plotter_data_y = [
gauss(self.plotter_data_x, mu, sigma),
np.sin(self.plotter_data_x),
np.cos(self.plotter_data_x),
np.sin(2 * self.plotter_data_x),
] # List of y-values for multiple curves
self.y_value_list = ["gauss"] # ["Sine"]#, "Cosine", "Sine2x"]
# Curves
color_list = ["#384c6b", "#e28a2b", "#5E3023", "#e41a1c", "#984e83", "#4daf4a"]
for ii in range(len(self.y_value_list)):
self.curves[ii].setData(self.plotter_data_x, self.plotter_data_y[ii])
self.data_2D = np.random.random((150, 30))
self.img.setImage(self.data_2D)
if self.roi_selector not in self.plot.items:
self.plot.addItem(self.roi_selector)
def get_roi_region(self):
"""For testing purpose now, get roi region and print it to self.label as tuple"""
region = self.roi_selector.getRegion()
self.label_plot.setText(f"x = {(10 ** region[0]):.4f}, y ={(10 ** region[1]):.4f}")
return_dict = {
"horiz_roi": [
np.where(self.plotter_data_x[0] > 10 ** region[0])[0][0],
np.where(self.plotter_data_x[0] < 10 ** region[1])[0][-1],
]
}
msg = BECMessage.DeviceMessage(signals=return_dict).dumps()
self.producer.set_and_publish("px_stream/gui_event", msg=msg)
self.roi_signal.emit(region)
def update(self):
"""Update the plot with the new data."""
print("updated")
# check if QTable was initialised and if list of devices was changed
# if self.y_value_list != self.previous_y_value_list:
# self.setup_cursor_table()
# self.previous_y_value_list = self.y_value_list.copy() if self.y_value_list else None
self.curves[0].setData(self.plotter_data_x[0], self.plotter_data_y[0])
@staticmethod
def remove_curve_by_name(plot: pyqtgraph.PlotItem, name: str) -> None:
# def remove_curve_by_name(plot: pyqtgraph.PlotItem, checkbox: QtWidgets.QCheckBox, name: str) -> None:
"""Removes a curve from the given plot by the specified name.
Args:
plot (pyqtgraph.PlotItem): The plot from which to remove the curve.
name (str): The name of the curve to remove.
"""
# if checkbox.isChecked():
for item in plot.items:
if isinstance(item, pg.PlotDataItem) and getattr(item, "opts", {}).get("name") == name:
plot.removeItem(item)
return
# else:
# return
@staticmethod
def golden_ratio(num: int) -> list:
"""Calculate the golden ratio for a given number of angles.
Args:
num (int): Number of angles
"""
phi = 2 * np.pi * ((1 + np.sqrt(5)) / 2)
angles = []
for ii in range(num):
x = np.cos(ii * phi)
y = np.sin(ii * phi)
angle = np.arctan2(y, x)
angles.append(angle)
return angles
@staticmethod
def golden_angle_color(colormap: str, num: int) -> list:
"""
Extract num colors for from the specified colormap following golden angle distribution.
Args:
colormap (str): Name of the colormap
num (int): Number of requested colors
Returns:
list: List of colors with length <num>
Raises:
ValueError: If the number of requested colors is greater than the number of colors in the colormap.
"""
cmap = pg.colormap.get(colormap)
cmap_colors = cmap.color
if num > len(cmap_colors):
raise ValueError(
f"Number of colors requested ({num}) is greater than the number of colors in the colormap ({len(cmap_colors)})"
)
angles = BasicPlot.golden_ratio(len(cmap_colors))
color_selection = np.round(np.interp(angles, (-np.pi, np.pi), (0, len(cmap_colors))))
colors = [
mkColor(tuple((cmap_colors[int(ii)] * 255).astype(int))) for ii in color_selection[:num]
]
return colors
def on_projection(self):
while True:
if self._current_proj is None:
time.sleep(0.1)
continue
endpoint = f"px_stream/projection_{self._current_proj}/data"
msgs = client.producer.lrange(topic=endpoint, start=-1, end=-1)
data = [BECMessage.DeviceMessage.loads(msg) for msg in msgs]
if not data:
continue
with np.errstate(divide="ignore", invalid="ignore"):
self.plotter_data_y = [
np.sum(
np.sum(data[-1].content["signals"]["data"] * self._current_norm, axis=1)
/ np.sum(self._current_norm, axis=0),
axis=0,
).squeeze()
]
self.update_signal.emit()
@pyqtSlot(dict, dict)
def on_dap_update(self, data: dict, metadata: dict):
self.img.setImage(data["z"])
# time.sleep(0,1)
@pyqtSlot(dict)
def new_proj(self, data):
proj_nr = data["proj_nr"]
endpoint = f"px_stream/projection_{proj_nr}/metadata"
msg_raw = client.producer.get(topic=endpoint)
msg = BECMessage.DeviceMessage.loads(msg_raw)
self._current_q = msg.content["signals"]["q"]
self._current_norm = msg.content["signals"]["norm_sum"]
self._current_metadata = msg.content["signals"]["metadata"]
self.plotter_data_x = [self._current_q]
self._current_proj = proj_nr
if __name__ == "__main__":
import argparse
from bec_widgets import ctrl_c
from bec_widgets.bec_dispatcher import bec_dispatcher
parser = argparse.ArgumentParser()
parser.add_argument(
"--signals",
help="specify recorded signals",
nargs="+",
default=["gauss_bpm"],
)
# default = ["gauss_bpm", "bpm4i", "bpm5i", "bpm6i", "xert"],
value = parser.parse_args()
print(f"Plotting signals for: {', '.join(value.signals)}")
client = bec_dispatcher.client
# client.start()
app = QtWidgets.QApplication([])
ctrl_c.setup(app)
plot = BasicPlot(y_value_list=value.signals)
# bec_dispatcher.connect(plot)
bec_dispatcher.connect_proj_id(plot.new_proj)
bec_dispatcher.connect_dap_slot(plot.on_dap_update, "px_dap_worker")
plot.show()
# client.callbacks.register("scan_segment", plot, sync=False)
app.exec_()

View File

@@ -1,80 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>Form</class>
<widget class="QWidget" name="Form">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>845</width>
<height>635</height>
</rect>
</property>
<property name="windowTitle">
<string>Line Plot</string>
</property>
<layout class="QVBoxLayout" name="verticalLayout">
<item>
<widget class="QPushButton" name="pushButton_generate">
<property name="text">
<string>Generate 1D and 2D data without stream</string>
</property>
</widget>
</item>
<item>
<widget class="QSplitter" name="splitter_H">
<property name="orientation">
<enum>Qt::Horizontal</enum>
</property>
<widget class="QSplitter" name="splitter_plot">
<property name="sizePolicy">
<sizepolicy hsizetype="Expanding" vsizetype="Expanding">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<property name="orientation">
<enum>Qt::Vertical</enum>
</property>
<widget class="GraphicsLayoutWidget" name="glw_plot"/>
<widget class="GraphicsLayoutWidget" name="glw_image"/>
</widget>
<widget class="QTableWidget" name="cursor_table">
<property name="textElideMode">
<enum>Qt::ElideMiddle</enum>
</property>
<column>
<property name="text">
<string>Display</string>
</property>
</column>
<column>
<property name="text">
<string>Device</string>
</property>
</column>
<column>
<property name="text">
<string>X</string>
</property>
</column>
<column>
<property name="text">
<string>Y</string>
</property>
</column>
</widget>
</widget>
</item>
</layout>
</widget>
<customwidgets>
<customwidget>
<class>GraphicsLayoutWidget</class>
<extends>QGraphicsView</extends>
<header>pyqtgraph.h</header>
</customwidget>
</customwidgets>
<resources/>
<connections/>
</ui>

View File

@@ -1,360 +0,0 @@
import os
import warnings
from typing import Any
import numpy as np
import pyqtgraph
import pyqtgraph as pg
from PyQt5.QtCore import pyqtSlot
from PyQt5.QtWidgets import QTableWidgetItem, QCheckBox
from bec_lib import BECClient
from pyqtgraph import mkBrush, mkColor, mkPen
from pyqtgraph.Qt import QtCore, QtWidgets, uic
from pyqtgraph.Qt.QtCore import pyqtSignal
class BasicPlot(QtWidgets.QWidget):
update_signal = pyqtSignal()
roi_signal = pyqtSignal(tuple)
def __init__(self, name="", y_value_list=["gauss_bpm"]) -> None:
"""
Basic plot widget for displaying scan data.
Args:
name (str, optional): Name of the plot. Defaults to "".
y_value_list (list, optional): List of signals to be plotted. Defaults to ["gauss_bpm"].
"""
super(BasicPlot, self).__init__()
# Set style for pyqtgraph plots
pg.setConfigOption("background", "w")
pg.setConfigOption("foreground", "k")
current_path = os.path.dirname(__file__)
uic.loadUi(os.path.join(current_path, "line_plot.ui"), self)
# Set splitter distribution of widgets
self.splitter.setSizes([5, 2])
self._idle_time = 100
self.title = ""
self.label_bottom = ""
self.label_left = ""
self.scan_motors = []
self.y_value_list = y_value_list
self.previous_y_value_list = None
self.plotter_data_x = []
self.plotter_data_y = []
self.curves = []
self.pens = []
self.brushs = []
self.plotter_scan_id = None
# TODO to be moved to utils function
plotstyles = {
"symbol": "o",
"symbolSize": 10,
}
color_list = ["#384c6b", "#e28a2b", "#5E3023", "#e41a1c", "#984e83", "#4daf4a"]
color_list = BasicPlot.golden_angle_color(colormap="CET-R2", num=len(self.y_value_list))
# setup plots - GraphicsLayoutWidget
# LabelItem
self.label = pg.LabelItem(justify="center")
self.glw.addItem(self.label)
self.label.setText("test label")
# PlotItem - main window
self.glw.nextRow()
self.plot = pg.PlotItem()
self.glw.addItem(self.plot)
self.plot.addLegend()
# PlotItem - ROI window - disabled for now #TODO add 2D plot for ROI and 1D plot for mouse click
# self.glw.nextRow()
# self.plot_roi = pg.PlotItem()
# self.glw.addItem(self.plot_roi)
# ROI selector - so far from [-1,1] #TODO update to scale with xrange
self.roi_selector = pg.LinearRegionItem([-1, 1])
for ii, y_value in enumerate(self.y_value_list):
pen = mkPen(color=color_list[ii], width=2, style=QtCore.Qt.DashLine)
brush = mkBrush(color=color_list[ii])
curve = pg.PlotDataItem(
**plotstyles, symbolBrush=brush, pen=pen, skipFiniteCheck=True, name=y_value
)
self.plot.addItem(curve)
self.curves.append(curve)
self.pens.append(pen)
self.brushs.append(brush)
self.crosshair_v = pg.InfiniteLine(angle=90, movable=False)
self.crosshair_h = pg.InfiniteLine(angle=0, movable=False)
self.plot.addItem(self.crosshair_v, ignoreBounds=True)
self.plot.addItem(self.crosshair_h, ignoreBounds=True)
# Add textItems
self.add_text_items()
# Manage signals
self.proxy = pg.SignalProxy(
self.plot.scene().sigMouseMoved, rateLimit=60, slot=self.mouse_moved
)
self.proxy_update = pg.SignalProxy(self.update_signal, rateLimit=25, slot=self.update)
self.roi_selector.sigRegionChangeFinished.connect(self.get_roi_region)
self.pushButton_debug.clicked.connect(self.debug)
def debug(self):
"""
Debug button just for quick testing
"""
def get_roi_region(self):
"""For testing purpose now, get roi region and print it to self.label as tuple"""
region = self.roi_selector.getRegion()
self.label.setText(f"x = {region[0]:.4f}, y ={region[1]:.4f}")
self.roi_signal.emit(region)
def add_text_items(self): # TODO probably can be removed
"""Add text items to the plot"""
# self.mouse_box_data.setText("Mouse cursor")
# TODO Via StyleSheet, one may set the color of the full QLabel
# self.mouse_box_data.setStyleSheet(f"QLabel {{color : rgba{self.pens[0].color().getRgb()}}}")
def mouse_moved(self, event: tuple) -> None:
"""
Update the mouse table with the current mouse position and the corresponding data.
Args:
event (tuple): Mouse event containing the position of the mouse cursor.
The position is stored in first entry as horizontal, vertical pixel.
"""
pos = event[0]
if not self.plot.sceneBoundingRect().contains(pos):
return
mousePoint = self.plot.vb.mapSceneToView(pos)
self.crosshair_v.setPos(mousePoint.x())
self.crosshair_h.setPos(mousePoint.y())
if not self.plotter_data_x:
return
for ii, y_value in enumerate(self.y_value_list):
closest_point = self.closest_x_y_value(
mousePoint.x(), self.plotter_data_x, self.plotter_data_y[ii]
)
# TODO fix text wobble in plot, see plot when it crosses 0
x_data = f"{closest_point[0]:.{self.precision}f}"
y_data = f"{closest_point[1]:.{self.precision}f}"
# Write coordinate to QTable
self.mouse_table.setItem(ii, 1, QTableWidgetItem(str(y_value)))
self.mouse_table.setItem(ii, 2, QTableWidgetItem(str(x_data)))
self.mouse_table.setItem(ii, 3, QTableWidgetItem(str(y_data)))
self.mouse_table.resizeColumnsToContents()
def closest_x_y_value(self, input_value, list_x, list_y) -> tuple:
"""
Find the closest x and y value to the input value.
Args:
input_value (float): Input value
list_x (list): List of x values
list_y (list): List of y values
Returns:
tuple: Closest x and y value
"""
arr = np.asarray(list_x)
i = (np.abs(arr - input_value)).argmin()
return list_x[i], list_y[i]
def update(self):
"""Update the plot with the new data."""
# check if roi selector is in the plot
if self.roi_selector not in self.plot.items:
self.plot.addItem(self.roi_selector)
# check if QTable was initialised and if list of devices was changed
if self.y_value_list != self.previous_y_value_list:
self.setup_cursor_table()
self.previous_y_value_list = self.y_value_list.copy() if self.y_value_list else None
if len(self.plotter_data_x) <= 1:
return
self.plot.setLabel("bottom", self.label_bottom)
self.plot.setLabel("left", self.label_left)
for ii in range(len(self.y_value_list)):
self.curves[ii].setData(self.plotter_data_x, self.plotter_data_y[ii])
@pyqtSlot(dict, dict)
def on_scan_segment(self, data: dict, metadata: dict) -> None:
"""Update function that is called during the scan callback. To avoid
too many renderings, the GUI is only processing events every <_idle_time> ms.
Args:
data (dict): Dictionary containing a new scan segment
metadata (dict): Scan metadata
"""
if metadata["scanID"] != self.plotter_scan_id:
self.plotter_scan_id = metadata["scanID"]
self._reset_plot_data()
self.title = f"Scan {metadata['scan_number']}"
self.scan_motors = scan_motors = metadata.get("scan_report_devices")
# client = BECClient()
remove_y_value_index = [
index
for index, y_value in enumerate(self.y_value_list)
if y_value not in client.device_manager.devices
]
if remove_y_value_index:
for ii in sorted(remove_y_value_index, reverse=True):
# TODO Use bec warning message??? to be discussed with Klaus
warnings.warn(
f"Warning: no matching signal for {self.y_value_list[ii]} found in list of devices. Removing from plot."
)
self.remove_curve_by_name(self.plot, self.y_value_list[ii])
self.y_value_list.pop(ii)
self.precision = client.device_manager.devices[scan_motors[0]]._info["describe"][
scan_motors[0]
]["precision"]
# TODO after update of bec_lib, this will be new way to access data
# self.precision = client.device_manager.devices[scan_motors[0]].precision
x = data["data"][scan_motors[0]][scan_motors[0]]["value"]
self.plotter_data_x.append(x)
for ii, y_value in enumerate(self.y_value_list):
y = data["data"][y_value][y_value]["value"]
self.plotter_data_y[ii].append(y)
self.label_bottom = scan_motors[0]
self.label_left = f"{', '.join(self.y_value_list)}"
# print(f'metadata scan N{metadata["scan_number"]}') #TODO put as label on top of plot
# print(f'Data point = {data["point_id"]}') #TODO can be used for progress bar
if len(self.plotter_data_x) <= 1:
return
self.update_signal.emit()
def _reset_plot_data(self):
"""Reset the plot data."""
self.plotter_data_x = []
self.plotter_data_y = []
for ii in range(len(self.y_value_list)):
self.curves[ii].setData([], [])
self.plotter_data_y.append([])
def setup_cursor_table(self):
"""QTable formatting according to N of devices displayed in plot."""
# Init number of rows in table according to n of devices
self.mouse_table.setRowCount(len(self.y_value_list))
for ii, y_value in enumerate(self.y_value_list):
checkbox = QCheckBox()
checkbox.setChecked(True)
# TODO just for testing, will be replaced by removing/adding curve
checkbox.stateChanged.connect(lambda: print("status Changed"))
# checkbox.stateChanged.connect(lambda: self.remove_curve_by_name(plot=self.plot, checkbox=checkbox, name=y_value))
self.mouse_table.setCellWidget(ii, 0, checkbox)
self.mouse_table.setItem(ii, 1, QTableWidgetItem(str(y_value)))
self.mouse_table.resizeColumnsToContents()
@staticmethod
def remove_curve_by_name(plot: pyqtgraph.PlotItem, name: str) -> None:
# def remove_curve_by_name(plot: pyqtgraph.PlotItem, checkbox: QtWidgets.QCheckBox, name: str) -> None:
"""Removes a curve from the given plot by the specified name.
Args:
plot (pyqtgraph.PlotItem): The plot from which to remove the curve.
name (str): The name of the curve to remove.
"""
# if checkbox.isChecked():
for item in plot.items:
if isinstance(item, pg.PlotDataItem) and getattr(item, "opts", {}).get("name") == name:
plot.removeItem(item)
return
# else:
# return
@staticmethod
def golden_ratio(num: int) -> list:
"""Calculate the golden ratio for a given number of angles.
Args:
num (int): Number of angles
"""
phi = 2 * np.pi * ((1 + np.sqrt(5)) / 2)
angles = []
for ii in range(num):
x = np.cos(ii * phi)
y = np.sin(ii * phi)
angle = np.arctan2(y, x)
angles.append(angle)
return angles
@staticmethod
def golden_angle_color(colormap: str, num: int) -> list:
"""
Extract num colors for from the specified colormap following golden angle distribution.
Args:
colormap (str): Name of the colormap
num (int): Number of requested colors
Returns:
list: List of colors with length <num>
Raises:
ValueError: If the number of requested colors is greater than the number of colors in the colormap.
"""
cmap = pg.colormap.get(colormap)
cmap_colors = cmap.color
if num > len(cmap_colors):
raise ValueError(
f"Number of colors requested ({num}) is greater than the number of colors in the colormap ({len(cmap_colors)})"
)
angles = BasicPlot.golden_ratio(len(cmap_colors))
color_selection = np.round(np.interp(angles, (-np.pi, np.pi), (0, len(cmap_colors))))
colors = [
mkColor(tuple((cmap_colors[int(ii)] * 255).astype(int))) for ii in color_selection[:num]
]
return colors
if __name__ == "__main__":
import argparse
from bec_widgets.bec_dispatcher import bec_dispatcher
from bec_widgets import ctrl_c
parser = argparse.ArgumentParser()
parser.add_argument(
"--signals",
help="specify recorded signals",
nargs="+",
default=["gauss_bpm", "bpm4i", "bpm5i", "bpm6i", "xert"],
)
value = parser.parse_args()
print(f"Plotting signals for: {', '.join(value.signals)}")
client = bec_dispatcher.client
# client.start()
app = QtWidgets.QApplication([])
ctrl_c.setup(app)
plot = BasicPlot(y_value_list=value.signals)
bec_dispatcher.connect(plot)
plot.show()
# client.callbacks.register("scan_segment", plot, sync=False)
app.exec_()

View File

@@ -1 +0,0 @@
from .crosshair import Crosshair

View File

@@ -1,115 +0,0 @@
import numpy as np
import pyqtgraph as pg
from PyQt5.QtCore import QObject, pyqtSignal
class Crosshair(QObject):
# Signal for 1D plot
coordinatesChanged1D = pyqtSignal(float, list)
coordinatesClicked1D = pyqtSignal(float, list)
# Signal for 2D plot
coordinatesChanged2D = pyqtSignal(float, float)
coordinatesClicked2D = pyqtSignal(float, float)
def __init__(self, plot_item, precision=None, parent=None):
super().__init__(parent)
self.plot_item = plot_item
self.precision = precision
self.v_line = pg.InfiniteLine(angle=90, movable=False)
self.h_line = pg.InfiniteLine(angle=0, movable=False)
self.plot_item.addItem(self.v_line, ignoreBounds=True)
self.plot_item.addItem(self.h_line, ignoreBounds=True)
self.proxy = pg.SignalProxy(
self.plot_item.scene().sigMouseMoved, rateLimit=60, slot=self.mouse_moved
)
self.plot_item.scene().sigMouseClicked.connect(self.mouse_clicked)
# Add marker for clicked and selected point
data = self.get_data()
if isinstance(data, list): # 1D plot
num_curves = len(data)
self.marker_moved_1d = []
self.marker_clicked_1d = []
for i in range(num_curves):
color = plot_item.listDataItems()[i].opts["pen"].color()
marker_moved = pg.ScatterPlotItem(
size=10, pen=pg.mkPen(color), brush=pg.mkBrush(None)
) # Hollow
marker_clicked = pg.ScatterPlotItem(
size=10, pen=pg.mkPen(None), brush=pg.mkBrush(color)
) # Full
self.marker_moved_1d.append(marker_moved)
self.marker_clicked_1d.append(marker_clicked)
self.plot_item.addItem(marker_moved)
self.plot_item.addItem(marker_clicked)
else: # 2D plot
self.marker_2d = pg.ROI([0, 0], size=[1, 1], pen=pg.mkPen("r", width=2), movable=False)
self.plot_item.addItem(self.marker_2d)
def get_data(self):
curves = []
for item in self.plot_item.items:
if isinstance(item, pg.PlotDataItem): # 1D plot
curves.append((item.xData, item.yData))
elif isinstance(item, pg.ImageItem): # 2D plot
return item.image, None
return curves
def snap_to_data(self, x, y):
data = self.get_data()
if isinstance(data, list): # 1D plot
y_values = []
for x_data, y_data in data:
closest_x, closest_y = self.closest_x_y_value(x, x_data, y_data)
y_values.append(closest_y)
if self.precision is not None:
x = round(x, self.precision)
y_values = [round(y_val, self.precision) for y_val in y_values]
return x, y_values
elif isinstance(data[0], np.ndarray): # 2D plot
x_idx = int(np.clip(x, 0, data[0].shape[0] - 1))
y_idx = int(np.clip(y, 0, data[0].shape[1] - 1))
return x_idx, y_idx
return x, y
def closest_x_y_value(self, input_value, list_x, list_y):
"""
Find the closest x and y value to the input value.
Args:
input_value (float): Input value
list_x (list): List of x values
list_y (list): List of y values
Returns:
tuple: Closest x and y value
"""
arr = np.asarray(list_x)
i = (np.abs(arr - input_value)).argmin()
return list_x[i], list_y[i]
def mouse_moved(self, event):
pos = event[0]
if self.plot_item.vb.sceneBoundingRect().contains(pos):
mouse_point = self.plot_item.vb.mapSceneToView(pos)
x, y_values = self.snap_to_data(mouse_point.x(), mouse_point.y())
self.v_line.setPos(mouse_point.x())
self.h_line.setPos(mouse_point.y())
if isinstance(y_values, list): # 1D plot
self.coordinatesChanged1D.emit(x, y_values)
for i, y_val in enumerate(y_values):
self.marker_moved_1d[i].setData([x], [y_val])
else: # 2D plot
self.coordinatesChanged2D.emit(x, y_values)
def mouse_clicked(self, event):
if self.plot_item.vb.sceneBoundingRect().contains(event._scenePos):
mouse_point = self.plot_item.vb.mapSceneToView(event._scenePos)
x, y_values = self.snap_to_data(mouse_point.x(), mouse_point.y())
if isinstance(y_values, list): # 1D plot
self.coordinatesClicked1D.emit(x, y_values)
for i, y_val in enumerate(y_values):
self.marker_clicked_1d[i].setData([x], [y_val])
else: # 2D plot
self.coordinatesClicked2D.emit(x, y_values)
self.marker_2d.setPos([x, y_values])

View File

@@ -1,143 +0,0 @@
import numpy as np
import pyqtgraph as pg
from PyQt5.QtWidgets import (
QApplication,
QVBoxLayout,
QLabel,
QWidget,
QHBoxLayout,
QTableWidget,
QTableWidgetItem,
)
from pyqtgraph import mkPen
from pyqtgraph.Qt import QtCore
from crosshair import Crosshair
class ExampleApp(QWidget):
def __init__(self):
super().__init__()
# Layout
self.layout = QHBoxLayout()
self.setLayout(self.layout)
##########################
# 1D Plot
##########################
# PlotWidget
self.plot_widget_1d = pg.PlotWidget(title="1D PlotWidget with multiple curves")
self.plot_item_1d = self.plot_widget_1d.getPlotItem()
self.plot_item_1d.setLogMode(True, False)
# 1D Datasets
self.x_data = np.linspace(0, 10, 1000)
def gauss(x, mu, sigma):
return (1 / (sigma * np.sqrt(2 * np.pi))) * np.exp(-0.5 * ((x - mu) / sigma) ** 2)
mu = 1 # mean
sigma = 1 # standard deviation
# same convention as in line_plot.py
self.y_value_list = [
np.sin(self.x_data),
np.cos(self.x_data),
np.sin(2 * self.x_data),
] # List of y-values for multiple curves
self.y_value_list = [gauss(self.x_data, mu, sigma)]
self.curve_names = ["Gauss"] # ,"Sine", "Cosine", "Sine2x"]
# Curves
color_list = ["#384c6b", "#e28a2b", "#5E3023", "#e41a1c", "#984e83", "#4daf4a"]
self.plot_item_1d.addLegend()
self.curves = []
for ii, y_value in enumerate(self.y_value_list):
pen = mkPen(color=color_list[ii], width=2, style=QtCore.Qt.DashLine)
curve = pg.PlotDataItem(
self.x_data, y_value, pen=pen, skipFiniteCheck=True, name=self.curve_names[ii]
)
self.plot_item_1d.addItem(curve)
self.curves.append(curve)
##########################
# 2D Plot
##########################
self.plot_widget_2d = pg.PlotWidget(title="2D plot with crosshair and ROI square")
self.data_2D = np.random.random((100, 100))
self.plot_item_2d = self.plot_widget_2d.getPlotItem()
self.image_item = pg.ImageItem(self.data_2D)
self.plot_item_2d.addItem(self.image_item)
##########################
# Table
##########################
self.table = QTableWidget(len(self.curve_names), 2)
self.table.setHorizontalHeaderLabels(["(X, Y) - Moved", "(X, Y) - Clicked"])
self.table.setVerticalHeaderLabels(self.curve_names)
self.table.resizeColumnsToContents()
##########################
# Signals & Cross-hairs
##########################
# 1D
self.crosshair_1d = Crosshair(self.plot_item_1d, precision=2)
self.crosshair_1d.coordinatesChanged1D.connect(
lambda x, y: self.update_table(self.table, x, y, column=0)
)
self.crosshair_1d.coordinatesClicked1D.connect(
lambda x, y: self.update_table(self.table, x, y, column=1)
)
# 2D
self.crosshair_2d = Crosshair(self.plot_item_2d)
self.crosshair_2d.coordinatesChanged2D.connect(
lambda x, y: self.moved_label_2d.setText(f"Mouse Moved Coordinates (2D): x={x}, y={y}")
)
self.crosshair_2d.coordinatesClicked2D.connect(
lambda x, y: self.clicked_label_2d.setText(f"Clicked Coordinates (2D): x={x}, y={y}")
)
##########################
# Adding widgets to layout
##########################
##### left side #####
self.column1 = QVBoxLayout()
self.layout.addLayout(self.column1)
# label
self.clicked_label_1d = QLabel("Clicked Coordinates (1D):")
self.column1.addWidget(self.clicked_label_1d)
# table
self.column1.addWidget(self.table)
# 1D plot
self.column1.addWidget(self.plot_widget_1d)
##### left side #####
self.column2 = QVBoxLayout()
self.layout.addLayout(self.column2)
# labels
self.clicked_label_2d = QLabel("Clicked Coordinates (2D):")
self.moved_label_2d = QLabel("Moved Coordinates (2D):")
self.column2.addWidget(self.clicked_label_2d)
self.column2.addWidget(self.moved_label_2d)
# 2D plot
self.column2.addWidget(self.plot_widget_2d)
def update_table(self, table_widget, x, y_values, column):
for i, y in enumerate(y_values):
table_widget.setItem(i, column, QTableWidgetItem(f"({x}, {y})"))
table_widget.resizeColumnsToContents()
if __name__ == "__main__":
app = QApplication([])
window = ExampleApp()
window.show()
app.exec_()

View File

@@ -1,56 +0,0 @@
from PyQt5.QtDesigner import QPyDesignerCustomWidgetPlugin
from PyQt5.QtGui import QIcon
from bec_widgets.scan2d_plot import BECScanPlot2D
class BECScanPlot2DPlugin(QPyDesignerCustomWidgetPlugin):
def __init__(self, parent=None):
super().__init__(parent)
self._initialized = False
def initialize(self, formEditor):
if self._initialized:
return
self._initialized = True
def isInitialized(self):
return self._initialized
def createWidget(self, parent):
return BECScanPlot2D(parent)
def name(self):
return "BECScanPlot2D"
def group(self):
return "BEC widgets"
def icon(self):
return QIcon()
def toolTip(self):
return "BEC plot for 2D scans"
def whatsThis(self):
return "BEC plot for 2D scans"
def isContainer(self):
return False
def domXml(self):
return (
'<widget class="BECScanPlot2D" name="BECScanPlot2D">\n'
' <property name="toolTip" >\n'
" <string>BEC plot for 2D scans</string>\n"
" </property>\n"
' <property name="whatsThis" >\n'
" <string>BEC plot for 2D scans in Python using PyQt.</string>\n"
" </property>\n"
"</widget>\n"
)
def includeFile(self):
return "scan2d_plot"

View File

@@ -1,12 +1,4 @@
Add/modify the path in the following variable to make the plugin avaiable in Qt Designer:
```
$ export PYQTDESIGNERPATH=/<path to repo>/bec_widgets/qtdesigner_plugins
$ export PYQTDESIGNERPATH=/<path to repo>/bec/bec_qtplugin:$PYQTDESIGNERPATH
```
It can be done when activating a conda environment (run with the corresponding env already activated):
```
$ conda env config vars set PYQTDESIGNERPATH=/<path to repo>/bec_widgets/qtdesigner_plugins
```
All the available conda-forge `pyqt >=5.15` packages don't seem to support loading Qt Designer
python plugins at the time of writing. Use `pyqt =5.12` to solve the issue for now.

View File

@@ -1,140 +0,0 @@
import numpy as np
import pyqtgraph as pg
from bec_lib.core.logger import bec_logger
from PyQt5.QtCore import pyqtProperty, pyqtSlot
from bec_widgets.bec_dispatcher import bec_dispatcher
logger = bec_logger.logger
pg.setConfigOptions(background="w", foreground="k", antialias=True)
class BECScanPlot2D(pg.GraphicsView):
def __init__(self, parent=None, background="default"):
super().__init__(parent, background)
bec_dispatcher.connect(self)
self._x_channel = ""
self._y_channel = ""
self._z_channel = ""
self._xpos = []
self._ypos = []
self._x_ind = None
self._y_ind = None
self.plot_item = pg.PlotItem()
self.setCentralItem(self.plot_item)
self.plot_item.setAspectLocked(True)
self.imageItem = pg.ImageItem()
self.plot_item.addItem(self.imageItem)
@pyqtSlot(dict, dict)
def on_new_scan(self, _scan_segment, metadata):
# TODO: Do we reset in case of a scan type change?
self.imageItem.clear()
# TODO: better to check the number of coordinates in metadata["positions"]?
if metadata["scan_name"] != "grid_scan":
return
positions = [sorted(set(pos)) for pos in zip(*metadata["positions"])]
motors = metadata["scan_motors"]
if self.x_channel and self.y_channel:
self._x_ind = motors.index(self.x_channel) if self.x_channel in motors else None
self._y_ind = motors.index(self.y_channel) if self.y_channel in motors else None
elif not self.x_channel and not self.y_channel:
# Plot the first and second motors along x and y axes respectively
self._x_ind = 0
self._y_ind = 1
else:
logger.warning(
f"X and Y channels should be either both empty or both set in {self.objectName()}"
)
if self._x_ind is None or self._y_ind is None:
return
xpos = positions[self._x_ind]
ypos = positions[self._y_ind]
self._xpos = xpos
self._ypos = ypos
self.imageItem.setImage(np.zeros(shape=(len(xpos), len(ypos))))
w = max(xpos) - min(xpos)
h = max(ypos) - min(ypos)
w_pix = w / (len(xpos) - 1)
h_pix = h / (len(ypos) - 1)
self.imageItem.setRect(min(xpos) - w_pix / 2, min(ypos) - h_pix / 2, w + w_pix, h + h_pix)
self.plot_item.setLabel("bottom", motors[self._x_ind])
self.plot_item.setLabel("left", motors[self._y_ind])
@pyqtSlot(dict, dict)
def on_scan_segment(self, scan_segment, metadata):
if not self.z_channel or metadata["scan_name"] != "grid_scan":
return
if self._x_ind is None or self._y_ind is None:
return
point_coord = metadata["positions"][scan_segment["point_id"]]
x_coord_ind = self._xpos.index(point_coord[self._x_ind])
y_coord_ind = self._ypos.index(point_coord[self._y_ind])
data = scan_segment["data"]
z_new = data[self.z_channel][self.z_channel]["value"]
image = self.imageItem.image
image[x_coord_ind, y_coord_ind] = z_new
self.imageItem.setImage()
@pyqtProperty(str)
def x_channel(self):
return self._x_channel
@x_channel.setter
def x_channel(self, new_val):
self._x_channel = new_val
self.plot_item.setLabel("bottom", new_val)
@pyqtProperty(str)
def y_channel(self):
return self._y_channel
@y_channel.setter
def y_channel(self, new_val):
self._y_channel = new_val
self.plot_item.setLabel("left", new_val)
@pyqtProperty(str)
def z_channel(self):
return self._z_channel
@z_channel.setter
def z_channel(self, new_val):
self._z_channel = new_val
if __name__ == "__main__":
import sys
from PyQt5.QtWidgets import QApplication
app = QApplication(sys.argv)
plot = BECScanPlot2D()
# If x_channel and y_channel are both omitted, they will be inferred from each running grid scan
plot.z_channel = "bpm3y"
plot.show()
sys.exit(app.exec_())

View File

@@ -1,10 +1,9 @@
import itertools
import pyqtgraph as pg
from bec_lib.core.logger import bec_logger
from PyQt5.QtCore import pyqtProperty, pyqtSlot
from bec_widgets.bec_dispatcher import bec_dispatcher
from bec_lib.core.logger import bec_logger
logger = bec_logger.logger
@@ -13,13 +12,9 @@ pg.setConfigOptions(background="w", foreground="k", antialias=True)
COLORS = ["#fd7f6f", "#7eb0d5", "#b2e061", "#bd7ebe", "#ffb55a"]
class BECScanPlot(pg.GraphicsView):
class BECScanPlot(pg.PlotWidget):
def __init__(self, parent=None, background="default"):
super().__init__(parent, background)
bec_dispatcher.connect(self)
self.view = pg.PlotItem()
self.setCentralItem(self.view)
self._x_channel = ""
self._y_channel_list = []
@@ -27,18 +22,36 @@ class BECScanPlot(pg.GraphicsView):
self.scan_curves = {}
self.dap_curves = {}
@pyqtSlot(dict, dict)
def on_new_scan(self, _scan_segment, _metadata):
def initialize(self):
plot_item = self.getPlotItem()
plot_item.addLegend()
colors = itertools.cycle(COLORS)
for y_chan in self.y_channel_list:
if y_chan.startswith("dap."):
y_chan = y_chan.partition("dap.")[-1]
curves = self.dap_curves
else:
curves = self.scan_curves
curves[y_chan] = plot_item.plot(
x=[], y=[], pen=pg.mkPen(color=next(colors), width=2), name=y_chan
)
plot_item.setLabel("bottom", self._x_channel)
if len(self.scan_curves) == 1:
plot_item.setLabel("left", next(iter(self.scan_curves)))
@pyqtSlot()
def clearData(self):
for plot_curve in {**self.scan_curves, **self.dap_curves}.values():
plot_curve.setData(x=[], y=[])
@pyqtSlot(dict, dict)
def on_scan_segment(self, scan_segment, _metadata):
@pyqtSlot(dict)
def redraw_scan(self, data):
if not self.x_channel:
return
data = scan_segment["data"]
if self.x_channel not in data:
logger.warning(f"Unknown channel `{self.x_channel}` for X data in {self.objectName()}")
return
@@ -61,8 +74,8 @@ class BECScanPlot(pg.GraphicsView):
plot_curve.setData(x=[*x, x_new], y=[*y, y_new])
@pyqtSlot(dict, dict)
def redraw_dap(self, data, _metadata):
@pyqtSlot(dict)
def redraw_dap(self, data):
for chan, plot_curve in self.dap_curves.items():
if not chan:
continue
@@ -82,35 +95,8 @@ class BECScanPlot(pg.GraphicsView):
@y_channel_list.setter
def y_channel_list(self, new_list):
# TODO: do we want to care about dap/not dap here?
chan_removed = [chan for chan in self._y_channel_list if chan not in new_list]
if chan_removed and chan_removed[0].startswith("dap."):
chan_removed = chan_removed[0].partition("dap.")[-1]
bec_dispatcher.disconnect_dap_slot(self.redraw_dap, chan_removed)
self._y_channel_list = new_list
# Prepare plot for a potentially different list of y channels
self.view.clear()
self.view.addLegend()
colors = itertools.cycle(COLORS)
for y_chan in new_list:
if y_chan.startswith("dap."):
y_chan = y_chan.partition("dap.")[-1]
curves = self.dap_curves
bec_dispatcher.connect_dap_slot(self.redraw_dap, y_chan)
else:
curves = self.scan_curves
curves[y_chan] = self.view.plot(
x=[], y=[], pen=pg.mkPen(color=next(colors), width=2), name=y_chan
)
if len(new_list) == 1:
self.view.setLabel("left", new_list[0])
@pyqtProperty(str)
def x_channel(self):
return self._x_channel
@@ -118,20 +104,3 @@ class BECScanPlot(pg.GraphicsView):
@x_channel.setter
def x_channel(self, new_val):
self._x_channel = new_val
self.view.setLabel("bottom", new_val)
if __name__ == "__main__":
import sys
from PyQt5.QtWidgets import QApplication
app = QApplication(sys.argv)
plot = BECScanPlot()
plot.x_channel = "samx"
plot.y_channel_list = ["bpm3y", "bpm6y"]
plot.show()
sys.exit(app.exec_())

View File

@@ -1,12 +1,11 @@
from PyQt5.QtDesigner import QPyDesignerCustomWidgetPlugin
from PyQt5.QtGui import QIcon
from bec_widgets.scan_plot import BECScanPlot
from scan_plot import BECScanPlot
class BECScanPlotPlugin(QPyDesignerCustomWidgetPlugin):
def __init__(self, parent=None):
super().__init__(parent)
super(BECScanPlotPlugin, self).__init__(parent)
self._initialized = False

View File

@@ -1,11 +1,11 @@
[metadata]
name = bec_widgets
description = BEC Widgets
name = bec_lib
description = BEC library
long_description = file: README.md
long_description_content_type = text/markdown
url = https://gitlab.psi.ch/bec/bec-widgets
url = https://gitlab.psi.ch/bec/bec
project_urls =
Bug Tracker = https://gitlab.psi.ch/bec/bec-widgets/issues
Bug Tracker = https://gitlab.psi.ch/bec/bec/issues
classifiers =
Programming Language :: Python :: 3
Development Status :: 3 - Alpha

View File

@@ -1,10 +1,13 @@
from setuptools import setup
__version__ = "0.6.0"
__version__ = "0.2.0"
if __name__ == "__main__":
setup(
install_requires=["pyqt5", "pyqtgraph", "bec_lib"],
extras_require={"dev": ["pytest", "pytest-random-order", "coverage", "pytest-qt", "black"]},
install_requires=[
"pyqt5",
"pyqtgraph",
],
extras_require={"dev": ["pytest", "pytest-random-order", "coverage", "pytest-qt"]},
version=__version__,
)

View File

@@ -1,152 +0,0 @@
from unittest import mock
import numpy as np
from pytestqt import qtbot
from bec_widgets import basic_plot
def test_basic_plot_emits_no_signal(qtbot):
"""Test LinePlot emits no signal when only one data entry is present."""
y_value_list = ["y1", "y2"]
plot = basic_plot.BasicPlot(y_value_list=y_value_list)
data = {
"data": {
"x": {"x": {"value": 1}},
"y1": {"y1": {"value": 1}},
"y2": {"y2": {"value": 3}},
}
}
metadata = {"scanID": "test", "scan_number": 1, "scan_report_devices": ["x"]}
with mock.patch("bec_widgets.basic_plot.client") as mock_client:
with mock.patch.object(plot, "update_signal") as mock_update_signal:
plot.on_scan_segment(data=data, metadata=metadata)
mock_update_signal.emit.assert_not_called()
def test_basic_plot_emits_signal(qtbot):
"""Test LinePlot emits signal."""
y_value_list = ["y1", "y2"]
plot = basic_plot.BasicPlot(y_value_list=y_value_list)
data = {
"data": {
"x": {"x": {"value": 1}},
"y1": {"y1": {"value": 1}},
"y2": {"y2": {"value": 3}},
}
}
plotter_data_y = [[1, 1], [3, 3]]
metadata = {"scanID": "test", "scan_number": 1, "scan_report_devices": ["x"]}
with mock.patch("bec_widgets.basic_plot.client") as mock_client:
# mock_client.device_manager.devices.keys.return_value = ["y1"]
with mock.patch.object(plot, "update_signal") as mock_update_signal:
mock_update_signal.emit()
plot.on_scan_segment(data=data, metadata=metadata)
plot.on_scan_segment(data=data, metadata=metadata)
mock_update_signal.emit.assert_called()
# TODO allow mock_client to create return values for device_manager_devices
# assert plot.plotter_data_y == plotter_data_y
def test_basic_plot_raise_warning_wrong_signal_request(qtbot):
"""Test LinePlot raises warning and skips signal when entry not present in data."""
y_value_list = ["y1", "y22"]
plot = basic_plot.BasicPlot(y_value_list=y_value_list)
data = {
"data": {
"x": {"x": {"value": [1, 2, 3, 4, 5]}},
"y1": {"y1": {"value": [1, 2, 3, 4, 5]}},
"y2": {"y2": {"value": [1, 2, 3, 4, 5]}},
}
}
metadata = {"scanID": "test", "scan_number": 1, "scan_report_devices": ["x"]}
with mock.patch("bec_widgets.basic_plot.client") as mock_client:
# TODO fix mock_client
mock_dict = {"y1": [1, 2]}
mock_client.device_manager.devices.__contains__.side_effect = mock_dict.__contains__
# = {"y1": [1, 2]}
with mock.patch.object(plot, "update_signal") as mock_update_signal:
mock_update_signal.emit()
plot.on_scan_segment(data=data, metadata=metadata)
assert plot.y_value_list == ["y1"]
# def test_basic_plot_update(qtbot):
# """Test LinePlot update."""
# y_value_list = ["y1", "y2"]
# plot = basic_plot.BasicPlot(y_value_list=y_value_list)
# plot.label_bottom = "x"
# plot.label_left = f"{', '.join(y_value_list)}"
# plot.plotter_data_x = [1, 2, 3, 4, 5]
# plot.plotter_data_y = [[1, 2, 3, 4, 5], [3, 4, 5, 6, 7]]
# plot.update()
# assert all(plot.curves[0].getData()[0] == np.array([1, 2, 3, 4, 5]))
# assert all(plot.curves[0].getData()[1] == np.array([1, 2, 3, 4, 5]))
# assert all(plot.curves[1].getData()[1] == np.array([3, 4, 5, 6, 7]))
# # TODO Outputting the wrong data, e.g. motor is not in list of devices
# def test_basic_plot_update(qtbot):
# """Test LinePlot update."""
# y_value_list = ["y1", "y2"]
# plot = basic_plot.BasicPlot(y_value_list=y_value_list)
# plot.label_bottom = "x"
# plot.label_left = f"{', '.join(y_value_list)}"
# plot.plotter_data_x = [1, 2, 3, 4, 5]
# plot.plotter_data_y = [[1, 2, 3, 4, 5], [3, 4, 5, 6, 7]]
# plot.update()
# assert all(plot.curves[0].getData()[0] == np.array([1, 2, 3, 4, 5]))
# assert all(plot.curves[0].getData()[1] == np.array([1, 2, 3, 4, 5]))
# assert all(plot.curves[1].getData()[1] == np.array([3, 4, 5, 6, 7]))
# def test_basic_plot_mouse_moved(qtbot):
# """Test LinePlot mouse_moved."""
# y_value_list = ["y1", "y2"]
# plot = basic_plot.BasicPlot(y_value_list=y_value_list)
# plot.plotter_data_x = [1, 2, 3, 4, 5]
# plot.plotter_data_y = [[1, 2, 3, 4, 5], [3, 4, 5, 6, 7]]
# plot.precision = 3
# string_cap = 10
# x_data = f"{3:.{plot.precision}f}"
# y_data = f"{3:.{plot.precision}f}"
# output_string = "".join(
# [
# "Mouse cursor",
# "\n",
# f"{y_value_list[0]}",
# "\n",
# f"X_data: {x_data:>{string_cap}}",
# "\n",
# f"Y_data: {y_data:>{string_cap}}",
# ]
# )
# x_data = f"{3:.{plot.precision}f}"
# y_data = f"{5:.{plot.precision}f}"
# output_string = "".join(
# [
# output_string,
# "\n",
# f"{y_value_list[1]}",
# "\n",
# f"X_data: {x_data:>{string_cap}}",
# "\n",
# f"Y_data: {y_data:>{string_cap}}",
# ]
# )
# with mock.patch.object(
# plot, "plot"
# ) as mock_plot: # TODO change test to simulate QTable instead of QLabel
# mock_plot.sceneBoundingRect.contains.return_value = True
# mock_plot.vb.mapSceneToView((20, 10)).x.return_value = 2.8
# plot.mouse_moved((20, 10))
# assert plot.mouse_box_data.text() == output_string

View File

@@ -1,89 +0,0 @@
from pytestqt import qtbot
from bec_widgets import scan_plot
def test_scan_plot(qtbot):
"""Test ScanPlot"""
plot = scan_plot.BECScanPlot()
qtbot.addWidget(plot)
plot.show()
qtbot.waitExposed(plot)
plot.x_channel = "x"
plot.y_channel_list = ["y1", "y2"]
plot.on_scan_segment(
{
"data": {
"x": {"x": {"value": 1}},
"y1": {"y1": {"value": 1}},
"y2": {"y2": {"value": 3}},
}
},
{"scanID": "test", "scan_number": 1, "scan_report_devices": ["x"]},
)
plot.on_scan_segment(
{
"data": {
"x": {"x": {"value": 2}},
"y1": {"y1": {"value": 2}},
"y2": {"y2": {"value": 4}},
}
},
{"scanID": "test", "scan_number": 1, "scan_report_devices": ["x"]},
)
assert all(plot.scan_curves["y1"].getData()[0] == [1, 2])
assert all(plot.scan_curves["y2"].getData()[1] == [3, 4])
def test_scan_plot_clears_data(qtbot):
"""Test ScanPlot"""
plot = scan_plot.BECScanPlot()
qtbot.addWidget(plot)
plot.show()
qtbot.waitExposed(plot)
plot.x_channel = "x"
plot.y_channel_list = ["y1", "y2"]
plot.on_scan_segment(
{
"data": {
"x": {"x": {"value": 1}},
"y1": {"y1": {"value": 1}},
"y2": {"y2": {"value": 3}},
}
},
{"scanID": "test", "scan_number": 1, "scan_report_devices": ["x"]},
)
plot.on_new_scan({}, {})
plot.on_scan_segment(
{
"data": {
"x": {"x": {"value": 2}},
"y1": {"y1": {"value": 2}},
"y2": {"y2": {"value": 4}},
}
},
{"scanID": "test", "scan_number": 1, "scan_report_devices": ["x"]},
)
assert all(plot.scan_curves["y1"].getData()[0] == [2])
assert all(plot.scan_curves["y2"].getData()[1] == [4])
def test_scan_plot_redraws_dap(qtbot):
"""Test ScanPlot"""
plot = scan_plot.BECScanPlot()
qtbot.addWidget(plot)
plot.show()
qtbot.waitExposed(plot)
plot.y_channel_list = ["dap.y1", "dap.y2"]
plot.redraw_dap({"y1": {"x": [1], "y": [1]}, "y2": {"x": [2], "y": [2]}}, {})
assert all(plot.dap_curves["y1"].getData()[0] == [1])
assert all(plot.dap_curves["y2"].getData()[1] == [2])