widgets and group dymanics start

This commit is contained in:
Henrik Lemke
2025-11-09 11:27:13 +01:00
parent cc1dedcbca
commit f537827bfe
19 changed files with 1524 additions and 12 deletions
+22 -11
View File
@@ -519,11 +519,13 @@ class Daq(Assembly):
if o == "c":
raise Exception("User-requested cancelling!")
def count_run_number_up_and_attach_to_scan(self, scan, **kwargs):
def count_run_number_up_and_attach_to_scan(self, scan, pgroup=None, **kwargs):
"""
Increments the run number by one.
"""
runno = self.get_next_run_number(self.pgroup)
if pgroup is None:
pgroup = self.pgroup
runno = self.get_next_run_number(pgroup)
print(f"Run number incremented to {runno}")
scan.daq_run_number = runno
@@ -618,9 +620,12 @@ class Daq(Assembly):
print("WARNING: issue adding data to run table")
print(f"Runtable appending took: {time.time()-t_start_rt:.3f} s")
def copy_scan_info_to_raw(self, scan, **kwargs):
def copy_scan_info_to_raw(self, scan, pgroup=None, **kwargs):
t_start = time.time()
if pgroup is None:
pgroup = self.pgroup
if hasattr(scan, "daq_run_number"):
runno = scan.daq_run_number
else:
@@ -635,7 +640,6 @@ class Daq(Assembly):
si = scan.scan_info
# save temprary file and send then to raw
pgroup = self.pgroup
tmpdir = Path(f"/sf/bernina/data/{pgroup}/res/run_data/daq/run{runno:04d}/aux")
tmpdir.mkdir(exist_ok=True, parents=True)
try:
@@ -673,7 +677,9 @@ class Daq(Assembly):
# f"--> creating and copying file took{time.time()-t_start} s, presently adding to deadtime."
# )
def append_status_to_scan_and_store(self, scan, append_status_info=True, **kwargs):
def append_status_to_scan_and_store(
self, scan, pgroup=None, append_status_info=True, **kwargs
):
if not append_status_info:
return
@@ -687,7 +693,8 @@ class Daq(Assembly):
else:
runno = self.get_last_run_number()
pgroup = self.pgroup
if pgroup is None:
pgroup = self.pgroup
tmpdir = Path(f"/sf/bernina/data/{pgroup}/res/run_data/daq/run{runno:04d}/aux")
tmpdir.mkdir(exist_ok=True, parents=True)
try:
@@ -751,14 +758,15 @@ class Daq(Assembly):
if not self.checker.stop_and_analyze():
scan._current_step_ok = False
def copy_aliases_to_scan(self, scan, send_aliases_now=False, **kwargs):
def copy_aliases_to_scan(self, scan, send_aliases_now=False, pgroup=None, **kwargs):
if send_aliases_now or (len(scan.values_done()) == 1):
namespace_aliases = self.namespace.alias.get_all()
if hasattr(scan, "daq_run_number"):
runno = scan.daq_run_number
else:
runno = self.daq.get_last_run_number()
pgroup = self.pgroup
if pgroup is None:
pgroup = self.pgroup
tmpdir = Path(
f"/sf/bernina/data/{pgroup}/res/run_data/daq/run{runno:04d}/aux"
)
@@ -881,7 +889,7 @@ class Daq(Assembly):
print(f"Could not add daq.pulse_id monitor")
traceback.print_exc()
def end_scan_monitors(self, scan, **kwargs):
def end_scan_monitors(self, scan, pgroup=None, **kwargs):
for tmon in scan.daq_monitors:
scan.daq_monitors[tmon].stop_callback()
@@ -895,7 +903,10 @@ class Daq(Assembly):
else:
runno = self.get_last_run_number()
tmpdir = Path(f"/sf/bernina/data/{self.pgroup}/res/run_data/daq/run{runno}/aux")
if pgroup is None:
pgroup = self.pgroup
tmpdir = Path(f"/sf/bernina/data/{pgroup}/res/run_data/daq/run{runno}/aux")
tmpdir.mkdir(exist_ok=True, parents=True)
try:
tmpdir.chmod(0o775)
@@ -910,7 +921,7 @@ class Daq(Assembly):
f"Copying monitor file to run {runno} to the raw directory of {self.pgroup}."
)
response = self.append_aux(
scanmonitorfile.as_posix(), pgroup=self.pgroup, run_number=runno
scanmonitorfile.as_posix(), pgroup=pgroup, run_number=runno
)
print(
f"Status: {response.json()['status']} Message: {response.json()['message']}"
@@ -0,0 +1,9 @@
MIT License
Copyright (c) 2023 [Your Name]
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
1. The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
2. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
@@ -0,0 +1,64 @@
# textual-status-editor
This project implements a textual user interface for displaying and managing the status of various items in an assembly. It provides a tabular view of current values and allows users to set target values for these items.
## Features
- Display current values of status items in a tabular format.
- Entry fields for setting target values of status items.
- Interactive user interface built with the Textual library.
## Project Structure
```
textual-status-editor
├── src
│ └── textual_status_editor
│ ├── __init__.py
│ ├── app.py
│ ├── main.py
│ ├── config.py
│ ├── adapters
│ │ └── assembly_adapter.py
│ ├── models
│ │ └── status_item.py
│ └── widgets
│ ├── __init__.py
│ └── status_table.py
├── tests
│ ├── test_status_table.py
│ └── test_assembly_adapter.py
├── pyproject.toml
├── requirements.txt
├── README.md
├── .gitignore
└── LICENSE
```
## Installation
1. Clone the repository:
```
git clone https://github.com/yourusername/textual-status-editor.git
cd textual-status-editor
```
2. Install the required dependencies:
```
pip install -r requirements.txt
```
## Usage
To run the application, execute the following command:
```
python -m textual_status_editor.main
```
## Contributing
Contributions are welcome! Please feel free to submit a pull request or open an issue for any suggestions or improvements.
## License
This project is licensed under the MIT License. See the LICENSE file for more details.
@@ -0,0 +1,23 @@
[tool.poetry]
name = "textual-status-editor"
version = "0.1.0"
description = "A textual tabular widget to display and edit status values."
authors = ["Your Name <youremail@example.com>"]
license = "MIT"
readme = "README.md"
homepage = "https://github.com/yourusername/textual-status-editor"
repository = "https://github.com/yourusername/textual-status-editor"
keywords = ["textual", "status", "editor", "widget"]
[tool.poetry.dependencies]
python = "^3.8"
textual = "^0.1.0" # Replace with the actual version you want to use
rich = "^10.0.0" # For rich text formatting in the terminal
[tool.poetry.dev-dependencies]
pytest = "^6.0"
pytest-cov = "^2.10"
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
@@ -0,0 +1,5 @@
textual
rich
pytest
pytest-asyncio
textual-widget
@@ -0,0 +1 @@
# This file is intentionally left blank.
@@ -0,0 +1,61 @@
from eco.elements.protocols import Assembly
from textual.app import App
from textual.widgets import Table, Input
from textual.reactive import Reactive
from textual.containers import Container
class StatusItem:
def __init__(self, name, current_value):
self.name = name
self.current_value = current_value
def set_target_value(self, value):
# Placeholder for setting the target value
self.current_value = value
class AssemblyAdapter:
def __init__(self, assembly: Assembly):
self.assembly = assembly
def get_status_items(self):
status_items = []
for item in self.assembly.status_collection.get_list():
current_value = item.get_current_value() if hasattr(item, 'get_current_value') else None
status_items.append(StatusItem(item.alias.get_full_name(), current_value))
return status_items
class StatusTable(App):
def __init__(self, assembly_adapter: AssemblyAdapter):
super().__init__()
self.assembly_adapter = assembly_adapter
self.status_items = self.assembly_adapter.get_status_items()
async def on_mount(self):
self.table = Table()
self.table.add_column("Name")
self.table.add_column("Current Value")
self.table.add_column("Set Target Value")
for item in self.status_items:
input_field = Input(placeholder="Enter value")
input_field.on_submit(lambda value, item=item: self.set_target_value(item, value))
self.table.add_row(item.name, str(item.current_value), input_field)
await self.view.dock(self.table)
def set_target_value(self, item: StatusItem, value: str):
item.set_target_value(value)
self.refresh_table()
def refresh_table(self):
self.table.clear()
for item in self.status_items:
input_field = Input(placeholder="Enter value")
input_field.on_submit(lambda value, item=item: self.set_target_value(item, value))
self.table.add_row(item.name, str(item.current_value), input_field)
if __name__ == "__main__":
assembly = Assembly() # Replace with actual assembly initialization
adapter = AssemblyAdapter(assembly)
app = StatusTable(adapter)
app.run()
@@ -0,0 +1,45 @@
from textual.app import App
from textual.widgets import Static, Input, Table
from textual.reactive import Reactive
from textual import events
from .adapters.assembly_adapter import AssemblyAdapter
class StatusItem:
def __init__(self, name, current_value):
self.name = name
self.current_value = current_value
def set_target_value(self, value):
# Logic to set the target value
pass
class StatusTable(Table):
def __init__(self, items):
super().__init__()
self.items = items
self.add_column("Name", min_width=20)
self.add_column("Current Value", min_width=20)
self.add_column("Set Target Value", min_width=20)
for item in self.items:
self.add_row(item.name, str(item.current_value), Input(placeholder="Set value"))
async def on_input_changed(self, event: events.InputChanged):
# Logic to handle input changes
row_index = self.get_row_index(event.sender)
if row_index is not None:
item = self.items[row_index]
item.set_target_value(event.sender.value)
class StatusEditorApp(App):
def __init__(self):
super().__init__()
self.adapter = AssemblyAdapter()
self.status_items = self.adapter.get_status_items()
async def on_mount(self):
self.table = StatusTable(self.status_items)
await self.view.dock(self.table)
if __name__ == "__main__":
StatusEditorApp.run()
@@ -0,0 +1,10 @@
# Configuration settings for the textual status editor application
# Layout parameters
TABLE_WIDTH = 80
TABLE_HEIGHT = 20
ENTRY_WIDTH = 10
# Constants
APP_TITLE = "Textual Status Editor"
STATUS_COLLECTION_NAME = "status_collection"
@@ -0,0 +1,52 @@
from textual.app import App
from textual.widgets import Static, Input, Table
from textual.reactive import Reactive
from textual import events
from .adapters.assembly_adapter import AssemblyAdapter
class StatusItem:
def __init__(self, name, current_value):
self.name = name
self.current_value = current_value
def set_target_value(self, value):
# Logic to set the target value
self.current_value = value
class StatusTable(Table):
def __init__(self, items):
super().__init__()
self.items = items
self.add_column("Name", min_width=20)
self.add_column("Current Value", min_width=20)
self.add_column("Set Target Value", min_width=20)
for item in self.items:
self.add_row(item.name, str(item.current_value), "")
async def on_input_changed(self, event: events.InputChanged):
row_index = event.row_index
column_index = event.column_index
if column_index == 2: # Assuming the third column is for setting target values
target_value = event.value
self.items[row_index].set_target_value(target_value)
self.update_row(row_index)
def update_row(self, row_index):
item = self.items[row_index]
self.update_row(row_index, item.name, str(item.current_value), "")
class StatusEditorApp(App):
def __init__(self):
super().__init__()
self.adapter = AssemblyAdapter()
self.status_items = self.adapter.get_status_items()
self.table = StatusTable(self.status_items)
async def on_mount(self):
await self.view.dock(self.table)
if __name__ == "__main__":
app = StatusEditorApp()
app.run()
@@ -0,0 +1,60 @@
from textual import events
from textual.widget import Widget
from textual.reactive import Reactive
from textual.containers import Container
from textual.widgets import Input, Table
class StatusItem:
def __init__(self, name, current_value):
self.name = name
self.current_value = current_value
def set_target_value(self, value):
# Logic to set the target value
self.current_value = value
class StatusItemWidget(Widget):
name: str
current_value: Reactive[str] = Reactive("")
def __init__(self, status_item: StatusItem):
super().__init__()
self.status_item = status_item
self.name = status_item.name
self.current_value = str(status_item.current_value)
def render(self):
return f"{self.name}: {self.current_value}"
async def on_input_changed(self, event: events.InputChanged):
if event.input.value:
self.status_item.set_target_value(event.input.value)
self.current_value = event.input.value
await self.refresh()
class StatusItemTable(Widget):
def __init__(self, status_items):
super().__init__()
self.status_items = status_items
self.table = Table()
def render(self):
self.table.clear()
self.table.add_column("Item Name")
self.table.add_column("Current Value")
self.table.add_column("Set Value")
for item in self.status_items:
row = [item.name, str(item.current_value), Input(placeholder="Set value")]
self.table.add_row(*row)
return Container(self.table)
async def on_input_changed(self, event: events.InputChanged):
for item in self.status_items:
if event.input.value:
item.set_target_value(event.input.value)
await self.refresh()
@@ -0,0 +1 @@
# This file is intentionally left blank.
@@ -0,0 +1,50 @@
from textual.app import App
from textual.widgets import Table, Input
from textual.reactive import Reactive
from textual.containers import Container
from eco.elements.protocols import Detector # Assuming Detector is imported from the correct module
from .assembly_adapter import AssemblyAdapter # Import the AssemblyAdapter
class StatusItem:
def __init__(self, name, current_value):
self.name = name
self.current_value = current_value
def set_target_value(self, value):
# Logic to set the target value
pass
class StatusTable(App):
status_items: Reactive[list[StatusItem]] = Reactive([])
def __init__(self, assembly_adapter: AssemblyAdapter):
super().__init__()
self.assembly_adapter = assembly_adapter
async def on_mount(self):
self.status_items = await self.assembly_adapter.get_status_items()
self.render_table()
def render_table(self):
table = Table(title="Status Table")
table.add_column("Name", justify="left")
table.add_column("Current Value", justify="right")
table.add_column("Set Target Value", justify="right")
for item in self.status_items:
input_field = Input(placeholder="Enter value", on_submit=self.set_value(item))
table.add_row(item.name, str(item.current_value), input_field)
self.set_widget(table)
async def set_value(self, item: StatusItem, value: str):
item.set_target_value(value)
await self.assembly_adapter.update_status_item(item)
def set_widget(self, widget):
container = Container(widget)
self.set_root(container)
if __name__ == "__main__":
assembly_adapter = AssemblyAdapter() # Initialize your adapter here
StatusTable(assembly_adapter).run()
@@ -0,0 +1,56 @@
import pytest
from textual_status_editor.adapters.assembly_adapter import AssemblyAdapter
from textual_status_editor.models.status_item import StatusItem
@pytest.fixture
def assembly_adapter():
return AssemblyAdapter()
def test_get_current_values(assembly_adapter):
# Mock the status collection to return predefined values
assembly_adapter.status_collection = [
StatusItem(name="Item1", current_value=10),
StatusItem(name="Item2", current_value=20),
]
current_values = assembly_adapter.get_current_values()
assert current_values == {
"Item1": 10,
"Item2": 20,
}
def test_set_target_value(assembly_adapter):
# Mock the status item
item = StatusItem(name="Item1", current_value=10)
assembly_adapter.status_collection = [item]
assembly_adapter.set_target_value("Item1", 15)
assert item.target_value == 15
def test_set_target_value_nonexistent_item(assembly_adapter):
# Mock the status collection
assembly_adapter.status_collection = [
StatusItem(name="Item1", current_value=10),
]
result = assembly_adapter.set_target_value("NonexistentItem", 15)
assert result is False # Expecting failure when item does not exist
def test_update_status_item(assembly_adapter):
item = StatusItem(name="Item1", current_value=10)
assembly_adapter.status_collection = [item]
assembly_adapter.update_status_item("Item1", 20)
assert item.current_value == 20
def test_update_status_item_nonexistent(assembly_adapter):
item = StatusItem(name="Item1", current_value=10)
assembly_adapter.status_collection = [item]
result = assembly_adapter.update_status_item("NonexistentItem", 20)
assert result is False # Expecting failure when item does not exist
@@ -0,0 +1,36 @@
import pytest
from textual_status_editor.widgets.status_table import StatusTable
from textual_status_editor.models.status_item import StatusItem
@pytest.fixture
def status_items():
return [
StatusItem(name="Item 1", current_value=10),
StatusItem(name="Item 2", current_value=20),
StatusItem(name="Item 3", current_value=30),
]
def test_status_table_display(status_items):
table = StatusTable(status_items)
rendered = table.render()
assert "Item 1" in rendered
assert "10" in rendered
assert "Item 2" in rendered
assert "20" in rendered
assert "Item 3" in rendered
assert "30" in rendered
def test_status_table_set_target_value(status_items):
table = StatusTable(status_items)
table.set_target_value("Item 1", 15)
assert status_items[0].current_value == 15
def test_status_table_invalid_target_value(status_items):
table = StatusTable(status_items)
table.set_target_value("Item 4", 25) # Non-existent item
assert status_items[0].current_value == 10 # Should remain unchanged
assert status_items[1].current_value == 20
assert status_items[2].current_value == 30
+2 -1
View File
@@ -37,7 +37,7 @@ def getDefaultElogInstance(
print(f"Found more than one elog for user group {pgroup}")
for lb in lbs:
creater = lb.createdBy
if creater == 'scilog-admin@psi.ch':
if creater == "scilog-admin@psi.ch":
log.select_logbook(lb)
print(f"Choosing default logbook created by 'scilog-admin@psi.ch'")
else:
@@ -78,6 +78,7 @@ class Elog(Assembly):
self,
*args,
tags=[],
pgroups=None,
text_encoding="markdown",
markdown_extensions=["fenced_code"],
**kwargs,
+324
View File
@@ -0,0 +1,324 @@
# ...existing code...
"""
Jupyter widget to view display items and (where supported) set targets.
Usage:
from eco.widgets.display_widget import make_assembly_widget
w = make_assembly_widget(my_assembly, poll_interval=1.0)
display(w)
Returned widget has methods:
w.start() # start background polling (already started by default)
w.stop() # stop background polling
"""
import threading
import time
from typing import Any, List
import ipywidgets as widgets
from IPython.display import display
# Try to import types for isinstance checks if available.
try:
from eco import Adjustable, Detector
except Exception:
Adjustable = object
Detector = object
def _make_input_widget_for_value(value: Any):
"""Return a suitable ipywidget for editing a value, plus a function to read it."""
if isinstance(value, bool):
w = widgets.Checkbox(value=value)
reader = lambda: w.value
elif isinstance(value, (int,)) and not isinstance(value, bool):
w = widgets.IntText(value=value)
reader = lambda: int(w.value)
elif isinstance(value, (float,)):
w = widgets.FloatText(value=value)
reader = lambda: float(w.value)
else:
# fallback to text field (strings, enums represented as strings)
w = widgets.Text(value=str(value) if value is not None else "")
reader = lambda: w.value
return w, reader
def _make_step_widget_for_value(value: Any):
"""Create step-size input suitable for numeric types."""
if isinstance(value, int) and not isinstance(value, bool):
step_w = widgets.IntText(value=1, layout=widgets.Layout(width="80px"))
reader = lambda: int(step_w.value)
else:
step_w = widgets.FloatText(
value=0.1 if isinstance(value, float) else 1.0,
layout=widgets.Layout(width="80px"),
)
reader = lambda: float(step_w.value)
return step_w, reader
def make_assembly_widget(assembly, poll_interval: float = 1.0, auto_start: bool = True):
"""
Build an ipywidgets VBox showing items in assembly.display_collection.
For items that are eco.Adjustable a tweak control with up/down buttons and step size is shown.
For items that are eco.Detector (and not Adjustable) no control is added.
For other items, a readonly display is shown.
Returns a VBox widget; the returned widget has .start() and .stop() methods
to control the background polling thread.
"""
rows: List[widgets.HBox] = []
item_entries = [] # list of dicts with item -> widgets and reader
# obtain list of display items (support either call or attribute)
try:
display_items = assembly.display_collection()
except Exception:
try:
display_items = assembly.status_collection.get_list(selection="display")
except Exception:
display_items = []
header = widgets.HBox(
[
widgets.HTML(value="<b>name</b>", layout=widgets.Layout(width="30%")),
widgets.HTML(value="<b>current</b>", layout=widgets.Layout(width="40%")),
widgets.HTML(value="<b>control</b>", layout=widgets.Layout(width="30%")),
]
)
for item in display_items:
name = (
item.alias.get_full_name(base=assembly)
if hasattr(item, "alias")
else getattr(item, "name", str(item))
)
try:
cur = item.get_current_value()
except Exception:
cur = "<error>"
name_w = widgets.Label(str(name), layout=widgets.Layout(width="30%"))
value_w = widgets.Label(str(cur), layout=widgets.Layout(width="40%"))
# control area
control_box = widgets.HBox(layout=widgets.Layout(width="30%"))
input_widget = None
reader = None
# If it's a Detector and NOT Adjustable -> no control widget (readonly)
if isinstance(item, Detector) and not isinstance(item, Adjustable):
control_box.children = (widgets.Label("read-only (Detector)"),)
# If it's Adjustable -> show tweak widget (step, up, down)
elif isinstance(item, Adjustable):
# create step widget based on current value
step_w, step_reader = _make_step_widget_for_value(cur)
up_btn = widgets.Button(
description="", layout=widgets.Layout(width="40px")
)
down_btn = widgets.Button(
description="", layout=widgets.Layout(width="40px")
)
# optional direct input to set an absolute value
if not isinstance(cur, (list, dict)) and not isinstance(
cur, (bytes, bytearray)
):
input_widget, reader = _make_input_widget_for_value(cur)
input_widget.layout.margin = "0 6px 0 0"
else:
input_widget = widgets.Label("n/a", layout=widgets.Layout(width="80px"))
def make_tweak_handlers(
it, val_widget, inp_reader, step_reader, up_b, down_b
):
def _do_set(newval, btn=None):
try:
r = it.set_target_value(newval)
try:
if hasattr(r, "wait"):
r.wait(timeout=5)
except Exception:
pass
try:
val_widget.value = str(it.get_current_value())
except Exception:
pass
if btn:
btn.description = (
btn.description
) # no-op to keep UI consistent
except Exception:
if btn:
old = btn.description
btn.description = "Err"
def _reset(b=btn, o=old):
time.sleep(1.2)
b.description = o
threading.Thread(target=_reset, daemon=True).start()
def _on_up(b=None):
try:
step = step_reader()
if inp_reader:
base = inp_reader()
else:
base = it.get_current_value()
newv = base + step
_do_set(newv, up_b)
# sync input widget if present
if inp_reader:
try:
input_widget.value = str(newv)
except Exception:
pass
except Exception:
_do_set(None, up_b) # triggers error visual
def _on_down(b=None):
try:
step = step_reader()
if inp_reader:
base = inp_reader()
else:
base = it.get_current_value()
newv = base - step
_do_set(newv, down_b)
if inp_reader:
try:
input_widget.value = str(newv)
except Exception:
pass
except Exception:
_do_set(None, down_b)
def _on_set_direct(b=None):
if not inp_reader:
return
try:
val = inp_reader()
_do_set(val, None)
# update shown value
try:
val_widget.value = str(it.get_current_value())
except Exception:
pass
except Exception:
pass
return _on_up, _on_down, _on_set_direct
on_up, on_down, on_set_direct = make_tweak_handlers(
item, value_w, reader, step_reader, up_btn, down_btn
)
up_btn.on_click(on_up)
down_btn.on_click(on_down)
set_btn = widgets.Button(
description="Set",
button_style="primary",
layout=widgets.Layout(width="60px"),
)
set_btn.on_click(on_set_direct)
control_box.children = (step_w, up_btn, down_btn, input_widget, set_btn)
# Fallback: if item has set_target_value (callable) but wasn't captured above, allow simple set
elif hasattr(item, "set_target_value") and callable(
getattr(item, "set_target_value")
):
# create input widget based on current value
input_widget, reader = _make_input_widget_for_value(cur)
input_widget.layout.margin = "0 6px 0 0"
set_button = widgets.Button(
description="Set",
button_style="primary",
layout=widgets.Layout(width="60px"),
)
def make_on_set(it, rw, vw, btn):
def _on_set(b):
try:
val = rw()
r = it.set_target_value(val)
try:
if hasattr(r, "wait"):
r.wait(timeout=5)
except Exception:
pass
try:
vw.value = str(it.get_current_value())
except Exception:
pass
btn.description = "Set"
except Exception:
btn.description = "Err"
def _reset():
time.sleep(1.2)
btn.description = "Set"
threading.Thread(target=_reset, daemon=True).start()
return _on_set
set_button.on_click(make_on_set(item, reader, value_w, set_button))
control_box.children = (input_widget, set_button)
else:
control_box.children = (
widgets.Label("", layout=widgets.Layout(margin="0 0 0 6px")),
)
row = widgets.HBox([name_w, value_w, control_box])
rows.append(row)
item_entries.append(
{
"item": item,
"value_widget": value_w,
"input_widget": input_widget,
"reader": reader,
}
)
vbox = widgets.VBox([header] + rows)
# background updater
stop_event = threading.Event()
updater_thread = None
def _update_loop():
while not stop_event.wait(poll_interval):
for ent in item_entries:
it = ent["item"]
vw = ent["value_widget"]
try:
val = it.get_current_value()
vw.value = str(val)
except Exception:
pass
def start():
nonlocal updater_thread
if updater_thread and updater_thread.is_alive():
return
stop_event.clear()
updater_thread = threading.Thread(target=_update_loop, daemon=True)
updater_thread.start()
def stop():
stop_event.set()
vbox.start = start
vbox.stop = stop
vbox._stop_event = stop_event
if auto_start:
start()
return vbox
# ...existing code...
+275
View File
@@ -0,0 +1,275 @@
"""
IPython widget to select one item from a Namespace.required_names and
to choose one item to be selected in 12 hours (separate radio group).
Usage:
from eco.widgets.namespace_selector import make_namespace_selector
w = make_namespace_selector(ns)
display(w) # w is a widgets.VBox subclass
cur = w.get_current()
sched = w.get_scheduled_12h()
w.on_change(lambda cur, sched: print("changed", cur, sched))
"""
# ...existing code...
from typing import Any, Callable, Iterable, List, Optional, Tuple
import ipywidgets as widgets
def _label_of(item: Any) -> str:
# prefer alias/full name, then name, then str()
try:
if hasattr(item, "alias") and hasattr(item.alias, "get_full_name"):
return item.alias.get_full_name()
except Exception:
pass
try:
if hasattr(item, "name"):
return str(item.name)
except Exception:
pass
return str(item)
class NamespaceSelector(widgets.VBox):
def __init__(
self,
namespace: Any,
required_names_attr: str = "required_names",
initial: Optional[Any] = None,
scheduled_initial: Optional[Any] = None,
none_label: str = "None",
):
"""
namespace: object containing an iterable attribute required_names_attr
(list of items or names). Each entry can be any object.
initial: optional item from the required_names to pre-select (value compared by identity).
scheduled_initial: optional item to pre-select for 12-hour selection (or None).
"""
# collect items
items = getattr(namespace, required_names_attr, None)
if items is None:
items = []
# normalize to list
items = list(items)
# build options as (label, value) pairs
self._options: List[Tuple[str, Any]] = [( _label_of(it), it ) for it in items]
# radio for "current selection"
rb_options = [(lbl, val) for lbl, val in self._options]
self.current_rb = widgets.RadioButtons(
options=rb_options,
value=(initial if initial is not None else (rb_options[0][1] if rb_options else None)),
description="Select",
layout=widgets.Layout(width="100%"),
)
# radio for "selected in 12 hours" — include explicit None option
rb12_options = [(none_label, None)] + [(lbl, val) for lbl, val in self._options]
self.scheduled_rb = widgets.RadioButtons(
options=rb12_options,
value=(scheduled_initial if scheduled_initial is not None else None),
description="In 12h",
layout=widgets.Layout(width="100%"),
)
super().__init__([widgets.HTML(value="<b>Required items</b>"), self.current_rb,
widgets.HTML(value="<b>Mark component to select in 12 hours</b>"), self.scheduled_rb])
# callbacks list called with (current_value, scheduled_value)
self._cbs: List[Callable[[Any, Any], None]] = []
# attach observers
self.current_rb.observe(self._on_change, names="value")
self.scheduled_rb.observe(self._on_change, names="value")
def _on_change(self, change):
cur = self.get_current()
sched = self.get_scheduled_12h()
for cb in list(self._cbs):
try:
cb(cur, sched)
except Exception:
# swallow callback errors to keep widget responsive
pass
def get_current(self) -> Optional[Any]:
"""Return the currently selected item (or None)."""
return self.current_rb.value
def get_scheduled_12h(self) -> Optional[Any]:
"""Return the item selected for 12 hours from now (or None)."""
return self.scheduled_rb.value
def set_current(self, item: Any) -> None:
"""Programmatically set current selection (item must be one of options or None)."""
# allow None if present in options (rare); otherwise ignore
values = [v for (_, v) in self.current_rb.options]
if item in values:
self.current_rb.value = item
def set_scheduled_12h(self, item: Optional[Any]) -> None:
"""Programmatically set 12h selection (item must be one of options or None)."""
values = [v for (_, v) in self.scheduled_rb.options]
if item in values:
self.scheduled_rb.value = item
def on_change(self, cb: Callable[[Any, Any], None]) -> None:
"""Register a callback called as cb(current, scheduled) on changes."""
if callable(cb):
self._cbs.append(cb)
def make_namespace_selector(
namespace: Any,
required_names_attr: str = "required_names",
initial: Optional[Any] = None,
scheduled_initial: Optional[Any] = None,
) -> NamespaceSelector:
"""Convenience factory."""
return NamespaceSelector(
namespace,
required_names_attr=required_names_attr,
initial=initial,
scheduled_initial=scheduled_initial,
)
# ...existing code...
```# filepath: /home/lemke_h/mypy/eco/eco/widgets/namespace_selector.py
"""
IPython widget to select one item from a Namespace.required_names and
to choose one item to be selected in 12 hours (separate radio group).
Usage:
from eco.widgets.namespace_selector import make_namespace_selector
w = make_namespace_selector(ns)
display(w) # w is a widgets.VBox subclass
cur = w.get_current()
sched = w.get_scheduled_12h()
w.on_change(lambda cur, sched: print("changed", cur, sched))
"""
# ...existing code...
from typing import Any, Callable, Iterable, List, Optional, Tuple
import ipywidgets as widgets
def _label_of(item: Any) -> str:
# prefer alias/full name, then name, then str()
try:
if hasattr(item, "alias") and hasattr(item.alias, "get_full_name"):
return item.alias.get_full_name()
except Exception:
pass
try:
if hasattr(item, "name"):
return str(item.name)
except Exception:
pass
return str(item)
class NamespaceSelector(widgets.VBox):
def __init__(
self,
namespace: Any,
required_names_attr: str = "required_names",
initial: Optional[Any] = None,
scheduled_initial: Optional[Any] = None,
none_label: str = "None",
):
"""
namespace: object containing an iterable attribute required_names_attr
(list of items or names). Each entry can be any object.
initial: optional item from the required_names to pre-select (value compared by identity).
scheduled_initial: optional item to pre-select for 12-hour selection (or None).
"""
# collect items
items = getattr(namespace, required_names_attr, None)
if items is None:
items = []
# normalize to list
items = list(items)
# build options as (label, value) pairs
self._options: List[Tuple[str, Any]] = [( _label_of(it), it ) for it in items]
# radio for "current selection"
rb_options = [(lbl, val) for lbl, val in self._options]
self.current_rb = widgets.RadioButtons(
options=rb_options,
value=(initial if initial is not None else (rb_options[0][1] if rb_options else None)),
description="Select",
layout=widgets.Layout(width="100%"),
)
# radio for "selected in 12 hours" — include explicit None option
rb12_options = [(none_label, None)] + [(lbl, val) for lbl, val in self._options]
self.scheduled_rb = widgets.RadioButtons(
options=rb12_options,
value=(scheduled_initial if scheduled_initial is not None else None),
description="In 12h",
layout=widgets.Layout(width="100%"),
)
super().__init__([widgets.HTML(value="<b>Required items</b>"), self.current_rb,
widgets.HTML(value="<b>Mark component to select in 12 hours</b>"), self.scheduled_rb])
# callbacks list called with (current_value, scheduled_value)
self._cbs: List[Callable[[Any, Any], None]] = []
# attach observers
self.current_rb.observe(self._on_change, names="value")
self.scheduled_rb.observe(self._on_change, names="value")
def _on_change(self, change):
cur = self.get_current()
sched = self.get_scheduled_12h()
for cb in list(self._cbs):
try:
cb(cur, sched)
except Exception:
# swallow callback errors to keep widget responsive
pass
def get_current(self) -> Optional[Any]:
"""Return the currently selected item (or None)."""
return self.current_rb.value
def get_scheduled_12h(self) -> Optional[Any]:
"""Return the item selected for 12 hours from now (or None)."""
return self.scheduled_rb.value
def set_current(self, item: Any) -> None:
"""Programmatically set current selection (item must be one of options or None)."""
# allow None if present in options (rare); otherwise ignore
values = [v for (_, v) in self.current_rb.options]
if item in values:
self.current_rb.value = item
def set_scheduled_12h(self, item: Optional[Any]) -> None:
"""Programmatically set 12h selection (item must be one of options or None)."""
values = [v for (_, v) in self.scheduled_rb.options]
if item in values:
self.scheduled_rb.value = item
def on_change(self, cb: Callable[[Any, Any], None]) -> None:
"""Register a callback called as cb(current, scheduled) on changes."""
if callable(cb):
self._cbs.append(cb)
def make_namespace_selector(
namespace: Any,
required_names_attr: str = "required_names",
initial: Optional[Any] = None,
scheduled_initial: Optional[Any] = None,
) -> NamespaceSelector:
"""Convenience factory."""
return NamespaceSelector(
namespace,
required_names_attr=required_names_attr,
initial=initial,
scheduled_initial=scheduled_initial,
)
# ...existing code...
+428
View File
@@ -0,0 +1,428 @@
"""
IPython widget for Scans instances.
- Choose scan method from a dropdown (ascan, dscan, meshscan, acquire, etc).
- Selecting a method builds a parameter form for positional and keyword args
(inspecting the method signature) and includes dynamic callback keywords
returned by scans.get_callback_keywords(method_name) when available.
- Second tab contains a matplotlib Figure with an empty axis. The widget exposes
`.fig` and `.ax` for plotting.
- Pressing "Run" will call a user-provided run_callback(scan_obj, method_name, args, kwargs)
if given, otherwise it will attempt to call the scan method directly.
Usage:
from eco.widgets.scan_widget import ScanWidget, make_scan_widget
w = make_scan_widget(scans_instance)
display(w)
# Access figure: w.fig, w.ax
# Register custom runner:
w.run_callback = lambda scans, m, a, k: print("would run", m, a, k)
"""
from typing import Any, Callable, Dict, List, Optional, Tuple
import inspect
import threading
import json
import ipywidgets as widgets
from IPython.display import display, clear_output
import matplotlib.pyplot as plt
# helper to coerce simple string to numeric/bool if possible
def _coerce_value(s: str) -> Any:
if s is None:
return None
s = s.strip()
if s == "":
return ""
# try bool
if s.lower() in ("true", "false"):
return s.lower() == "true"
# try int
try:
iv = int(s)
return iv
except Exception:
pass
# try float
try:
fv = float(s)
return fv
except Exception:
pass
# try json (list/dict)
try:
j = json.loads(s)
return j
except Exception:
pass
return s
def _make_widget_for_default(value: Any):
"""Return (widget, reader) for a default value."""
# None -> text input (empty)
if isinstance(value, bool):
w = widgets.Checkbox(value=value)
return w, lambda: w.value
if isinstance(value, int) and not isinstance(value, bool):
w = widgets.IntText(value=value)
return w, lambda: int(w.value)
if isinstance(value, float):
w = widgets.FloatText(value=value)
return w, lambda: float(w.value)
# list/tuple -> Text (JSON) so user can enter JSON-like
if isinstance(value, (list, dict, tuple)):
w = widgets.Text(value=json.dumps(value), layout=widgets.Layout(width="100%"))
return w, lambda: _coerce_value(w.value)
# fallback string
w = widgets.Text(
value=str(value) if value is not None else "",
layout=widgets.Layout(width="100%"),
)
return w, lambda: _coerce_value(w.value)
def _make_free_arg_widget(placeholder: str = ""):
w = widgets.Text(
value="", placeholder=placeholder, layout=widgets.Layout(width="100%")
)
return w, lambda: _coerce_value(w.value)
class ScanWidget(widgets.Tab):
def __init__(
self,
scans_obj: Any,
methods: Optional[List[str]] = None,
auto_build: bool = True,
):
"""
scans_obj: instance providing scan methods and optionally get_callback_keywords(method_name).
methods: optional list of method names to offer; if None common names will be searched on scans_obj.
"""
self.scans = scans_obj
# find available methods if not provided
if methods is None:
cand = ["ascan", "dscan", "meshscan", "acquire"]
methods = [m for m in cand if hasattr(scans_obj, m)]
# also include any callable attributes that look like scans
for name in dir(scans_obj):
if (
name not in methods
and callable(getattr(scans_obj, name))
and not name.startswith("_")
):
methods.append(name)
self.methods = methods
# top controls: dropdown, run button, get params button
self.method_dd = widgets.Dropdown(
options=self.methods,
description="Method:",
layout=widgets.Layout(width="50%"),
)
self.run_button = widgets.Button(description="Run", button_style="primary")
self.get_params_button = widgets.Button(description="Get params")
self.status_label = widgets.Label("")
top_box = widgets.HBox(
[self.method_dd, self.run_button, self.get_params_button, self.status_label]
)
# parameter area will be rebuilt per method
self.params_box = widgets.VBox([])
# expose run callback override
# signature: run_callback(scans_obj, method_name, args_list, kwargs_dict)
self.run_callback: Optional[
Callable[[Any, str, List[Any], Dict[str, Any]], Any]
] = None
# figure tab: create empty figure and axis, display into an Output widget
self.fig = plt.Figure(figsize=(6, 4))
self.ax = self.fig.add_subplot(111)
self.plot_out = widgets.Output(layout=widgets.Layout(border="1px solid #ddd"))
with self.plot_out:
display(self.fig)
# assemble two tab children: form and plot output
self.form_vbox = widgets.VBox(
[top_box, widgets.HTML("<b>Parameters</b>"), self.params_box]
)
children = [self.form_vbox, self.plot_out]
super().__init__(children)
self.set_title(0, "Form")
self.set_title(1, "Plot")
# wire events
self.method_dd.observe(self._on_method_change, names="value")
self.run_button.on_click(self._on_run)
self.get_params_button.on_click(self._on_get_params)
# storage for widgets mapping
self._pos_widgets: List[Tuple[str, widgets.Widget, Callable[[], Any]]] = []
self._kw_widgets: List[Tuple[str, widgets.Widget, Callable[[], Any]]] = []
if auto_build:
self.build_for_method(self.method_dd.value)
def _on_method_change(self, change):
if change.get("name") == "value":
self.build_for_method(change["new"])
def _get_dynamic_callback_keywords(self, method_name: str) -> Dict[str, Any]:
"""Call scans.get_callback_keywords(method_name) if available, return dict of kw->default/metadata."""
fn = getattr(self.scans, "get_callback_keywords", None)
if callable(fn):
try:
kws = fn(method_name)
if isinstance(kws, dict):
return kws
# try list of names -> treat as None defaults
if isinstance(kws, (list, tuple)):
return {k: None for k in kws}
except Exception:
pass
return {}
def build_for_method(self, method_name: str):
"""(Re)build parameter widgets for selected method."""
self._pos_widgets = []
self._kw_widgets = []
self.status_label.value = ""
self.params_box.children = [
widgets.Label(f"Building parameter form for {method_name}...")
]
method = getattr(self.scans, method_name, None)
if method is None or not callable(method):
self.params_box.children = [widgets.Label("Selected method not available")]
return
sig = None
try:
sig = inspect.signature(method)
except Exception:
sig = None
# dynamic callback keywords
dyn_kws = self._get_dynamic_callback_keywords(method_name)
pos_rows = []
kw_rows = []
if sig is not None:
for pname, param in sig.parameters.items():
if pname == "self":
continue
kind = param.kind
default = param.default if param.default is not inspect._empty else None
if kind in (
inspect.Parameter.POSITIONAL_ONLY,
inspect.Parameter.POSITIONAL_OR_KEYWORD,
):
# positional parameter: create widget; if default is None treat as required
if default is None:
w, reader = _make_free_arg_widget(
placeholder=f"{pname} (required)"
)
else:
w, reader = _make_widget_for_default(default)
lbl = widgets.Label(pname, layout=widgets.Layout(width="25%"))
row = widgets.HBox([lbl, w])
pos_rows.append(row)
self._pos_widgets.append((pname, w, reader))
elif kind == inspect.Parameter.VAR_POSITIONAL:
# allow multiple positional args as newline-separated or JSON list
w = widgets.Textarea(
placeholder="comma separated or JSON list",
layout=widgets.Layout(width="100%"),
)
reader = lambda w=w: _coerce_value(w.value)
lbl = widgets.Label("*" + pname, layout=widgets.Layout(width="25%"))
row = widgets.HBox([lbl, w])
pos_rows.append(row)
self._pos_widgets.append((pname, w, reader))
elif kind == inspect.Parameter.KEYWORD_ONLY:
# keyword-only parameter
if default is None:
w, reader = _make_free_arg_widget(
placeholder=f"{pname} (required)"
)
else:
w, reader = _make_widget_for_default(default)
lbl = widgets.Label(pname, layout=widgets.Layout(width="25%"))
row = widgets.HBox([lbl, w])
kw_rows.append(row)
self._kw_widgets.append((pname, w, reader))
elif kind == inspect.Parameter.VAR_KEYWORD:
# provide a Textarea for free-form kwargs (JSON or key=val lines)
w = widgets.Textarea(
placeholder='JSON object or "k=v" lines',
layout=widgets.Layout(width="100%"),
)
reader = lambda w=w: _coerce_value(w.value)
lbl = widgets.Label(
"**" + pname, layout=widgets.Layout(width="25%")
)
row = widgets.HBox([lbl, w])
kw_rows.append(row)
self._kw_widgets.append((pname, w, reader))
else:
# unknown signature: provide free args and kwargs boxes
wpos = widgets.Textarea(
placeholder="JSON list of positional args",
layout=widgets.Layout(width="100%"),
)
rr_pos = lambda w=wpos: _coerce_value(w.value)
self._pos_widgets.append(("args", wpos, rr_pos))
wkw = widgets.Textarea(
placeholder="JSON kwargs dict", layout=widgets.Layout(width="100%")
)
rr_kw = lambda w=wkw: _coerce_value(w.value)
self._kw_widgets.append(("kwargs", wkw, rr_kw))
pos_rows.append(wpos)
kw_rows.append(wkw)
# include dynamic callback keywords (if any) as additional kwargs (do not overwrite existing)
for k, v in dyn_kws.items():
if k in [name for name, _, _ in self._kw_widgets]:
continue
# create widget depending on provided default
if isinstance(v, (list, tuple)):
# treat as choices -> Dropdown
options = [(str(opt), opt) for opt in v]
dd = widgets.Dropdown(
options=options,
value=(v[0] if len(v) else None),
layout=widgets.Layout(width="60%"),
)
reader = lambda dd=dd: dd.value
lbl = widgets.Label(k, layout=widgets.Layout(width="25%"))
row = widgets.HBox([lbl, dd])
kw_rows.append(row)
self._kw_widgets.append((k, dd, reader))
else:
if v is None:
w, reader = _make_free_arg_widget(placeholder=f"{k} (optional)")
else:
w, reader = _make_widget_for_default(v)
lbl = widgets.Label(k, layout=widgets.Layout(width="25%"))
row = widgets.HBox([lbl, w])
kw_rows.append(row)
self._kw_widgets.append((k, w, reader))
pos_section = (
widgets.VBox([widgets.HTML("<b>Positional / varargs</b>")] + pos_rows)
if pos_rows
else widgets.HTML("")
)
kw_section = (
widgets.VBox([widgets.HTML("<b>Keyword args</b>")] + kw_rows)
if kw_rows
else widgets.HTML("")
)
self.params_box.children = [pos_section, kw_section]
def _collect_params(self) -> Tuple[List[Any], Dict[str, Any]]:
"""Read widgets and return (args_list, kwargs_dict)."""
args: List[Any] = []
kwargs: Dict[str, Any] = {}
# positional widgets
for name, w, reader in self._pos_widgets:
val = None
try:
val = reader()
except Exception:
val = None
if name.startswith("*"):
# not used here, but include raw
args.append(val)
elif name == "args":
if isinstance(val, list):
args.extend(val)
elif isinstance(val, (str,)):
# try parse comma separated
if val.strip().startswith("[") or val.strip().startswith("{"):
try:
parsed = _coerce_value(val)
if isinstance(parsed, list):
args.extend(parsed)
else:
args.append(parsed)
except Exception:
args.append(val)
else:
parts = [p.strip() for p in val.split(",") if p.strip()]
for p in parts:
args.append(_coerce_value(p))
else:
args.append(val)
else:
# normal positional param: include value (even if None) but caller may require
args.append(val)
# keyword widgets
for name, w, reader in self._kw_widgets:
try:
val = reader()
except Exception:
val = None
if name == "kwargs" or name.startswith("**"):
# parse as dict if possible
if isinstance(val, dict):
kwargs.update(val)
elif isinstance(val, str):
# attempt parse JSON
parsed = _coerce_value(val)
if isinstance(parsed, dict):
kwargs.update(parsed)
else:
# try parse "k=v" lines
for line in val.splitlines():
if "=" in line:
k, v = line.split("=", 1)
kwargs[k.strip()] = _coerce_value(v)
elif isinstance(val, dict):
kwargs.update(val)
else:
# skip unknown
pass
else:
kwargs[name] = val
return args, kwargs
def _on_get_params(self, _=None):
a, k = self._collect_params()
self.status_label.value = f"Args: {a} Kw: {k}"
def _on_run(self, _=None):
method = self.method_dd.value
args, kwargs = self._collect_params()
self.status_label.value = "Running..."
# allow custom callback
def _do_call():
try:
if callable(self.run_callback):
res = self.run_callback(self.scans, method, args, kwargs)
else:
fn = getattr(self.scans, method, None)
if not callable(fn):
raise RuntimeError("method not callable")
res = fn(*args, **kwargs)
self.status_label.value = "Done"
except Exception as exc:
self.status_label.value = f"Error: {exc}"
# run in background thread to avoid blocking UI
t = threading.Thread(target=_do_call, daemon=True)
t.start()
def make_scan_widget(scans_obj: Any, methods: Optional[List[str]] = None) -> ScanWidget:
return ScanWidget(scans_obj, methods=methods)