mirror of
https://github.com/bec-project/bec_widgets.git
synced 2025-07-14 11:41:49 +02:00
refactor: applied formatter
This commit is contained in:
@ -177,7 +177,7 @@ class BECWaveform(RPCBase):
|
||||
color_map_z: "Optional[str]" = "plasma",
|
||||
label: "Optional[str]" = None,
|
||||
validate_bec: "bool" = True,
|
||||
**kwargs
|
||||
**kwargs,
|
||||
) -> "BECCurve":
|
||||
"""
|
||||
Add a curve to the plot widget from the scan segment.
|
||||
@ -205,7 +205,7 @@ class BECWaveform(RPCBase):
|
||||
y: "list | np.ndarray",
|
||||
label: "str" = None,
|
||||
color: "str" = None,
|
||||
**kwargs
|
||||
**kwargs,
|
||||
) -> "BECCurve":
|
||||
"""
|
||||
Add a custom data curve to the plot widget.
|
||||
@ -484,7 +484,7 @@ class BECFigure(RPCBase):
|
||||
row: "int" = None,
|
||||
col: "int" = None,
|
||||
config=None,
|
||||
**axis_kwargs
|
||||
**axis_kwargs,
|
||||
) -> "BECWaveform":
|
||||
"""
|
||||
Add a Waveform1D plot to the figure at the specified position.
|
||||
@ -508,7 +508,7 @@ class BECFigure(RPCBase):
|
||||
row: "int" = None,
|
||||
col: "int" = None,
|
||||
config=None,
|
||||
**axis_kwargs
|
||||
**axis_kwargs,
|
||||
) -> "BECImageShow":
|
||||
"""
|
||||
Add an image to the figure at the specified position.
|
||||
@ -536,7 +536,7 @@ class BECFigure(RPCBase):
|
||||
row: "int" = None,
|
||||
col: "int" = None,
|
||||
config=None,
|
||||
**axis_kwargs
|
||||
**axis_kwargs,
|
||||
) -> "BECMotorMap":
|
||||
"""
|
||||
Args:
|
||||
@ -566,7 +566,7 @@ class BECFigure(RPCBase):
|
||||
color_map_z: "Optional[str]" = "plasma",
|
||||
label: "Optional[str]" = None,
|
||||
validate: "bool" = True,
|
||||
**axis_kwargs
|
||||
**axis_kwargs,
|
||||
) -> "BECWaveform":
|
||||
"""
|
||||
Add a 1D waveform plot to the figure. Always access the first waveform widget in the figure.
|
||||
@ -598,7 +598,7 @@ class BECFigure(RPCBase):
|
||||
color_map: "str" = "magma",
|
||||
data: "np.ndarray" = None,
|
||||
vrange: "tuple[float, float]" = None,
|
||||
**axis_kwargs
|
||||
**axis_kwargs,
|
||||
) -> "BECImageShow":
|
||||
"""
|
||||
Add an image to the figure. Always access the first image widget in the figure.
|
||||
@ -872,7 +872,7 @@ class BECImageShow(RPCBase):
|
||||
downsample: "Optional[bool]" = True,
|
||||
opacity: "Optional[float]" = 1.0,
|
||||
vrange: "Optional[tuple[int, int]]" = None,
|
||||
**kwargs
|
||||
**kwargs,
|
||||
) -> "BECImageItem":
|
||||
"""
|
||||
None
|
||||
@ -888,7 +888,7 @@ class BECImageShow(RPCBase):
|
||||
downsample: "Optional[bool]" = True,
|
||||
opacity: "Optional[float]" = 1.0,
|
||||
vrange: "Optional[tuple[int, int]]" = None,
|
||||
**kwargs
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
None
|
||||
|
@ -5,9 +5,7 @@ from bec_widgets.widgets.figure import BECFigure
|
||||
class RPCWidgetHandler:
|
||||
"""Handler class for creating widgets from RPC messages."""
|
||||
|
||||
widget_classes = {
|
||||
"BECFigure": BECFigure,
|
||||
}
|
||||
widget_classes = {"BECFigure": BECFigure}
|
||||
|
||||
@staticmethod
|
||||
def create_widget(widget_type, **kwargs) -> BECConnector:
|
||||
|
@ -106,10 +106,7 @@ CONFIG_SCAN_MODE = {
|
||||
"sources": [
|
||||
{
|
||||
"type": "scan_segment",
|
||||
"signals": {
|
||||
"x": [{"name": "samy"}],
|
||||
"y": [{"name": "gauss_adc2"}],
|
||||
},
|
||||
"signals": {"x": [{"name": "samy"}], "y": [{"name": "gauss_adc2"}]},
|
||||
}
|
||||
],
|
||||
},
|
||||
|
@ -42,7 +42,7 @@ CONFIG_DEFAULT = {
|
||||
"x": [{"name": "samx", "entry": "samx"}],
|
||||
"y": [{"name": "samy", "entry": "samy"}],
|
||||
},
|
||||
},
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
|
@ -1068,10 +1068,7 @@ class MotorApp(QWidget):
|
||||
|
||||
# PyQtGraph Controls
|
||||
layout.addWidget(QLabel("Graph Window Controls:"))
|
||||
graph_controls = [
|
||||
("Left Drag", "Pan the view"),
|
||||
("Right Drag or Scroll", "Zoom in/out"),
|
||||
]
|
||||
graph_controls = [("Left Drag", "Pan the view"), ("Right Drag or Scroll", "Zoom in/out")]
|
||||
for action, description in graph_controls:
|
||||
layout.addWidget(QLabel(f"{action} - {description}"))
|
||||
|
||||
@ -1111,10 +1108,7 @@ class MotorControl(QThread):
|
||||
motors_selected = pyqtSignal(object, object) # Signal to emit when the motors are selected
|
||||
# progress_updated = pyqtSignal(int) #TODO Signal to emit progress percentage
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
parent=None,
|
||||
):
|
||||
def __init__(self, parent=None):
|
||||
super().__init__(parent)
|
||||
|
||||
self.action = None
|
||||
@ -1131,10 +1125,7 @@ class MotorControl(QThread):
|
||||
self.motor_x_name = motor_x_name
|
||||
self.motor_y_name = motor_y_name
|
||||
|
||||
self.motor_x, self.motor_y = (
|
||||
dev[self.motor_x_name],
|
||||
dev[self.motor_y_name],
|
||||
)
|
||||
self.motor_x, self.motor_y = (dev[self.motor_x_name], dev[self.motor_y_name])
|
||||
|
||||
(self.current_x, self.current_y) = self.get_coordinates()
|
||||
|
||||
|
@ -112,9 +112,7 @@ class BECDispatcher:
|
||||
cls._initialized = False
|
||||
|
||||
def connect_slot(
|
||||
self,
|
||||
slot: Callable,
|
||||
topics: Union[EndpointInfo, str, list[Union[EndpointInfo, str]]],
|
||||
self, slot: Callable, topics: Union[EndpointInfo, str, list[Union[EndpointInfo, str]]]
|
||||
) -> None:
|
||||
"""Connect widget's pyqt slot, so that it is called on new pub/sub topic message.
|
||||
|
||||
|
@ -70,10 +70,7 @@ class BECDockArea(BECConnector, DockArea):
|
||||
self.docks = WeakValueDictionary(value)
|
||||
|
||||
def restore_state(
|
||||
self,
|
||||
state: dict = None,
|
||||
missing: Literal["ignore", "error"] = "ignore",
|
||||
extra="bottom",
|
||||
self, state: dict = None, missing: Literal["ignore", "error"] = "ignore", extra="bottom"
|
||||
):
|
||||
"""
|
||||
Restore the state of the dock area. If no state is provided, the last state is restored.
|
||||
|
@ -263,12 +263,7 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
|
||||
)
|
||||
# User wants to add custom curve
|
||||
elif x is not None and y is not None and x_name is None and y_name is None:
|
||||
waveform.add_curve_custom(
|
||||
x=x,
|
||||
y=y,
|
||||
color=color,
|
||||
label=label,
|
||||
)
|
||||
waveform.add_curve_custom(x=x, y=y, color=color, label=label)
|
||||
|
||||
return waveform
|
||||
|
||||
@ -354,12 +349,7 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
|
||||
elif (
|
||||
x is not None and y is not None and x_name is None and y_name is None and z_name is None
|
||||
):
|
||||
waveform.add_curve_custom(
|
||||
x=x,
|
||||
y=y,
|
||||
color=color,
|
||||
label=label,
|
||||
)
|
||||
waveform.add_curve_custom(x=x, y=y, color=color, label=label)
|
||||
else:
|
||||
raise ValueError(
|
||||
"Invalid input. Provide either device names (x_name, y_name) or custom data."
|
||||
@ -534,9 +524,7 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
|
||||
widget_id = str(uuid.uuid4())
|
||||
if config is None:
|
||||
config = MotorMapConfig(
|
||||
widget_class="BECMotorMap",
|
||||
gui_id=widget_id,
|
||||
parent_id=self.gui_id,
|
||||
widget_class="BECMotorMap", gui_id=widget_id, parent_id=self.gui_id
|
||||
)
|
||||
motor_map = self.add_widget(
|
||||
widget_type="MotorMap",
|
||||
|
@ -113,10 +113,7 @@ CONFIG_SCAN_MODE = {
|
||||
"sources": [
|
||||
{
|
||||
"type": "scan_segment",
|
||||
"signals": {
|
||||
"x": [{"name": "samy"}],
|
||||
"y": [{"name": "gauss_adc2"}],
|
||||
},
|
||||
"signals": {"x": [{"name": "samy"}], "y": [{"name": "gauss_adc2"}]},
|
||||
}
|
||||
],
|
||||
},
|
||||
@ -172,12 +169,7 @@ CONFIG_SCAN_MODE = {
|
||||
class ConfigDialog(QWidget, Ui_Form):
|
||||
config_updated = pyqtSignal(dict)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
client=None,
|
||||
default_config=None,
|
||||
skip_validation: bool = False,
|
||||
):
|
||||
def __init__(self, client=None, default_config=None, skip_validation: bool = False):
|
||||
super(ConfigDialog, self).__init__()
|
||||
self.setupUi(self)
|
||||
|
||||
@ -386,15 +378,7 @@ class ConfigDialog(QWidget, Ui_Form):
|
||||
"plot_name": self.safe_text(ui.lineEdit_plot_title),
|
||||
"x_label": self.safe_text(ui.lineEdit_x_label),
|
||||
"y_label": self.safe_text(ui.lineEdit_y_label),
|
||||
"sources": [
|
||||
{
|
||||
"type": "scan_segment",
|
||||
"signals": {
|
||||
"x": x_signals,
|
||||
"y": y_signals,
|
||||
},
|
||||
}
|
||||
],
|
||||
"sources": [{"type": "scan_segment", "signals": {"x": x_signals, "y": y_signals}}],
|
||||
}
|
||||
|
||||
return plot_data
|
||||
|
@ -214,10 +214,7 @@ class MotorMap(pg.GraphicsLayoutWidget):
|
||||
endpoints.append(MessageEndpoints.device_readback(motor))
|
||||
|
||||
# Connect all topics to a single slot
|
||||
bec_dispatcher.connect_slot(
|
||||
self.on_device_readback,
|
||||
endpoints,
|
||||
)
|
||||
bec_dispatcher.connect_slot(self.on_device_readback, endpoints)
|
||||
|
||||
def _add_limits_to_plot_data(self):
|
||||
"""
|
||||
@ -491,11 +488,7 @@ class MotorMap(pg.GraphicsLayoutWidget):
|
||||
|
||||
# Update the scatter plot
|
||||
self.curves_data[plot_name]["pos"].setData(
|
||||
x=motor_x_data,
|
||||
y=motor_y_data,
|
||||
brush=brushes,
|
||||
pen=None,
|
||||
size=self.scatter_size,
|
||||
x=motor_x_data, y=motor_y_data, brush=brushes, pen=None, size=self.scatter_size
|
||||
)
|
||||
|
||||
# Get last know position for crosshair
|
||||
@ -595,11 +588,7 @@ if __name__ == "__main__": # pragma: no cover
|
||||
client = BECDispatcher().client
|
||||
client.start()
|
||||
app = QApplication(sys.argv)
|
||||
motor_map = MotorMap(
|
||||
config=config,
|
||||
gui_id=args.id,
|
||||
skip_validation=True,
|
||||
)
|
||||
motor_map = MotorMap(config=config, gui_id=args.id, skip_validation=True)
|
||||
motor_map.show()
|
||||
|
||||
sys.exit(app.exec())
|
||||
|
@ -136,10 +136,7 @@ class BECMotorMap(BECPlotBase):
|
||||
Returns:
|
||||
dict: Data of the motor map.
|
||||
"""
|
||||
data = {
|
||||
"x": self.database_buffer["x"],
|
||||
"y": self.database_buffer["y"],
|
||||
}
|
||||
data = {"x": self.database_buffer["x"], "y": self.database_buffer["y"]}
|
||||
return data
|
||||
|
||||
# TODO setup all visual properties
|
||||
@ -391,11 +388,7 @@ class BECMotorMap(BECPlotBase):
|
||||
|
||||
# Update the scatter plot
|
||||
self.plot_components["scatter"].setData(
|
||||
x=x,
|
||||
y=y,
|
||||
brush=brushes,
|
||||
pen=None,
|
||||
size=scatter_size,
|
||||
x=x, y=y, brush=brushes, pen=None, size=scatter_size
|
||||
)
|
||||
|
||||
# Get last know position for crosshair
|
||||
|
@ -438,9 +438,7 @@ if __name__ == "__main__": # pragma: no cover
|
||||
client.start()
|
||||
|
||||
app = QApplication([])
|
||||
scan_control = ScanControl(
|
||||
client=client,
|
||||
) # allowed_scans=["line_scan", "grid_scan"])
|
||||
scan_control = ScanControl(client=client) # allowed_scans=["line_scan", "grid_scan"])
|
||||
|
||||
window = scan_control
|
||||
window.show()
|
||||
|
@ -79,7 +79,4 @@ html_theme = "pydata_sphinx_theme"
|
||||
html_static_path = ["_static"]
|
||||
html_css_files = ["css/custom.css"]
|
||||
html_logo = "_static/bec.png"
|
||||
html_theme_options = {
|
||||
"show_nav_level": 1,
|
||||
"navbar_align": "content",
|
||||
}
|
||||
html_theme_options = {"show_nav_level": 1, "navbar_align": "content"}
|
||||
|
@ -41,14 +41,7 @@ def send_msg_event():
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"topics_msg_list",
|
||||
[
|
||||
(
|
||||
("topic1", dummy_msg),
|
||||
("topic2", dummy_msg),
|
||||
("topic3", dummy_msg),
|
||||
)
|
||||
],
|
||||
"topics_msg_list", [(("topic1", dummy_msg), ("topic2", dummy_msg), ("topic3", dummy_msg))]
|
||||
)
|
||||
def test_dispatcher_disconnect_all(bec_dispatcher_w_connector, qtbot, send_msg_event):
|
||||
bec_dispatcher = bec_dispatcher_w_connector
|
||||
@ -70,15 +63,7 @@ def test_dispatcher_disconnect_all(bec_dispatcher_w_connector, qtbot, send_msg_e
|
||||
assert len(bec_dispatcher.client.connector._topics_cb) == 0
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"topics_msg_list",
|
||||
[
|
||||
(
|
||||
("topic1", dummy_msg),
|
||||
("topic2", dummy_msg),
|
||||
)
|
||||
],
|
||||
)
|
||||
@pytest.mark.parametrize("topics_msg_list", [(("topic1", dummy_msg), ("topic2", dummy_msg))])
|
||||
def test_dispatcher_disconnect_one(bec_dispatcher_w_connector, qtbot, send_msg_event):
|
||||
# test for BEC issue #276
|
||||
bec_dispatcher = bec_dispatcher_w_connector
|
||||
@ -115,15 +100,7 @@ def test_dispatcher_2_cb_same_topic(bec_dispatcher_w_connector, qtbot, send_msg_
|
||||
cb2.assert_called_once()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"topics_msg_list",
|
||||
[
|
||||
(
|
||||
("topic1", dummy_msg),
|
||||
("topic2", dummy_msg),
|
||||
)
|
||||
],
|
||||
)
|
||||
@pytest.mark.parametrize("topics_msg_list", [(("topic1", dummy_msg), ("topic2", dummy_msg))])
|
||||
def test_dispatcher_2_topic_same_cb(bec_dispatcher_w_connector, qtbot, send_msg_event):
|
||||
# test for BEC issue #276
|
||||
bec_dispatcher = bec_dispatcher_w_connector
|
||||
|
@ -26,11 +26,7 @@ def test_bec_figure_init(bec_figure):
|
||||
|
||||
|
||||
def test_bec_figure_init_with_config(mocked_client):
|
||||
config = {
|
||||
"widget_class": "BECFigure",
|
||||
"gui_id": "test_gui_id",
|
||||
"theme": "dark",
|
||||
}
|
||||
config = {"widget_class": "BECFigure", "gui_id": "test_gui_id", "theme": "dark"}
|
||||
widget = BECFigure(client=mocked_client, config=config)
|
||||
assert widget.config.gui_id == "test_gui_id"
|
||||
assert widget.config.theme == "dark"
|
||||
|
@ -29,10 +29,7 @@ def test_motor_map_change_motors(bec_motor_map):
|
||||
|
||||
|
||||
def test_motor_map_get_limits(bec_motor_map):
|
||||
expected_limits = {
|
||||
"samx": [-10, 10],
|
||||
"samy": [-5, 5],
|
||||
}
|
||||
expected_limits = {"samx": [-10, 10], "samy": [-5, 5]}
|
||||
|
||||
for motor_name, expected_limit in expected_limits.items():
|
||||
actual_limit = bec_motor_map._get_motor_limit(motor_name)
|
||||
|
@ -43,11 +43,7 @@ def test_load_config(config_dialog, config_name):
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"config_name, scan_mode",
|
||||
[
|
||||
("config_device", False),
|
||||
("config_scan", True),
|
||||
("config_device_no_entry", False),
|
||||
],
|
||||
[("config_device", False), ("config_scan", True), ("config_device_no_entry", False)],
|
||||
)
|
||||
def test_initialization(config_dialog, config_name, scan_mode):
|
||||
config = load_test_config(config_name)
|
||||
|
@ -50,7 +50,7 @@ CONFIG_DEFAULT = {
|
||||
"x": [{"name": "samx", "entry": "samx"}],
|
||||
"y": [{"name": "samy", "entry": "samy"}],
|
||||
},
|
||||
},
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
|
@ -58,7 +58,7 @@ CONFIG_ONE_DEVICE = {
|
||||
"x": [{"name": "samx", "entry": "samx"}],
|
||||
"y": [{"name": "samy", "entry": "samy"}],
|
||||
},
|
||||
},
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
@ -73,10 +73,7 @@ def motor_map(qtbot, mocked_client):
|
||||
|
||||
def test_motor_limits_initialization(motor_map):
|
||||
# Example test to check if motor limits are correctly initialized
|
||||
expected_limits = {
|
||||
"samx": [-10, 10],
|
||||
"samy": [-5, 5],
|
||||
}
|
||||
expected_limits = {"samx": [-10, 10], "samy": [-5, 5]}
|
||||
for motor_name, expected_limit in expected_limits.items():
|
||||
actual_limit = motor_map._get_motor_limit(motor_name)
|
||||
assert actual_limit == expected_limit
|
||||
@ -99,13 +96,7 @@ def test_motor_initial_position(motor_map):
|
||||
assert actual_position == expected_position
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"config, number_of_plots",
|
||||
[
|
||||
(CONFIG_DEFAULT, 2),
|
||||
(CONFIG_ONE_DEVICE, 1),
|
||||
],
|
||||
)
|
||||
@pytest.mark.parametrize("config, number_of_plots", [(CONFIG_DEFAULT, 2), (CONFIG_ONE_DEVICE, 1)])
|
||||
def test_initialization(motor_map, config, number_of_plots):
|
||||
config_load = config
|
||||
motor_map.on_config_update(config_load)
|
||||
@ -131,16 +122,10 @@ def test_motor_movement_updates_position_and_database(motor_map):
|
||||
motor_map.on_device_readback({"signals": {"samx": {"value": new_position_samx}}})
|
||||
|
||||
# Verify database update for 'samx'
|
||||
assert motor_map.database["samx"]["samx"] == [
|
||||
initial_position_samx,
|
||||
new_position_samx,
|
||||
]
|
||||
assert motor_map.database["samx"]["samx"] == [initial_position_samx, new_position_samx]
|
||||
|
||||
# Verify 'samy' retains its last known position
|
||||
assert motor_map.database["samy"]["samy"] == [
|
||||
initial_position_samy,
|
||||
initial_position_samy,
|
||||
]
|
||||
assert motor_map.database["samy"]["samy"] == [initial_position_samy, initial_position_samy]
|
||||
|
||||
|
||||
def test_scatter_plot_rendering(motor_map):
|
||||
|
@ -85,12 +85,7 @@ def test_1d_plotting_data(qtbot, stream_app):
|
||||
def test_flip_even_rows(qtbot, stream_app):
|
||||
# Create a numpy array with some known data
|
||||
original_array = np.array(
|
||||
[
|
||||
[1, 2, 3, 4, 5],
|
||||
[6, 7, 8, 9, 10],
|
||||
[11, 12, 13, 14, 15],
|
||||
[16, 17, 18, 19, 20],
|
||||
]
|
||||
[[1, 2, 3, 4, 5], [6, 7, 8, 9, 10], [11, 12, 13, 14, 15], [16, 17, 18, 19, 20]]
|
||||
)
|
||||
|
||||
# Call flip_even_rows on the original array
|
||||
@ -98,12 +93,7 @@ def test_flip_even_rows(qtbot, stream_app):
|
||||
|
||||
# Expected array flipped along the rows with even indices
|
||||
expected_array = np.array(
|
||||
[
|
||||
[1, 2, 3, 4, 5],
|
||||
[10, 9, 8, 7, 6],
|
||||
[11, 12, 13, 14, 15],
|
||||
[20, 19, 18, 17, 16],
|
||||
]
|
||||
[[1, 2, 3, 4, 5], [10, 9, 8, 7, 6], [11, 12, 13, 14, 15], [20, 19, 18, 17, 16]]
|
||||
)
|
||||
|
||||
# Check that flip_even_rows returned the expected result
|
||||
|
Reference in New Issue
Block a user