Compare commits

...

11 Commits

Author SHA1 Message Date
perl_d f35d5964f9 config: add aerotech device to pxii config
CI for pxii_bec / test (pull_request) Successful in 32s
CI for pxii_bec / test (push) Successful in 36s
2026-05-07 16:32:15 +02:00
perl_d 6c8351238c feat: add aerotech device 2026-05-07 16:32:15 +02:00
perl_d b65ed70f32 refactor: extract core HTTP device logic 2026-05-07 16:32:15 +02:00
wyzula_j 434db75f6c fix(bec widgets): designer plugins fixed for widgets
CI for pxii_bec / test (pull_request) Successful in 9m37s
CI for pxii_bec / test (push) Successful in 1m51s
2026-05-04 15:27:27 +02:00
perl_d 1950ee5c85 perf: use reusable request session in smargon controller
CI for pxii_bec / test (pull_request) Successful in 33s
CI for pxii_bec / test (push) Successful in 35s
2026-04-20 11:46:01 +02:00
perl_d a11fe91998 tests: add test for smargon
CI for pxii_bec / test (pull_request) Successful in 35s
CI for pxii_bec / test (push) Successful in 33s
2026-03-06 16:23:52 +01:00
perl_d c58b0c1d20 feat: Smargopolo Smargon device v1 2026-03-06 15:49:48 +01:00
wyzula_j 1041e3a308 fix(bec-widgets): migration of scripts to V3
CI for pxii_bec / test (pull_request) Successful in 29s
CI for pxii_bec / test (push) Successful in 32s
2026-03-05 13:59:55 +01:00
perl_d 2912a306ab Update repo with template version v1.2.8
CI for pxii_bec / test (pull_request) Successful in 35s
CI for pxii_bec / test (push) Successful in 54s
2026-02-27 15:49:26 +01:00
perl_d 3c54d98ce5 Update repo with template version v1.2.7
CI for pxii_bec / test (push) Failing after 0s
CI for pxii_bec / test (pull_request) Failing after 0s
2026-02-27 12:11:40 +01:00
appleb_m feaa340670 Merge pull request 'renamed sample environment devices to positioned devices, updated yamls and mx macros.' (#8) from x10sa_production_20260202T095359 into main
CI for pxii_bec / test (push) Successful in 26s
Reviewed-on: #8
2026-02-03 14:03:17 +01:00
18 changed files with 683 additions and 55 deletions
+1 -1
View File
@@ -2,7 +2,7 @@
# It is needed to track the repo template version, and editing may break things.
# This file will be overwritten by copier on template updates.
_commit: v1.2.2
_commit: v1.2.8
_src_path: https://github.com/bec-project/plugin_copier_template.git
make_commit: false
project_name: pxii_bec
+14 -9
View File
@@ -28,7 +28,7 @@ on:
description: "Python version to use"
required: false
type: string
default: "3.11"
default: "3.12"
permissions:
pull-requests: write
@@ -44,7 +44,19 @@ jobs:
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: "${{ inputs.PYTHON_VERSION || '3.11' }}"
python-version: "${{ inputs.PYTHON_VERSION || '3.12' }}"
- name: Checkout BEC Plugin Repository
uses: actions/checkout@v4
with:
repository: bec/pxii_bec
ref: "${{ inputs.BEC_PLUGIN_REPO_BRANCH || github.head_ref || github.sha }}"
path: ./pxii_bec
- name: Lint for merge conflicts from template updates
shell: bash
# Find all Copier conflicts except this line
run: '! grep -r "<<<<<<< before updating" | grep -v "grep -r \"<<<<<<< before updating"'
- name: Checkout BEC Core
uses: actions/checkout@v4
@@ -67,13 +79,6 @@ jobs:
ref: "${{ inputs.BEC_WIDGETS_BRANCH || 'main' }}"
path: ./bec_widgets
- name: Checkout BEC Plugin Repository
uses: actions/checkout@v4
with:
repository: bec/pxii_bec
ref: "${{ inputs.BEC_PLUGIN_REPO_BRANCH || github.head_ref || github.sha }}"
path: ./pxii_bec
- name: Install dependencies
shell: bash
run: |
+62
View File
@@ -0,0 +1,62 @@
name: Create template upgrade PR for pxii_bec
on:
workflow_dispatch:
permissions:
pull-requests: write
jobs:
create_update_branch_and_pr:
runs-on: ubuntu-latest
permissions:
contents: write
pull-requests: write
steps:
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Install tools
run: |
pip install copier PySide6
- name: Checkout
uses: actions/checkout@v4
- name: Perform update
run: |
git config --global user.email "bec_ci_staging@psi.ch"
git config --global user.name "BEC automated CI"
branch="chore/update-template-$(python -m uuid)"
echo "switching to branch $branch"
git checkout -b $branch
echo "Running copier update..."
output="$(copier update --trust --defaults --conflict inline 2>&1)"
echo "$output"
msg="$(printf '%s\n' "$output" | head -n 1)"
if ! grep -q "make_commit: true" .copier-answers.yml ; then
echo "Autocommit not made, committing..."
git add -A
git commit -a -m "$msg"
fi
if diff-index --quiet HEAD ; then
echo "No changes detected"
exit 0
fi
git push -u origin $branch
curl -X POST "https://gitea.psi.ch/api/v1/repos/${{ gitea.repository }}/pulls" \
-H "Authorization: token ${{ secrets.CI_REPO_WRITE }}" \
-H "Content-Type: application/json" \
-d "{
\"title\": \"Template: $(echo $msg)\",
\"body\": \"This PR was created by Gitea Actions\",
\"head\": \"$(echo $branch)\",
\"base\": \"main\"
}"
-7
View File
@@ -1,7 +0,0 @@
include:
- file: /templates/plugin-repo-template.yml
inputs:
name: pxii_bec
target: pxii_bec
branch: $CHILD_PIPELINE_BRANCH
project: bec/awi_utils
+3 -1
View File
@@ -5,7 +5,7 @@ from __future__ import annotations
from bec_lib.logger import bec_logger
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call, rpc_timeout
logger = bec_logger.logger
@@ -18,6 +18,8 @@ _Widgets = {
class ScanHistory(RPCBase):
_IMPORT_MODULE = "pxii_bec.bec_widgets.widgets.scan_history.scan_history"
@rpc_call
def select_scan_from_history(self, value: "int") -> "None":
"""
@@ -0,0 +1,13 @@
# This file was automatically generated by generate_cli.py
# type: ignore
from __future__ import annotations
# pylint: skip-file
designer_plugins = {
"ScanHistory": ("pxii_bec.bec_widgets.widgets.scan_history.scan_history", "ScanHistory"),
}
widget_icons = {
"ScanHistory": "widgets",
}
@@ -42,4 +42,30 @@ dcm_froll:
deviceTags:
- dcm
readOnly: false
softwareTrigger: false
softwareTrigger: false
smargon:
description: REST-based device which connects to Smargopolo
deviceClass: pxii_bec.devices.smargopolo_smargon.Smargon
deviceConfig: {prefix: 'http://x10sa-smargopolo.psi.ch:3000'}
onFailure: buffer
enabled: True
readoutPriority: baseline
deviceTags:
- smargon
- motors
readOnly: false
softwareTrigger: false
aerotech:
description: REST-based device which connects to AareScan
deviceClass: pxii_bec.devices.aerotech
deviceConfig: { prefix: "http://mx-x10sa-queue-01:5234/" }
onFailure: buffer
enabled: True
readoutPriority: baseline
deviceTags:
- aerotech
- motors
readOnly: false
softwareTrigger: false
+43
View File
@@ -0,0 +1,43 @@
from ophyd import Component as Cpt
from .http import TIMESTAMP_ID, HttpDeviceController, HttpDeviceSignal, HttpOphydDevice
class AerotechController(HttpDeviceController):
_readback_endpoint = "status"
_target_endpoint = "position"
def __init__(self, *, prefix, **kwargs):
self._readbacks: dict[str, dict[str, float | bool]] = {}
super().__init__(prefix=prefix, **kwargs)
def put(self, axis: str, val: float):
self._rest_post(body={axis: val})
def get_readback(self, axis_id: str) -> tuple[float, float] | None:
with self._readback_lock:
if axis_id not in self._readbacks or TIMESTAMP_ID not in self._readbacks:
return None
return self._readbacks.get(axis_id)["pos"], self._readbacks.get(TIMESTAMP_ID) # type: ignore
class Aerotech(HttpOphydDevice):
controller_class = AerotechController
x = Cpt(HttpDeviceSignal, axis_identifier="x", tolerance=0.01)
y = Cpt(HttpDeviceSignal, axis_identifier="y", tolerance=0.01)
z = Cpt(HttpDeviceSignal, axis_identifier="z", tolerance=0.01)
u = Cpt(HttpDeviceSignal, axis_identifier="u", tolerance=0.01)
vel_u_deg_s = Cpt(HttpDeviceSignal, axis_identifier="vel_u_deg_s", tolerance=0.01)
def _test():
a = Aerotech(name="aerotech", prefix="http://mx-x10sa-queue-01:5234")
a.wait_for_connection()
return a
if __name__ == "__main__":
aerotech = _test()
print(aerotech.read())
aerotech.stop()
+178
View File
@@ -0,0 +1,178 @@
import time
from abc import ABC, abstractmethod
from threading import Event, RLock, Thread
from typing import Any
from ophyd import OphydObject
from ophyd_devices import PSIDeviceBase
from ophyd_devices.utils.socket import SocketSignal
from requests import Response, Session
TIMESTAMP_ID = "__timestamp"
_POLL_INTERVAL_SLOW = 0.1
class HttpRestError(Exception):
"""Error for rest calls from a HttpRestSignal."""
def __init__(self, resp: Response, *args: object, value: Any | None = None) -> None:
method, url = resp.request.method, resp.request.url
data = f"{str(value)} to " if value is not None else ""
super().__init__(
f"Could not {method} {data}{url}. Code: {resp.status_code}. Reason: {resp.reason}.",
*args,
)
class HttpDeviceController(OphydObject, ABC):
"""Controller to consolidate polling loops and other REST calls for devices which communicate
with HTTP REST interfaces"""
_readback_endpoint: str
_target_endpoint: str
def __init__(self, *, prefix, **kwargs):
self._readbacks: dict
self._session = Session()
self._prefix = prefix
self._targets = {}
self._signal_registry: set[str] = set()
self._readback_poll_interval: float = _POLL_INTERVAL_SLOW
super().__init__(**kwargs)
self._setup_readback()
def _setup_readback(self):
self._stop_monitor_readback_event = Event()
self._readback_lock = RLock()
self._monitor_readback_thread = Thread(
target=self._monitor,
args=[
self._readback_endpoint,
self._stop_monitor_readback_event,
self._readback_lock,
self._readbacks,
],
)
def manual_update(self):
self._update_reading(self._readback_endpoint, self._readback_lock, self._readbacks)
def _update_reading(self, endpoint: str, lock: RLock, buffer: dict):
data = self._rest_get(endpoint)
timestamp = time.monotonic()
with lock:
buffer.update(data)
buffer["__timestamp"] = timestamp
def _monitor(self, endpoint: str, event: Event, lock: RLock, buffer: dict):
while not event.is_set():
self._update_reading(endpoint, lock, buffer)
time.sleep(self._readback_poll_interval)
def _clean_monitor(self):
if self._monitor_readback_thread.is_alive():
self._stop_monitor_readback_event.set()
self._monitor_readback_thread.join(timeout=2)
if self._monitor_readback_thread.is_alive():
raise RuntimeError("Failed to clean up Aerotech monitor thread.")
def register(self, axis_id: str):
self._signal_registry.add(axis_id)
def _rest_get(self, endpoint):
resp = self._session.get(self._prefix + endpoint)
if not resp.ok:
raise HttpRestError(resp)
return resp.json()
def _rest_put(self, params: dict | None = None, body: dict | None = None):
resp = self._session.put(self._prefix + self._target_endpoint, params=params, json=body)
if not resp.ok:
raise HttpRestError(resp, value=params)
def _rest_post(self, params: dict | None = None, body: dict | None = None):
resp = self._session.post(self._prefix + self._target_endpoint, params=params, json=body)
if not resp.ok:
raise HttpRestError(resp, value=params)
def start_monitor(self):
"""Start or restart the automonitor thread."""
self._clean_monitor()
self._setup_readback()
self._monitor_readback_thread.start()
def monitor_stopped(self):
return not self._monitor_readback_thread.is_alive()
def put(self, axis: str, val: float):
self._rest_put({axis: val})
@abstractmethod
def get_readback(self, axis_id: str) -> tuple[float, float] | None:
"""Return a tuple (reading, timestamp) if the axis_id exists"""
def stop(self):
# There doesn't appear to be a stop endpoint on the server
# Best effort: set the target to the current position
pass
# TODO: self._rest_put(self._readbacks)
class HttpDeviceSignal(SocketSignal):
"""Ophyd signal which gets and puts to a REST API rather than EPICS PVs, mediated through the Aerotech
Controller"""
def __init__(self, *args, axis_identifier: str, **kwargs):
super().__init__(*args, **kwargs)
controller: HttpDeviceController | None = getattr(self.root, "controller", None)
if controller is None:
raise TypeError("HttpDeviceSignal must be used in a device with a HttpDeviceController")
self._controller = controller
self._axis_id = axis_identifier
self._controller.register(self._axis_id)
def _socket_get(self): # type: ignore
self._readback, self.metadata["timestamp"] = self._controller.get_readback(
self._axis_id
) or (0.0, 0.0)
return self._readback
def _socket_set(self, val: float):
self._controller.put(self._axis_id, val)
def get(self, **kwargs):
if self._controller.monitor_stopped():
self._controller.start_monitor()
return super().get(**kwargs)
class HttpOphydDevice(PSIDeviceBase):
controller_class: type[HttpDeviceController]
def __init__(
self,
*,
name: str,
prefix: str = "",
scan_info=None,
device_manager=None,
**kwargs,
):
self.controller = self.controller_class(prefix=prefix)
super().__init__(
name=name,
prefix=prefix,
scan_info=scan_info,
device_manager=device_manager,
**kwargs,
)
def wait_for_connection(self, **kwargs): # type: ignore
self.controller.start_monitor()
self.controller.manual_update()
return super().wait_for_connection(**kwargs)
def stop(self, *, success: bool = False) -> None:
self.controller.stop()
return super().stop(success=success)
+42
View File
@@ -0,0 +1,42 @@
from ophyd import Component as Cpt
from ophyd_devices import PSIDeviceBase
from .http import HttpDeviceController, HttpDeviceSignal, HttpOphydDevice
_TIMESTAMP_ID = "__timestamp"
_POLL_INTERVAL_SLOW = 0.1
class SmargonController(HttpDeviceController):
"""Controller to consolidate polling loops and other REST calls for the smargon"""
_readback_endpoint = "/readbackSCS"
_target_endpoint = "/targetSCS"
def __init__(self, *, prefix, **kwargs):
self._readbacks: dict[str, float] = {}
super().__init__(prefix=prefix, **kwargs)
def get_readback(self, axis_id: str) -> tuple[float, float] | None:
with self._readback_lock:
if axis_id not in self._readbacks or _TIMESTAMP_ID not in self._readbacks:
return None
return self._readbacks.get(axis_id), self._readbacks.get(_TIMESTAMP_ID) # type: ignore
def put(self, axis: str, val: float):
self._rest_put(params={axis: val})
def stop(self):
# There doesn't appear to be a stop endpoint on the server
# Best effort: set the target to the current position
self._rest_put(params=self._readbacks)
class Smargon(HttpOphydDevice):
controller_class = SmargonController
x = Cpt(HttpDeviceSignal, axis_identifier="SHX", tolerance=0.01)
y = Cpt(HttpDeviceSignal, axis_identifier="SHY", tolerance=0.01)
z = Cpt(HttpDeviceSignal, axis_identifier="SHZ", tolerance=0.01)
phi = Cpt(HttpDeviceSignal, axis_identifier="PHI", tolerance=0.01)
chi = Cpt(HttpDeviceSignal, axis_identifier="CHI", tolerance=0.01)
+8 -8
View File
@@ -149,7 +149,7 @@ def rock(**kwargs):
import matplotlib.pyplot as plt
dock_area = bec.gui.new()
wr = dock_area.new().new(bec.gui.available_widgets.Waveform)
wr = dock_area.new(bec.gui.available_widgets.Waveform)
# width of rocking curve of perfect xtal: at 20 keV: 14 urad == 0.0008 deg
@@ -206,7 +206,7 @@ def rock(**kwargs):
s = scans.line_scan(mot, -dx, dx, steps=50, exp_time=time, relative=True)
# md = scan.metadata["bec"]
wr.title = f"RockingScan at Energy of {e}"
wr.plot(x_name=mot.name, y_name=det.name) ##set names/axes first !
wr.plot(device_x=mot.name, device_y=det.name) ##set names/axes first !
wr.x_label = mot.name
wr.y_label = det.name
if ax == 0:
@@ -890,7 +890,7 @@ def colliscan(direction: str, range=0.3, nsteps=30, stime=0.5, centre=1):
import sys
dock_area = bec.gui.new()
wr = dock_area.new().new(bec.gui.available_widgets.Waveform)
wr = dock_area.new(bec.gui.available_widgets.Waveform)
# check if i1 DIODE is IN
# if not, aks to be moved
@@ -1064,14 +1064,14 @@ def slitscan(device_location: str, direction: str, range: 1, nsteps=50, centre=0
return
dock_area = bec.gui.new()
wr = dock_area.new().new(bec.gui.available_widgets.Waveform)
wr = dock_area.new(bec.gui.available_widgets.Waveform)
pos0 = mot.user_readback.get()
siz0 = size.user_readback.get()
umv(size, s_closed)
s = scans.line_scan(mot, -dx, dx, steps=nsteps, exp_time=time, relative=True)
wr.plot(x_name=mot.name, y_name=det.name)
wr.plot(device_x=mot.name, device_y=det.name)
wr.x_label = mot.name
wr.y_label = det.name
@@ -1261,11 +1261,11 @@ def bstatus():
#
dock_area = bec.gui.new()
dbrowser = dock_area.new("device_browser").new(
bec.gui.available_widgets.DeviceBrowser
dbrowser = dock_area.new(
bec.gui.available_widgets.DeviceBrowser, object_name="device_browser"
)
dock_area.new("queue").new(bec.gui.available_widgets.BECQueue)
dock_area.new(bec.gui.available_widgets.BECQueue, object_name="queue")
# queue = dock_area.queue.BECQueue # give it a name
# text_box = dock_area.new().new(widget=bec.gui.available_widgets.TextBox)
# text_box.set_plain_text("Hello, World!")
+4 -5
View File
@@ -225,10 +225,10 @@ def long_gscan(estart=7, end_en=20.5, g_low=4.5, g_high=9.0, nsteps=1500):
resol = (g_high - g_low) / nsteps
print(f"nsteps = {nsteps}; resolution is {resol} mm")
dock_area = bec.gui.new("LongGapScan")
wr = dock_area.new().new(bec.gui.available_widgets.Waveform)
wr = dock_area.new(bec.gui.available_widgets.Waveform)
mot = dev.id_gap
det = dev.lu_bpmsum
wr.plot(x_name=mot.name, y_name=det.name) ## names first !
wr.plot(device_x=mot.name, device_y=det.name) ## names first !
wr.x_label = mot.name
wr.y_label = det.name
g0 = dev.id_gap.readback.get()
@@ -275,10 +275,10 @@ def gscan(centre=0, gomax=0, detune=0):
import time
dock_area = bec.gui.new()
wr = dock_area.new().new(bec.gui.available_widgets.Waveform)
wr = dock_area.new(bec.gui.available_widgets.Waveform)
mot = dev.id_gap
det = dev.lu_bpmsum
wr.plot(x_name=mot.name, y_name=det.name) ## names first !
wr.plot(device_x=mot.name, device_y=det.name) ## names first !
# wr.plot(x=mot.name,y=det.name) ### this comes later
wr.x_label = mot.name
wr.y_label = det.name
@@ -317,4 +317,3 @@ def gscan(centre=0, gomax=0, detune=0):
return
+4 -4
View File
@@ -154,9 +154,9 @@ def select_bec_window(dock_area_name="Fitting"):
open_docks = bec.gui.windows
if open_docks.get(dock_area_name) is None:
dock_area = bec.gui.new(dock_area_name)
wf = dock_area.new("Plot").new(bec.gui.available_widgets.Waveform)
text_box = dock_area.new("Results", position="bottom").new(
widget=bec.gui.available_widgets.TextBox
wf = dock_area.new(bec.gui.available_widgets.Waveform, object_name="Plot")
text_box = dock_area.new(
bec.gui.available_widgets.TextBox, object_name="Results", where="bottom"
)
else:
wf = bec.gui.Fitting.Plot.Waveform
@@ -186,7 +186,7 @@ def plot_live_data_bec(motor_name, signal_name, window_name="Fitting"):
wf.title = "Scan: Live scan"
wf.x_label = motor_name
wf.y_label = signal_name
wf.plot(x_name=motor_name, y_name=signal_name)
wf.plot(device_x=motor_name, device_y=signal_name)
def plot_fitted_data_bec(data, fit_result):
+60 -18
View File
@@ -249,42 +249,79 @@ def scan_bpm(bpmname):
# Open a dock area and set up the heatmaps
dock_area = bec.gui.new("XBPM_Scan")
wf5 = dock_area.new("Sum").new(bec.gui.available_widgets.Heatmap)
wf1 = dock_area.new("Ch1", relative_to="Sum", position="bottom").new(
bec.gui.available_widgets.Heatmap
wf5 = dock_area.new(bec.gui.available_widgets.Heatmap, object_name="Sum")
wf1 = dock_area.new(
bec.gui.available_widgets.Heatmap,
object_name="Ch1",
relative_to="Sum",
where="bottom",
)
wf3 = dock_area.new("Ch3", relative_to="Ch1", position="right").new(
bec.gui.available_widgets.Heatmap
wf3 = dock_area.new(
bec.gui.available_widgets.Heatmap,
object_name="Ch3",
relative_to="Ch1",
where="right",
)
wf4 = dock_area.new("Ch4", relative_to="Ch3", position="bottom").new(
bec.gui.available_widgets.Heatmap
wf4 = dock_area.new(
bec.gui.available_widgets.Heatmap,
object_name="Ch4",
relative_to="Ch3",
where="bottom",
)
wf2 = dock_area.new("Ch2", relative_to="Ch1", position="bottom").new(
bec.gui.available_widgets.Heatmap
wf2 = dock_area.new(
bec.gui.available_widgets.Heatmap,
object_name="Ch2",
relative_to="Ch1",
where="bottom",
)
wfscan = dock_area.new("ScanControl").new(bec.gui.available_widgets.ScanControl)
wfscan = dock_area.new(bec.gui.available_widgets.ScanControl, object_name="ScanControl")
cfg = getattr(BPMScans, bpmname)
wf1.x_label = cfg["x_name"]
wf1.y_label = cfg["y_name"]
wf1.plot(x_name=cfg["x_name"], y_name=cfg["y_name"], z_name=cfg["z1_name"], color_map="plasma")
wf1.plot(
device_x=cfg["x_name"],
device_y=cfg["y_name"],
device_z=cfg["z1_name"],
color_map="plasma",
)
wf2.x_label = cfg["x_name"]
wf2.y_label = cfg["y_name"]
wf2.plot(x_name=cfg["x_name"], y_name=cfg["y_name"], z_name=cfg["z2_name"], color_map="plasma")
wf2.plot(
device_x=cfg["x_name"],
device_y=cfg["y_name"],
device_z=cfg["z2_name"],
color_map="plasma",
)
wf3.x_label = cfg["x_name"]
wf3.y_label = cfg["y_name"]
wf3.plot(x_name=cfg["x_name"], y_name=cfg["y_name"], z_name=cfg["z3_name"], color_map="plasma")
wf3.plot(
device_x=cfg["x_name"],
device_y=cfg["y_name"],
device_z=cfg["z3_name"],
color_map="plasma",
)
wf4.x_label = cfg["x_name"]
wf4.y_label = cfg["y_name"]
wf4.plot(x_name=cfg["x_name"], y_name=cfg["y_name"], z_name=cfg["z4_name"], color_map="plasma")
wf4.plot(
device_x=cfg["x_name"],
device_y=cfg["y_name"],
device_z=cfg["z4_name"],
color_map="plasma",
)
wf5.x_label = cfg["x_name"]
wf5.y_label = cfg["y_name"]
wf5.plot(x_name=cfg["x_name"], y_name=cfg["y_name"], z_name=cfg["z5_name"], color_map="plasma")
wf5.plot(
device_x=cfg["x_name"],
device_y=cfg["y_name"],
device_z=cfg["z5_name"],
color_map="plasma",
)
# Run the scan
x_mot = cfg["x_device"]
y_mot = cfg["y_device"]
@@ -304,15 +341,20 @@ def optimise_kb(mirror):
# Open a dock area and set up the heatmaps
dock_area = bec.gui.new(mirror)
wf1 = dock_area.new("Heatmap").new(bec.gui.available_widgets.Heatmap)
wf1 = dock_area.new(bec.gui.available_widgets.Heatmap, object_name="Heatmap")
wfscan = dock_area.new("ScanControl").new(bec.gui.available_widgets.ScanControl)
wfscan = dock_area.new(bec.gui.available_widgets.ScanControl, object_name="ScanControl")
cfg = getattr(MirrorConfig, mirror)
wf1.x_label = cfg["bu_name"]
wf1.y_label = cfg["bd_name"]
wf1.plot(x_name=cfg["bu_name"], y_name=cfg["bd_name"], z_name=cfg["z_name"], color_map="plasma")
wf1.plot(
device_x=cfg["bu_name"],
device_y=cfg["bd_name"],
device_z=cfg["z_name"],
color_map="plasma",
)
# Run the scan
x_mot = cfg["x_device"]
+5 -1
View File
@@ -6,7 +6,7 @@ build-backend = "hatchling.build"
name = "pxii_bec"
version = "0.0.0"
description = "A plugin repository for BEC"
requires-python = ">=3.10"
requires-python = ">=3.11"
classifiers = [
"Development Status :: 3 - Alpha",
"Programming Language :: Python :: 3",
@@ -25,6 +25,7 @@ dev = [
"pytest-random-order",
"ophyd_devices",
"bec_server",
"requests-mock",
]
[project.entry-points."bec"]
@@ -76,3 +77,6 @@ good-names-rgxs = [
".*_2D.*",
".*_1D.*",
]
[tool.ruff]
line-length = 100
+110
View File
@@ -0,0 +1,110 @@
"""A mock smargopolo REST interface with mock motoers, for testing devices against"""
import asyncio
import random
import time
from contextlib import asynccontextmanager
from typing import Iterable
import uvicorn
from fastapi import FastAPI, HTTPException, Query, Request
from pydantic import BaseModel
AXES = ["SHX", "SHY", "SHZ", "PHI", "CHI"]
class Motor:
def __init__(self, velocity: float = 1.0):
self.position = 0.0
self.target = 0.0
self.velocity = velocity
self.moving = False
self._last_update = time.monotonic()
def update(self):
now = time.monotonic()
dt = now - self._last_update
self._last_update = now
if not self.moving:
return
jitter_factor = random.random() * 0.05 - 0.025 # +- 2.5% jitter in step
distance = self.target - self.position
direction = 1 if distance > 0 else -1
step = direction * self.velocity * dt
if abs(step) >= abs(distance):
self.position = self.target + (step * jitter_factor)
self.moving = False
else:
self.position += step * (1 + jitter_factor)
motors: dict[str, Motor] = {
"SHX": Motor(velocity=3),
"SHY": Motor(velocity=2.5),
"SHZ": Motor(velocity=2),
"PHI": Motor(velocity=1.0),
"CHI": Motor(velocity=0.7),
}
class MoveRequest(BaseModel):
target: float
@asynccontextmanager
async def lifespan(app: FastAPI):
async def updater():
while True:
for motor in motors.values():
motor.update()
await asyncio.sleep(0.02) # 50 Hz update loop
task = asyncio.create_task(updater())
yield
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
app = FastAPI(lifespan=lifespan)
def validate_axes(axes: Iterable[str] | None) -> list[str]:
if axes is None:
return AXES
for a in axes:
if a not in AXES:
raise HTTPException(status_code=404, detail=f"Unknown axis: {a}")
return list(axes)
@app.get("/readbackSCS")
async def readback_scs(axis: list[str] | None = Query(None)):
selected_axes = validate_axes(axis)
return {ax: motors[ax].position for ax in selected_axes}
@app.put("/targetSCS")
async def target_scs(req: Request):
targets = {ax: float(t) for ax, t in req.query_params.items()}
if targets is None:
return {}
selected_axes = validate_axes(targets.keys())
for a in selected_axes:
motor = motors[a]
motor.update()
motor.target = targets[a]
motor.moving = True
return {"targets": targets, "message": "Move started"}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000, reload=False)
+58
View File
@@ -0,0 +1,58 @@
from copy import copy
from threading import RLock
from unittest.mock import ANY
import pytest
class MockServer:
def __init__(self) -> None:
self.lock = RLock()
self.mock_data = {
"x": {"pos": 1.0},
"y": {"pos": 1.0},
"z": {"pos": 1.0},
"u": {"pos": 1.0},
"vel_u_deg_s": {"pos": 1.0},
}
def get(self, endpoint):
with self.lock:
return copy(self.mock_data)
def put(self, params: dict | None = None, body: dict | None = None):
with self.lock:
assert body is not None
for k, v in body.items():
self.mock_data[k]["pos"] = v
@pytest.fixture
def aerotech():
mock_server = MockServer()
from pxii_bec.devices.aerotech import Aerotech
s = Aerotech(name="aerotech", prefix="http://test-aerotech.psi.ch")
s.controller._rest_get = mock_server.get
s.controller._rest_post = mock_server.put
yield s
s.controller._stop_monitor_readback_event.set()
class TestAerotech:
def test_aerotech_read(self, aerotech):
aerotech.wait_for_connection()
reading = aerotech.read()
assert dict(reading) == {
"aerotech_x": {"value": 1.0, "timestamp": ANY},
"aerotech_y": {"value": 1.0, "timestamp": ANY},
"aerotech_z": {"value": 1.0, "timestamp": ANY},
"aerotech_u": {"value": 1.0, "timestamp": ANY},
"aerotech_vel_u_deg_s": {"value": 1.0, "timestamp": ANY},
}
def test_aerotech_set_with_status(self, aerotech):
aerotech.wait_for_connection()
st = aerotech.x.set(5.0)
st.wait(timeout=1)
assert aerotech.x.get() == 5.0
+51
View File
@@ -0,0 +1,51 @@
from copy import copy
from threading import RLock
from unittest.mock import ANY
import pytest
class MockServer:
def __init__(self) -> None:
self.lock = RLock()
self.mock_data = {"SHX": 1.0, "SHY": 1.0, "SHZ": 1.0, "PHI": 1.0, "CHI": 1.0}
def get(self, endpoint):
with self.lock:
return copy(self.mock_data)
def put(self, params: dict | None = None, body: dict | None = None):
with self.lock:
assert params is not None
self.mock_data.update(params)
@pytest.fixture
def smargon():
mock_server = MockServer()
from pxii_bec.devices.smargopolo_smargon import Smargon
s = Smargon(name="smargon", prefix="http://test-smargopolo.psi.ch")
s.controller._rest_get = mock_server.get
s.controller._rest_put = mock_server.put
yield s
s.controller._stop_monitor_readback_event.set()
class TestSmargon:
def test_smargon_read(self, smargon):
smargon.wait_for_connection()
reading = smargon.read()
assert dict(reading) == {
"smargon_x": {"value": 1.0, "timestamp": ANY},
"smargon_y": {"value": 1.0, "timestamp": ANY},
"smargon_z": {"value": 1.0, "timestamp": ANY},
"smargon_phi": {"value": 1.0, "timestamp": ANY},
"smargon_chi": {"value": 1.0, "timestamp": ANY},
}
def test_smargon_set_with_status(self, smargon):
smargon.wait_for_connection()
st = smargon.x.set(5.0)
st.wait(timeout=1)
assert smargon.x.get() == 5.0