Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 1950ee5c85 | |||
| a11fe91998 | |||
|
c58b0c1d20
|
|||
| 1041e3a308 | |||
| 2912a306ab | |||
| 3c54d98ce5 | |||
| feaa340670 |
+1
-1
@@ -2,7 +2,7 @@
|
||||
# It is needed to track the repo template version, and editing may break things.
|
||||
# This file will be overwritten by copier on template updates.
|
||||
|
||||
_commit: v1.2.2
|
||||
_commit: v1.2.8
|
||||
_src_path: https://github.com/bec-project/plugin_copier_template.git
|
||||
make_commit: false
|
||||
project_name: pxii_bec
|
||||
|
||||
+14
-9
@@ -28,7 +28,7 @@ on:
|
||||
description: "Python version to use"
|
||||
required: false
|
||||
type: string
|
||||
default: "3.11"
|
||||
default: "3.12"
|
||||
|
||||
permissions:
|
||||
pull-requests: write
|
||||
@@ -44,7 +44,19 @@ jobs:
|
||||
- name: Setup Python
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: "${{ inputs.PYTHON_VERSION || '3.11' }}"
|
||||
python-version: "${{ inputs.PYTHON_VERSION || '3.12' }}"
|
||||
|
||||
- name: Checkout BEC Plugin Repository
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
repository: bec/pxii_bec
|
||||
ref: "${{ inputs.BEC_PLUGIN_REPO_BRANCH || github.head_ref || github.sha }}"
|
||||
path: ./pxii_bec
|
||||
|
||||
- name: Lint for merge conflicts from template updates
|
||||
shell: bash
|
||||
# Find all Copier conflicts except this line
|
||||
run: '! grep -r "<<<<<<< before updating" | grep -v "grep -r \"<<<<<<< before updating"'
|
||||
|
||||
- name: Checkout BEC Core
|
||||
uses: actions/checkout@v4
|
||||
@@ -67,13 +79,6 @@ jobs:
|
||||
ref: "${{ inputs.BEC_WIDGETS_BRANCH || 'main' }}"
|
||||
path: ./bec_widgets
|
||||
|
||||
- name: Checkout BEC Plugin Repository
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
repository: bec/pxii_bec
|
||||
ref: "${{ inputs.BEC_PLUGIN_REPO_BRANCH || github.head_ref || github.sha }}"
|
||||
path: ./pxii_bec
|
||||
|
||||
- name: Install dependencies
|
||||
shell: bash
|
||||
run: |
|
||||
|
||||
@@ -0,0 +1,62 @@
|
||||
name: Create template upgrade PR for pxii_bec
|
||||
on:
|
||||
workflow_dispatch:
|
||||
|
||||
permissions:
|
||||
pull-requests: write
|
||||
|
||||
jobs:
|
||||
create_update_branch_and_pr:
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: write
|
||||
pull-requests: write
|
||||
|
||||
steps:
|
||||
- name: Setup Python
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: '3.12'
|
||||
|
||||
- name: Install tools
|
||||
run: |
|
||||
pip install copier PySide6
|
||||
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Perform update
|
||||
run: |
|
||||
git config --global user.email "bec_ci_staging@psi.ch"
|
||||
git config --global user.name "BEC automated CI"
|
||||
|
||||
branch="chore/update-template-$(python -m uuid)"
|
||||
echo "switching to branch $branch"
|
||||
git checkout -b $branch
|
||||
|
||||
echo "Running copier update..."
|
||||
output="$(copier update --trust --defaults --conflict inline 2>&1)"
|
||||
echo "$output"
|
||||
msg="$(printf '%s\n' "$output" | head -n 1)"
|
||||
|
||||
if ! grep -q "make_commit: true" .copier-answers.yml ; then
|
||||
echo "Autocommit not made, committing..."
|
||||
git add -A
|
||||
git commit -a -m "$msg"
|
||||
fi
|
||||
|
||||
if diff-index --quiet HEAD ; then
|
||||
echo "No changes detected"
|
||||
exit 0
|
||||
fi
|
||||
|
||||
git push -u origin $branch
|
||||
curl -X POST "https://gitea.psi.ch/api/v1/repos/${{ gitea.repository }}/pulls" \
|
||||
-H "Authorization: token ${{ secrets.CI_REPO_WRITE }}" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "{
|
||||
\"title\": \"Template: $(echo $msg)\",
|
||||
\"body\": \"This PR was created by Gitea Actions\",
|
||||
\"head\": \"$(echo $branch)\",
|
||||
\"base\": \"main\"
|
||||
}"
|
||||
@@ -1,7 +0,0 @@
|
||||
include:
|
||||
- file: /templates/plugin-repo-template.yml
|
||||
inputs:
|
||||
name: pxii_bec
|
||||
target: pxii_bec
|
||||
branch: $CHILD_PIPELINE_BRANCH
|
||||
project: bec/awi_utils
|
||||
@@ -42,4 +42,17 @@ dcm_froll:
|
||||
deviceTags:
|
||||
- dcm
|
||||
readOnly: false
|
||||
softwareTrigger: false
|
||||
|
||||
smargon:
|
||||
description: REST-based device which connects to Smargopolo
|
||||
deviceClass: pxii_bec.devices.smargopolo_smargon.Smargon
|
||||
deviceConfig: {prefix: 'http://x10sa-smargopolo.psi.ch:3000'}
|
||||
onFailure: buffer
|
||||
enabled: True
|
||||
readoutPriority: baseline
|
||||
deviceTags:
|
||||
- smargon
|
||||
- motors
|
||||
readOnly: false
|
||||
softwareTrigger: false
|
||||
@@ -0,0 +1,165 @@
|
||||
import time
|
||||
from threading import Event, RLock, Thread
|
||||
from typing import Any
|
||||
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import OphydObject
|
||||
from ophyd_devices import PSIDeviceBase
|
||||
from ophyd_devices.utils.socket import SocketSignal
|
||||
from requests import Response, Session
|
||||
|
||||
_TIMESTAMP_ID = "__timestamp"
|
||||
_POLL_INTERVAL_SLOW = 0.1
|
||||
|
||||
|
||||
class HttpRestError(Exception):
|
||||
"""Error for rest calls from a HttpRestSignal."""
|
||||
|
||||
def __init__(self, resp: Response, *args: object, value: Any | None = None) -> None:
|
||||
method, url = resp.request.method, resp.request.url
|
||||
data = f"{str(value)} to " if value is not None else ""
|
||||
super().__init__(
|
||||
f"Could not {method} {data}{url}. Code: {resp.status_code}. Reason: {resp.reason}.",
|
||||
*args,
|
||||
)
|
||||
|
||||
|
||||
class SmargonSignal(SocketSignal):
|
||||
"""Ophyd signal which gets and puts to a REST API rather than EPICS PVs, mediated through the SmargonController"""
|
||||
|
||||
def __init__(self, *args, axis_identifier: str, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
controller: SmargonController | None = getattr(self.root, "controller", None)
|
||||
if controller is None:
|
||||
raise TypeError("SmargonSignal must be used in a device with a SmargonController")
|
||||
self._controller = controller
|
||||
self._axis_id = axis_identifier
|
||||
self._controller.register(self._axis_id)
|
||||
|
||||
def _socket_get(self): # type: ignore
|
||||
self._readback, self.metadata["timestamp"] = self._controller.get_readback(
|
||||
self._axis_id
|
||||
) or (0.0, 0.0)
|
||||
return self._readback
|
||||
|
||||
def _socket_set(self, val: float):
|
||||
self._controller.put(self._axis_id, val)
|
||||
|
||||
def get(self, **kwargs):
|
||||
if self._controller.monitor_stopped():
|
||||
self._controller.start_monitor()
|
||||
return super().get(**kwargs)
|
||||
|
||||
|
||||
class SmargonController(OphydObject):
|
||||
"""Controller to consolidate polling loops and other REST calls for the smargon"""
|
||||
|
||||
def __init__(self, *, prefix, **kwargs):
|
||||
self._session = Session()
|
||||
self._prefix = prefix
|
||||
self._readback_endpoint = "/readbackSCS"
|
||||
self._target_endpoint = "/targetSCS"
|
||||
self._targets = {}
|
||||
self._signal_registry: set[str] = set()
|
||||
self._readback_poll_interval: float = _POLL_INTERVAL_SLOW
|
||||
|
||||
super().__init__(**kwargs)
|
||||
self._setup_readback()
|
||||
|
||||
def _setup_readback(self):
|
||||
self._readbacks: dict[str, float] = {}
|
||||
self._stop_monitor_readback_event = Event()
|
||||
self._readback_lock = RLock()
|
||||
self._monitor_readback_thread = Thread(
|
||||
target=self._monitor,
|
||||
args=[
|
||||
self._readback_endpoint,
|
||||
self._stop_monitor_readback_event,
|
||||
self._readback_lock,
|
||||
self._readbacks,
|
||||
],
|
||||
)
|
||||
|
||||
def manual_update(self):
|
||||
self._update_reading(self._readback_endpoint, self._readback_lock, self._readbacks)
|
||||
|
||||
def _update_reading(self, endpoint: str, lock: RLock, buffer: dict):
|
||||
data = self._rest_get(endpoint)
|
||||
timestamp = time.monotonic()
|
||||
with lock:
|
||||
buffer.update(data)
|
||||
buffer["__timestamp"] = timestamp
|
||||
|
||||
def _monitor(self, endpoint: str, event: Event, lock: RLock, buffer: dict):
|
||||
while not event.is_set():
|
||||
self._update_reading(endpoint, lock, buffer)
|
||||
time.sleep(self._readback_poll_interval)
|
||||
|
||||
def _clean_monitor(self):
|
||||
if self._monitor_readback_thread.is_alive():
|
||||
self._stop_monitor_readback_event.set()
|
||||
self._monitor_readback_thread.join(timeout=2)
|
||||
if self._monitor_readback_thread.is_alive():
|
||||
raise RuntimeError("Failed to clean up Smargon monitor thread.")
|
||||
|
||||
def register(self, axis_id: str):
|
||||
self._signal_registry.add(axis_id)
|
||||
|
||||
def _rest_get(self, endpoint):
|
||||
resp = self._session.get(self._prefix + endpoint)
|
||||
if not resp.ok:
|
||||
raise HttpRestError(resp)
|
||||
return resp.json()
|
||||
|
||||
def _rest_put(self, val: dict[str, float]):
|
||||
resp = self._session.put(self._prefix + self._target_endpoint, params=val)
|
||||
if not resp.ok:
|
||||
raise HttpRestError(resp, value=val)
|
||||
|
||||
def start_monitor(self):
|
||||
"""Start or restart the automonitor thread."""
|
||||
self._clean_monitor()
|
||||
self._setup_readback()
|
||||
self._monitor_readback_thread.start()
|
||||
|
||||
def monitor_stopped(self):
|
||||
return not self._monitor_readback_thread.is_alive()
|
||||
|
||||
def get_readback(self, axis_id: str) -> tuple[float, float] | None:
|
||||
with self._readback_lock:
|
||||
if axis_id not in self._readbacks or _TIMESTAMP_ID not in self._readbacks:
|
||||
return None
|
||||
return self._readbacks.get(axis_id), self._readbacks.get(_TIMESTAMP_ID) # type: ignore
|
||||
|
||||
def put(self, axis: str, val: float):
|
||||
self._rest_put({axis: val})
|
||||
|
||||
def stop(self):
|
||||
# There doesn't appear to be a stop endpoint on the server
|
||||
# Best effort: set the target to the current position
|
||||
self._rest_put(self._readbacks)
|
||||
|
||||
|
||||
class Smargon(PSIDeviceBase):
|
||||
x = Cpt(SmargonSignal, axis_identifier="SHX", tolerance=0.01)
|
||||
y = Cpt(SmargonSignal, axis_identifier="SHY", tolerance=0.01)
|
||||
z = Cpt(SmargonSignal, axis_identifier="SHZ", tolerance=0.01)
|
||||
phi = Cpt(SmargonSignal, axis_identifier="PHI", tolerance=0.01)
|
||||
chi = Cpt(SmargonSignal, axis_identifier="CHI", tolerance=0.01)
|
||||
|
||||
def __init__(
|
||||
self, *, name: str, prefix: str = "", scan_info=None, device_manager=None, **kwargs
|
||||
):
|
||||
self.controller = SmargonController(prefix=prefix)
|
||||
super().__init__(
|
||||
name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs
|
||||
)
|
||||
|
||||
def wait_for_connection(self, **kwargs): # type: ignore
|
||||
self.controller.start_monitor()
|
||||
self.controller.manual_update()
|
||||
return super().wait_for_connection(**kwargs)
|
||||
|
||||
def stop(self, *, success: bool = False) -> None:
|
||||
self.controller.stop()
|
||||
return super().stop(success=success)
|
||||
@@ -149,7 +149,7 @@ def rock(**kwargs):
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
dock_area = bec.gui.new()
|
||||
wr = dock_area.new().new(bec.gui.available_widgets.Waveform)
|
||||
wr = dock_area.new(bec.gui.available_widgets.Waveform)
|
||||
|
||||
# width of rocking curve of perfect xtal: at 20 keV: 14 urad == 0.0008 deg
|
||||
|
||||
@@ -206,7 +206,7 @@ def rock(**kwargs):
|
||||
s = scans.line_scan(mot, -dx, dx, steps=50, exp_time=time, relative=True)
|
||||
# md = scan.metadata["bec"]
|
||||
wr.title = f"RockingScan at Energy of {e}"
|
||||
wr.plot(x_name=mot.name, y_name=det.name) ##set names/axes first !
|
||||
wr.plot(device_x=mot.name, device_y=det.name) ##set names/axes first !
|
||||
wr.x_label = mot.name
|
||||
wr.y_label = det.name
|
||||
if ax == 0:
|
||||
@@ -890,7 +890,7 @@ def colliscan(direction: str, range=0.3, nsteps=30, stime=0.5, centre=1):
|
||||
import sys
|
||||
|
||||
dock_area = bec.gui.new()
|
||||
wr = dock_area.new().new(bec.gui.available_widgets.Waveform)
|
||||
wr = dock_area.new(bec.gui.available_widgets.Waveform)
|
||||
|
||||
# check if i1 DIODE is IN
|
||||
# if not, aks to be moved
|
||||
@@ -1064,14 +1064,14 @@ def slitscan(device_location: str, direction: str, range: 1, nsteps=50, centre=0
|
||||
return
|
||||
|
||||
dock_area = bec.gui.new()
|
||||
wr = dock_area.new().new(bec.gui.available_widgets.Waveform)
|
||||
wr = dock_area.new(bec.gui.available_widgets.Waveform)
|
||||
|
||||
pos0 = mot.user_readback.get()
|
||||
siz0 = size.user_readback.get()
|
||||
umv(size, s_closed)
|
||||
|
||||
s = scans.line_scan(mot, -dx, dx, steps=nsteps, exp_time=time, relative=True)
|
||||
wr.plot(x_name=mot.name, y_name=det.name)
|
||||
wr.plot(device_x=mot.name, device_y=det.name)
|
||||
wr.x_label = mot.name
|
||||
wr.y_label = det.name
|
||||
|
||||
@@ -1261,11 +1261,11 @@ def bstatus():
|
||||
#
|
||||
dock_area = bec.gui.new()
|
||||
|
||||
dbrowser = dock_area.new("device_browser").new(
|
||||
bec.gui.available_widgets.DeviceBrowser
|
||||
dbrowser = dock_area.new(
|
||||
bec.gui.available_widgets.DeviceBrowser, object_name="device_browser"
|
||||
)
|
||||
|
||||
dock_area.new("queue").new(bec.gui.available_widgets.BECQueue)
|
||||
dock_area.new(bec.gui.available_widgets.BECQueue, object_name="queue")
|
||||
# queue = dock_area.queue.BECQueue # give it a name
|
||||
# text_box = dock_area.new().new(widget=bec.gui.available_widgets.TextBox)
|
||||
# text_box.set_plain_text("Hello, World!")
|
||||
|
||||
@@ -225,10 +225,10 @@ def long_gscan(estart=7, end_en=20.5, g_low=4.5, g_high=9.0, nsteps=1500):
|
||||
resol = (g_high - g_low) / nsteps
|
||||
print(f"nsteps = {nsteps}; resolution is {resol} mm")
|
||||
dock_area = bec.gui.new("LongGapScan")
|
||||
wr = dock_area.new().new(bec.gui.available_widgets.Waveform)
|
||||
wr = dock_area.new(bec.gui.available_widgets.Waveform)
|
||||
mot = dev.id_gap
|
||||
det = dev.lu_bpmsum
|
||||
wr.plot(x_name=mot.name, y_name=det.name) ## names first !
|
||||
wr.plot(device_x=mot.name, device_y=det.name) ## names first !
|
||||
wr.x_label = mot.name
|
||||
wr.y_label = det.name
|
||||
g0 = dev.id_gap.readback.get()
|
||||
@@ -275,10 +275,10 @@ def gscan(centre=0, gomax=0, detune=0):
|
||||
import time
|
||||
|
||||
dock_area = bec.gui.new()
|
||||
wr = dock_area.new().new(bec.gui.available_widgets.Waveform)
|
||||
wr = dock_area.new(bec.gui.available_widgets.Waveform)
|
||||
mot = dev.id_gap
|
||||
det = dev.lu_bpmsum
|
||||
wr.plot(x_name=mot.name, y_name=det.name) ## names first !
|
||||
wr.plot(device_x=mot.name, device_y=det.name) ## names first !
|
||||
# wr.plot(x=mot.name,y=det.name) ### this comes later
|
||||
wr.x_label = mot.name
|
||||
wr.y_label = det.name
|
||||
@@ -317,4 +317,3 @@ def gscan(centre=0, gomax=0, detune=0):
|
||||
|
||||
return
|
||||
|
||||
|
||||
|
||||
@@ -154,9 +154,9 @@ def select_bec_window(dock_area_name="Fitting"):
|
||||
open_docks = bec.gui.windows
|
||||
if open_docks.get(dock_area_name) is None:
|
||||
dock_area = bec.gui.new(dock_area_name)
|
||||
wf = dock_area.new("Plot").new(bec.gui.available_widgets.Waveform)
|
||||
text_box = dock_area.new("Results", position="bottom").new(
|
||||
widget=bec.gui.available_widgets.TextBox
|
||||
wf = dock_area.new(bec.gui.available_widgets.Waveform, object_name="Plot")
|
||||
text_box = dock_area.new(
|
||||
bec.gui.available_widgets.TextBox, object_name="Results", where="bottom"
|
||||
)
|
||||
else:
|
||||
wf = bec.gui.Fitting.Plot.Waveform
|
||||
@@ -186,7 +186,7 @@ def plot_live_data_bec(motor_name, signal_name, window_name="Fitting"):
|
||||
wf.title = "Scan: Live scan"
|
||||
wf.x_label = motor_name
|
||||
wf.y_label = signal_name
|
||||
wf.plot(x_name=motor_name, y_name=signal_name)
|
||||
wf.plot(device_x=motor_name, device_y=signal_name)
|
||||
|
||||
|
||||
def plot_fitted_data_bec(data, fit_result):
|
||||
|
||||
@@ -249,42 +249,79 @@ def scan_bpm(bpmname):
|
||||
|
||||
# Open a dock area and set up the heatmaps
|
||||
dock_area = bec.gui.new("XBPM_Scan")
|
||||
wf5 = dock_area.new("Sum").new(bec.gui.available_widgets.Heatmap)
|
||||
wf1 = dock_area.new("Ch1", relative_to="Sum", position="bottom").new(
|
||||
bec.gui.available_widgets.Heatmap
|
||||
wf5 = dock_area.new(bec.gui.available_widgets.Heatmap, object_name="Sum")
|
||||
wf1 = dock_area.new(
|
||||
bec.gui.available_widgets.Heatmap,
|
||||
object_name="Ch1",
|
||||
relative_to="Sum",
|
||||
where="bottom",
|
||||
)
|
||||
wf3 = dock_area.new("Ch3", relative_to="Ch1", position="right").new(
|
||||
bec.gui.available_widgets.Heatmap
|
||||
wf3 = dock_area.new(
|
||||
bec.gui.available_widgets.Heatmap,
|
||||
object_name="Ch3",
|
||||
relative_to="Ch1",
|
||||
where="right",
|
||||
)
|
||||
wf4 = dock_area.new("Ch4", relative_to="Ch3", position="bottom").new(
|
||||
bec.gui.available_widgets.Heatmap
|
||||
wf4 = dock_area.new(
|
||||
bec.gui.available_widgets.Heatmap,
|
||||
object_name="Ch4",
|
||||
relative_to="Ch3",
|
||||
where="bottom",
|
||||
)
|
||||
wf2 = dock_area.new("Ch2", relative_to="Ch1", position="bottom").new(
|
||||
bec.gui.available_widgets.Heatmap
|
||||
wf2 = dock_area.new(
|
||||
bec.gui.available_widgets.Heatmap,
|
||||
object_name="Ch2",
|
||||
relative_to="Ch1",
|
||||
where="bottom",
|
||||
)
|
||||
wfscan = dock_area.new("ScanControl").new(bec.gui.available_widgets.ScanControl)
|
||||
wfscan = dock_area.new(bec.gui.available_widgets.ScanControl, object_name="ScanControl")
|
||||
|
||||
cfg = getattr(BPMScans, bpmname)
|
||||
|
||||
wf1.x_label = cfg["x_name"]
|
||||
wf1.y_label = cfg["y_name"]
|
||||
wf1.plot(x_name=cfg["x_name"], y_name=cfg["y_name"], z_name=cfg["z1_name"], color_map="plasma")
|
||||
wf1.plot(
|
||||
device_x=cfg["x_name"],
|
||||
device_y=cfg["y_name"],
|
||||
device_z=cfg["z1_name"],
|
||||
color_map="plasma",
|
||||
)
|
||||
|
||||
wf2.x_label = cfg["x_name"]
|
||||
wf2.y_label = cfg["y_name"]
|
||||
wf2.plot(x_name=cfg["x_name"], y_name=cfg["y_name"], z_name=cfg["z2_name"], color_map="plasma")
|
||||
wf2.plot(
|
||||
device_x=cfg["x_name"],
|
||||
device_y=cfg["y_name"],
|
||||
device_z=cfg["z2_name"],
|
||||
color_map="plasma",
|
||||
)
|
||||
|
||||
wf3.x_label = cfg["x_name"]
|
||||
wf3.y_label = cfg["y_name"]
|
||||
wf3.plot(x_name=cfg["x_name"], y_name=cfg["y_name"], z_name=cfg["z3_name"], color_map="plasma")
|
||||
wf3.plot(
|
||||
device_x=cfg["x_name"],
|
||||
device_y=cfg["y_name"],
|
||||
device_z=cfg["z3_name"],
|
||||
color_map="plasma",
|
||||
)
|
||||
|
||||
wf4.x_label = cfg["x_name"]
|
||||
wf4.y_label = cfg["y_name"]
|
||||
wf4.plot(x_name=cfg["x_name"], y_name=cfg["y_name"], z_name=cfg["z4_name"], color_map="plasma")
|
||||
wf4.plot(
|
||||
device_x=cfg["x_name"],
|
||||
device_y=cfg["y_name"],
|
||||
device_z=cfg["z4_name"],
|
||||
color_map="plasma",
|
||||
)
|
||||
|
||||
wf5.x_label = cfg["x_name"]
|
||||
wf5.y_label = cfg["y_name"]
|
||||
wf5.plot(x_name=cfg["x_name"], y_name=cfg["y_name"], z_name=cfg["z5_name"], color_map="plasma")
|
||||
wf5.plot(
|
||||
device_x=cfg["x_name"],
|
||||
device_y=cfg["y_name"],
|
||||
device_z=cfg["z5_name"],
|
||||
color_map="plasma",
|
||||
)
|
||||
# Run the scan
|
||||
x_mot = cfg["x_device"]
|
||||
y_mot = cfg["y_device"]
|
||||
@@ -304,15 +341,20 @@ def optimise_kb(mirror):
|
||||
|
||||
# Open a dock area and set up the heatmaps
|
||||
dock_area = bec.gui.new(mirror)
|
||||
wf1 = dock_area.new("Heatmap").new(bec.gui.available_widgets.Heatmap)
|
||||
wf1 = dock_area.new(bec.gui.available_widgets.Heatmap, object_name="Heatmap")
|
||||
|
||||
wfscan = dock_area.new("ScanControl").new(bec.gui.available_widgets.ScanControl)
|
||||
wfscan = dock_area.new(bec.gui.available_widgets.ScanControl, object_name="ScanControl")
|
||||
|
||||
cfg = getattr(MirrorConfig, mirror)
|
||||
|
||||
wf1.x_label = cfg["bu_name"]
|
||||
wf1.y_label = cfg["bd_name"]
|
||||
wf1.plot(x_name=cfg["bu_name"], y_name=cfg["bd_name"], z_name=cfg["z_name"], color_map="plasma")
|
||||
wf1.plot(
|
||||
device_x=cfg["bu_name"],
|
||||
device_y=cfg["bd_name"],
|
||||
device_z=cfg["z_name"],
|
||||
color_map="plasma",
|
||||
)
|
||||
|
||||
# Run the scan
|
||||
x_mot = cfg["x_device"]
|
||||
|
||||
+2
-1
@@ -6,7 +6,7 @@ build-backend = "hatchling.build"
|
||||
name = "pxii_bec"
|
||||
version = "0.0.0"
|
||||
description = "A plugin repository for BEC"
|
||||
requires-python = ">=3.10"
|
||||
requires-python = ">=3.11"
|
||||
classifiers = [
|
||||
"Development Status :: 3 - Alpha",
|
||||
"Programming Language :: Python :: 3",
|
||||
@@ -25,6 +25,7 @@ dev = [
|
||||
"pytest-random-order",
|
||||
"ophyd_devices",
|
||||
"bec_server",
|
||||
"requests-mock",
|
||||
]
|
||||
|
||||
[project.entry-points."bec"]
|
||||
|
||||
@@ -0,0 +1,110 @@
|
||||
"""A mock smargopolo REST interface with mock motoers, for testing devices against"""
|
||||
|
||||
import asyncio
|
||||
import random
|
||||
import time
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import Iterable
|
||||
|
||||
import uvicorn
|
||||
from fastapi import FastAPI, HTTPException, Query, Request
|
||||
from pydantic import BaseModel
|
||||
|
||||
AXES = ["SHX", "SHY", "SHZ", "PHI", "CHI"]
|
||||
|
||||
|
||||
class Motor:
|
||||
def __init__(self, velocity: float = 1.0):
|
||||
self.position = 0.0
|
||||
self.target = 0.0
|
||||
self.velocity = velocity
|
||||
self.moving = False
|
||||
self._last_update = time.monotonic()
|
||||
|
||||
def update(self):
|
||||
now = time.monotonic()
|
||||
dt = now - self._last_update
|
||||
self._last_update = now
|
||||
|
||||
if not self.moving:
|
||||
return
|
||||
|
||||
jitter_factor = random.random() * 0.05 - 0.025 # +- 2.5% jitter in step
|
||||
distance = self.target - self.position
|
||||
direction = 1 if distance > 0 else -1
|
||||
step = direction * self.velocity * dt
|
||||
|
||||
if abs(step) >= abs(distance):
|
||||
self.position = self.target + (step * jitter_factor)
|
||||
self.moving = False
|
||||
else:
|
||||
self.position += step * (1 + jitter_factor)
|
||||
|
||||
|
||||
motors: dict[str, Motor] = {
|
||||
"SHX": Motor(velocity=3),
|
||||
"SHY": Motor(velocity=2.5),
|
||||
"SHZ": Motor(velocity=2),
|
||||
"PHI": Motor(velocity=1.0),
|
||||
"CHI": Motor(velocity=0.7),
|
||||
}
|
||||
|
||||
|
||||
class MoveRequest(BaseModel):
|
||||
target: float
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
async def updater():
|
||||
while True:
|
||||
for motor in motors.values():
|
||||
motor.update()
|
||||
await asyncio.sleep(0.02) # 50 Hz update loop
|
||||
|
||||
task = asyncio.create_task(updater())
|
||||
yield
|
||||
task.cancel()
|
||||
try:
|
||||
await task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
|
||||
app = FastAPI(lifespan=lifespan)
|
||||
|
||||
|
||||
def validate_axes(axes: Iterable[str] | None) -> list[str]:
|
||||
if axes is None:
|
||||
return AXES
|
||||
for a in axes:
|
||||
if a not in AXES:
|
||||
raise HTTPException(status_code=404, detail=f"Unknown axis: {a}")
|
||||
return list(axes)
|
||||
|
||||
|
||||
@app.get("/readbackSCS")
|
||||
async def readback_scs(axis: list[str] | None = Query(None)):
|
||||
selected_axes = validate_axes(axis)
|
||||
return {ax: motors[ax].position for ax in selected_axes}
|
||||
|
||||
|
||||
@app.put("/targetSCS")
|
||||
async def target_scs(req: Request):
|
||||
targets = {ax: float(t) for ax, t in req.query_params.items()}
|
||||
if targets is None:
|
||||
return {}
|
||||
|
||||
selected_axes = validate_axes(targets.keys())
|
||||
|
||||
for a in selected_axes:
|
||||
motor = motors[a]
|
||||
motor.update()
|
||||
motor.target = targets[a]
|
||||
motor.moving = True
|
||||
|
||||
return {"targets": targets, "message": "Move started"}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
uvicorn.run(app, host="0.0.0.0", port=8000, reload=False)
|
||||
@@ -0,0 +1,50 @@
|
||||
from copy import copy
|
||||
from threading import RLock
|
||||
from unittest.mock import ANY, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
class MockServer:
|
||||
def __init__(self) -> None:
|
||||
self.lock = RLock()
|
||||
self.mock_data = {"SHX": 1.0, "SHY": 1.0, "SHZ": 1.0, "PHI": 1.0, "CHI": 1.0}
|
||||
|
||||
def get(self, endpoint):
|
||||
with self.lock:
|
||||
return copy(self.mock_data)
|
||||
|
||||
def put(self, val: dict[str, float]):
|
||||
with self.lock:
|
||||
self.mock_data.update(val)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def smargon():
|
||||
mock_server = MockServer()
|
||||
from pxii_bec.devices.smargopolo_smargon import Smargon
|
||||
|
||||
s = Smargon(name="smargon", prefix="http://test-smargopolo.psi.ch")
|
||||
s.controller._rest_get = mock_server.get
|
||||
s.controller._rest_put = mock_server.put
|
||||
yield s
|
||||
s.controller._stop_monitor_readback_event.set()
|
||||
|
||||
|
||||
class TestSmargon:
|
||||
def test_smargon_read(self, smargon):
|
||||
smargon.wait_for_connection()
|
||||
reading = smargon.read()
|
||||
assert dict(reading) == {
|
||||
"smargon_x": {"value": 1.0, "timestamp": ANY},
|
||||
"smargon_y": {"value": 1.0, "timestamp": ANY},
|
||||
"smargon_z": {"value": 1.0, "timestamp": ANY},
|
||||
"smargon_phi": {"value": 1.0, "timestamp": ANY},
|
||||
"smargon_chi": {"value": 1.0, "timestamp": ANY},
|
||||
}
|
||||
|
||||
def test_smargon_set_with_status(self, smargon):
|
||||
smargon.wait_for_connection()
|
||||
st = smargon.x.set(5.0)
|
||||
st.wait(timeout=1)
|
||||
assert smargon.x.get() == 5.0
|
||||
Reference in New Issue
Block a user