Compare commits

...

18 Commits

Author SHA1 Message Date
x12sa
8b960d0365 wip adjusting to V3
Some checks failed
CI for csaxs_bec / test (push) Failing after 36s
CI for csaxs_bec / test (pull_request) Failing after 39s
2026-03-01 18:11:02 +01:00
x01dc
cde3fe692e set fixed on the script 2026-03-01 18:11:02 +01:00
x01dc
4ac9c52fdf colorbar changed to greys 2026-03-01 18:11:02 +01:00
b858494bc6 wip unify the live mode on cameras 2026-03-01 18:11:02 +01:00
x01dc
2011c2082a feat(xray_eye): alignment gui and script adapted to not use epics gui device 2026-03-01 18:11:02 +01:00
5ed5199e1e wip ids camera subscription callback - run false also put directly from init kwarg 2026-03-01 18:11:02 +01:00
e269e9b22f wip ids camera subscription callback - run false 2026-03-01 18:11:02 +01:00
fde8903411 wip ids camera subscription callback 2026-03-01 18:11:02 +01:00
6bd7dbb5df wip ids camera live mode signal 2026-03-01 18:11:02 +01:00
8b2c7df99e feat(allied-vision-camera): Add allied vision camera integration 2026-03-01 18:11:02 +01:00
d4cfa750be fix(bec_widgets): removed omny alignment old gui
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m59s
CI for csaxs_bec / test (pull_request) Successful in 1m59s
2026-03-01 18:11:02 +01:00
3d62bea04b Update repo with template version v1.2.8
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m55s
CI for csaxs_bec / test (push) Successful in 1m54s
2026-02-27 16:25:22 +01:00
1518845d25 resolve merge conflicts 2026-02-27 16:25:22 +01:00
ff3b6686db Update repo with template version v1.2.7 2026-02-27 16:25:22 +01:00
afdc64e296 fix(rio): fix rio cached readings
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m40s
CI for csaxs_bec / test (push) Successful in 1m36s
2026-02-26 16:15:29 +01:00
bc31c00e1f fix(tests): x_ray_eye_align correct imports fixed after refactor of LamNI
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m36s
CI for csaxs_bec / test (push) Successful in 1m37s
2026-02-23 13:25:09 +01:00
x01dc
38671f074e minor printout fix
Some checks failed
CI for csaxs_bec / test (pull_request) Failing after 1m30s
CI for csaxs_bec / test (push) Failing after 1m32s
2026-02-23 12:44:04 +01:00
x01dc
92e39a5f75 minor adjmustment 2026-02-23 12:35:56 +01:00
27 changed files with 756 additions and 726 deletions

View File

@@ -2,7 +2,7 @@
# It is needed to track the repo template version, and editing may break things. # It is needed to track the repo template version, and editing may break things.
# This file will be overwritten by copier on template updates. # This file will be overwritten by copier on template updates.
_commit: v1.2.2 _commit: v1.2.8
_src_path: https://github.com/bec-project/plugin_copier_template.git _src_path: https://github.com/bec-project/plugin_copier_template.git
make_commit: false make_commit: false
project_name: csaxs_bec project_name: csaxs_bec

View File

@@ -28,7 +28,7 @@ on:
description: "Python version to use" description: "Python version to use"
required: false required: false
type: string type: string
default: "3.11" default: "3.12"
permissions: permissions:
pull-requests: write pull-requests: write
@@ -44,7 +44,19 @@ jobs:
- name: Setup Python - name: Setup Python
uses: actions/setup-python@v5 uses: actions/setup-python@v5
with: with:
python-version: "${{ inputs.PYTHON_VERSION || '3.11' }}" python-version: "${{ inputs.PYTHON_VERSION || '3.12' }}"
- name: Checkout BEC Plugin Repository
uses: actions/checkout@v4
with:
repository: bec/csaxs_bec
ref: "${{ inputs.BEC_PLUGIN_REPO_BRANCH || github.head_ref || github.sha }}"
path: ./csaxs_bec
- name: Lint for merge conflicts from template updates
shell: bash
# Find all Copier conflicts except this line
run: '! grep -r "<<<<<<< before updating" | grep -v "grep -r \"<<<<<<< before updating"'
- name: Checkout BEC Core - name: Checkout BEC Core
run: git clone --depth 1 --branch "${{ inputs.BEC_CORE_BRANCH || 'main' }}" https://github.com/bec-project/bec.git ./bec run: git clone --depth 1 --branch "${{ inputs.BEC_CORE_BRANCH || 'main' }}" https://github.com/bec-project/bec.git ./bec
@@ -55,13 +67,6 @@ jobs:
- name: Checkout BEC Widgets - name: Checkout BEC Widgets
run: git clone --depth 1 --branch "${{ inputs.BEC_WIDGETS_BRANCH || 'main' }}" https://github.com/bec-project/bec_widgets.git ./bec_widgets run: git clone --depth 1 --branch "${{ inputs.BEC_WIDGETS_BRANCH || 'main' }}" https://github.com/bec-project/bec_widgets.git ./bec_widgets
- name: Checkout BEC Plugin Repository
uses: actions/checkout@v4
with:
repository: bec/csaxs_bec
ref: "${{ inputs.BEC_PLUGIN_REPO_BRANCH || github.head_ref || github.sha }}"
path: ./csaxs_bec
- name: Install dependencies - name: Install dependencies
shell: bash shell: bash
run: | run: |

View File

@@ -0,0 +1,62 @@
name: Create template upgrade PR for csaxs_bec
on:
workflow_dispatch:
permissions:
pull-requests: write
jobs:
create_update_branch_and_pr:
runs-on: ubuntu-latest
permissions:
contents: write
pull-requests: write
steps:
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Install tools
run: |
pip install copier PySide6
- name: Checkout
uses: actions/checkout@v4
- name: Perform update
run: |
git config --global user.email "bec_ci_staging@psi.ch"
git config --global user.name "BEC automated CI"
branch="chore/update-template-$(python -m uuid)"
echo "switching to branch $branch"
git checkout -b $branch
echo "Running copier update..."
output="$(copier update --trust --defaults --conflict inline 2>&1)"
echo "$output"
msg="$(printf '%s\n' "$output" | head -n 1)"
if ! grep -q "make_commit: true" .copier-answers.yml ; then
echo "Autocommit not made, committing..."
git add -A
git commit -a -m "$msg"
fi
if diff-index --quiet HEAD ; then
echo "No changes detected"
exit 0
fi
git push -u origin $branch
curl -X POST "https://gitea.psi.ch/api/v1/repos/${{ gitea.repository }}/pulls" \
-H "Authorization: token ${{ secrets.CI_REPO_WRITE }}" \
-H "Content-Type: application/json" \
-d "{
\"title\": \"Template: $(echo $msg)\",
\"body\": \"This PR was created by Gitea Actions\",
\"head\": \"$(echo $branch)\",
\"base\": \"main\"
}"

View File

@@ -1,20 +0,0 @@
include:
- project: bec/awi_utils
file: /templates/plugin-repo-template.yml
inputs:
name: "csaxs"
target: "csaxs_bec"
branch: $CHILD_PIPELINE_BRANCH
pages:
stage: Deploy
needs: []
variables:
TARGET_BRANCH: $CI_COMMIT_REF_NAME
rules:
- if: "$CI_COMMIT_TAG != null"
variables:
TARGET_BRANCH: $CI_COMMIT_TAG
- if: '$CI_COMMIT_REF_NAME == "main" && $CI_PROJECT_PATH == "bec/csaxs_bec"'
script:
- curl -X POST -d "branches=$CI_COMMIT_REF_NAME" -d "token=$RTD_TOKEN" https://readthedocs.org/api/v2/webhook/sls-csaxs/270162/

View File

@@ -210,13 +210,11 @@ class LamNI(LamNIOpticsMixin):
self.feedback_status() self.feedback_status()
def feedback_status(self): def feedback_status(self):
if self.device_manager.devices.rtx.controller.feedback_is_running(): self.device_manager.devices.rtx.controller.show_feedback_status()
print("The rt feedback is \x1b[92mrunning\x1b[0m.")
else:
print("The rt feedback is \x1b[91mNOT\x1b[0m running.")
def show_interferometer_positions(self): def show_interferometer_positions(self):
self.device_manager.devices.rtx.controller.show_interferometer_positions() self.device_manager.devices.rtx.controller.show_feedback_status()
def show_signal_strength(self): def show_signal_strength(self):
self.device_manager.devices.rtx.controller.show_signal_strength_interferometer() self.device_manager.devices.rtx.controller.show_signal_strength_interferometer()

View File

@@ -1004,6 +1004,20 @@ class FlomniAlignmentMixin:
with open(os.path.join(dir_path, "ptychotomoalign_Cy3.txt"), "r") as file: with open(os.path.join(dir_path, "ptychotomoalign_Cy3.txt"), "r") as file:
tomo_alignment_fit[1][4] = file.readline() tomo_alignment_fit[1][4] = file.readline()
tomo_alignment_fit[0][0] = gui.flomni.xeyegui.XRayEye.Waveform.fit_x_SineModel.dap_params['amplitude']
tomo_alignment_fit[0][1] = gui.flomni.xeyegui.XRayEye.Waveform.fit_x_SineModel.dap_params['frequency']
tomo_alignment_fit[0][2] = gui.flomni.xeyegui.XRayEye.Waveform.fit_x_SineModel.dap_params['shift']
tomo_alignment_fit[1][0] = gui.flomni.xeyegui.XRayEye.Waveform_0.fit_y_SineModel.dap_params['amplitude']
tomo_alignment_fit[1][1] = gui.flomni.xeyegui.XRayEye.Waveform_0.fit_y_SineModel.dap_params['frequency']
tomo_alignment_fit[1][2] = gui.flomni.xeyegui.XRayEye.Waveform_0.fit_y_SineModel.dap_params['shift']
print("applying vertical default values from mirror calibration, not from fit!")
tomo_alignment_fit[1][0] = 0
tomo_alignment_fit[1][1] = 0
tomo_alignment_fit[1][2] = 0
tomo_alignment_fit[1][3] = 0
tomo_alignment_fit[1][4] = 0
print("New alignment parameters loaded:") print("New alignment parameters loaded:")
print( print(
f"X Amplitude {tomo_alignment_fit[0][0]}, " f"X Amplitude {tomo_alignment_fit[0][0]}, "

View File

@@ -1,6 +1,6 @@
import builtins import builtins
from bec_widgets.cli.client import BECDockArea from bec_widgets.cli.client import AdvancedDockArea
# from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose # from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
@@ -26,10 +26,11 @@ class flomniGuiTools:
self.gui = self.client.gui self.gui = self.client.gui
def flomnigui_show_gui(self): def flomnigui_show_gui(self):
if "flomni" in self.gui.windows: self.gui.new("flomni")
self.gui.flomni.show() # if "flomni" in self.gui.windows:
else: # self.gui.flomni.show()
self.gui.new("flomni") # else:
# self.gui.new("flomni")
def flomnigui_stop_gui(self): def flomnigui_stop_gui(self):
self.gui.flomni.hide() self.gui.flomni.hide()
@@ -37,24 +38,23 @@ class flomniGuiTools:
def flomnigui_raise(self): def flomnigui_raise(self):
self.gui.flomni.raise_window() self.gui.flomni.raise_window()
# def flomnigui_show_xeyealign(self):
# self.flomnigui_show_gui()
# if self.xeyegui is None:
# self.flomnigui_remove_all_docks()
# self.xeyegui = self.gui.flomni.new("xeyegui").new("XRayEye")
# # start live
# if not dev.cam_xeye.live_mode:
# dev.cam_xeye.live_mode = True
def flomnigui_show_xeyealign(self): def flomnigui_show_xeyealign(self):
self.flomnigui_show_gui() self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("xeyegui"): if self._flomnigui_check_attribute_not_exists("xeyegui"):
self.flomnigui_remove_all_docks() self.flomnigui_remove_all_docks()
self.xeyegui = self.gui.flomni.new("xeyegui").new("XRayEye") self.xeyegui = self.gui.flomni.new("XRayEye",object_name='xrayeye')
# start live # start live
if not dev.cam_xeye.live_mode: if not dev.cam_xeye.live_mode_enabled.get():
dev.cam_xeye.live_mode = True # dev.cam_xeye.live_mode = True
dev.cam_xeye.live_mode_enabled.put(True)
self.xeyegui.switch_tab('alignment')
def flomnigui_show_xeyealign_fittab(self):
self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("xeyegui"):
self.flomnigui_remove_all_docks()
self.xeyegui = self.gui.flomni.new("XRayEye")
self.xeyegui.switch_tab('fit')
def _flomnigui_check_attribute_not_exists(self, attribute_name): def _flomnigui_check_attribute_not_exists(self, attribute_name):
if hasattr(self.gui,"flomni"): if hasattr(self.gui,"flomni"):
@@ -67,7 +67,7 @@ class flomniGuiTools:
self.flomnigui_show_gui() self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("camera_gripper") or self._flomnigui_check_attribute_not_exists("camera_overview"): if self._flomnigui_check_attribute_not_exists("camera_gripper") or self._flomnigui_check_attribute_not_exists("camera_overview"):
self.flomnigui_remove_all_docks() self.flomnigui_remove_all_docks()
camera_gripper_image = self.gui.flomni.new("camera_gripper").new("Image") camera_gripper_image = self.gui.flomni.new("Image")
if self._flomnicam_check_device_exists(dev.cam_flomni_gripper): if self._flomnicam_check_device_exists(dev.cam_flomni_gripper):
camera_gripper_image.image(("cam_flomni_gripper", "preview")) camera_gripper_image.image(("cam_flomni_gripper", "preview"))
camera_gripper_image.lock_aspect_ratio = True camera_gripper_image.lock_aspect_ratio = True
@@ -78,7 +78,7 @@ class flomniGuiTools:
dev.cam_flomni_gripper.start_live_mode() dev.cam_flomni_gripper.start_live_mode()
else: else:
print("Cannot open camera_gripper. Device does not exist.") print("Cannot open camera_gripper. Device does not exist.")
camera_overview_image = self.gui.flomni.new("camera_overview").new("Image") camera_overview_image = self.gui.flomni.new("Image")
if self._flomnicam_check_device_exists(dev.cam_flomni_overview): if self._flomnicam_check_device_exists(dev.cam_flomni_overview):
camera_overview_image.image(("cam_flomni_overview", "preview")) camera_overview_image.image(("cam_flomni_overview", "preview"))
camera_overview_image.lock_aspect_ratio = True camera_overview_image.lock_aspect_ratio = True
@@ -102,7 +102,7 @@ class flomniGuiTools:
self.flomnigui_show_gui() self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("idle_text_box"): if self._flomnigui_check_attribute_not_exists("idle_text_box"):
self.flomnigui_remove_all_docks() self.flomnigui_remove_all_docks()
idle_text_box = self.gui.flomni.new("idle_textbox").new("TextBox") idle_text_box = self.gui.flomni.new("TextBox")
text = ( text = (
"<pre>" "<pre>"
+ " ,---.,--. ,-----. ,--. ,--.,--. ,--.,--. \n" + " ,---.,--. ,-----. ,--. ,--.,--. ,--.,--. \n"
@@ -162,7 +162,7 @@ class flomniGuiTools:
self.pdf_viewer = self.gui.flomni.new(widget="PdfViewerWidget") self.pdf_viewer = self.gui.flomni.new(widget="PdfViewerWidget")
# --- Load PDF --------------------------------------------------------- # --- Load PDF ---------------------------------------------------------
self.pdf_viewer.PdfViewerWidget.load_pdf(str(pdf_file.resolve())) self.PdfViewerWidget.load_pdf(str(pdf_file.resolve()))
print(f"\nLoaded: {pdf_file.name}\n") print(f"\nLoaded: {pdf_file.name}\n")
@@ -179,7 +179,7 @@ class flomniGuiTools:
if self._flomnigui_check_attribute_not_exists("progressbar"): if self._flomnigui_check_attribute_not_exists("progressbar"):
self.flomnigui_remove_all_docks() self.flomnigui_remove_all_docks()
# Add a new dock with a RingProgressBar widget # Add a new dock with a RingProgressBar widget
self.progressbar = self.gui.flomni.new("progressbar").new("RingProgressBar") self.progressbar = self.gui.flomni.new("RingProgressBar")
# Customize the size of the progress ring # Customize the size of the progress ring
self.progressbar.set_line_widths(20) self.progressbar.set_line_widths(20)
# Disable automatic updates and manually set the self.progressbar value # Disable automatic updates and manually set the self.progressbar value
@@ -194,7 +194,7 @@ class flomniGuiTools:
# Set the values of the rings to 50, 75, and 25 from outer to inner ring # Set the values of the rings to 50, 75, and 25 from outer to inner ring
# self.progressbar.set_value([50, 75]) # self.progressbar.set_value([50, 75])
# Add a new dock with a TextBox widget # Add a new dock with a TextBox widget
self.text_box = self.gui.flomni.new(name="progress_text").new("TextBox") self.text_box = self.gui.flomni.new("TextBox")
self._flomnigui_update_progress() self._flomnigui_update_progress()

View File

@@ -5,9 +5,11 @@ import os
import time import time
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
import numpy as np
from bec_lib import bec_logger from bec_lib import bec_logger
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose # from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
logger = bec_logger.logger logger = bec_logger.logger
# import builtins to avoid linter errors # import builtins to avoid linter errors
@@ -22,7 +24,7 @@ if TYPE_CHECKING:
class XrayEyeAlign: class XrayEyeAlign:
# pixel calibration, multiply to get mm # pixel calibration, multiply to get mm
labview=False test_wo_movements = True
PIXEL_CALIBRATION = 0.1 / 113 # .2 with binning PIXEL_CALIBRATION = 0.1 / 113 # .2 with binning
def __init__(self, client, flomni: Flomni) -> None: def __init__(self, client, flomni: Flomni) -> None:
@@ -34,209 +36,194 @@ class XrayEyeAlign:
self.flomni.reset_correction() self.flomni.reset_correction()
self.flomni.reset_tomo_alignment_fit() self.flomni.reset_tomo_alignment_fit()
@property
def gui(self):
return self.flomni.xeyegui
def _reset_init_values(self): def _reset_init_values(self):
self.shift_xy = [0, 0] self.shift_xy = [0, 0]
self._xray_fov_xy = [0, 0] self._xray_fov_xy = [0, 0]
def save_frame(self): def update_frame(self, keep_shutter_open=False):
epics_put("XOMNYI-XEYE-SAVFRAME:0", 1)
def update_frame(self,keep_shutter_open=False): # self.flomni.flomnigui_show_xeyealign()
if self.labview: if not dev.cam_xeye.live_mode_enabled.get():
epics_put("XOMNYI-XEYE-ACQDONE:0", 0) dev.cam_xeye.live_mode_enabled.put(True)
if not self.labview: self.gui.on_live_view_enabled(True)
self.flomni.flomnigui_show_xeyealign()
if not dev.cam_xeye.live_mode:
dev.cam_xeye.live_mode = True
epics_put("XOMNYI-XEYE-ACQ:0", 1) dev.omnyfsh.fshopen()
if self.labview:
# wait for start live
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
time.sleep(0.5)
print("waiting for live view to start...")
fshopen()
if self.labview:
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
print("waiting for new frame...")
time.sleep(0.5)
time.sleep(0.5) time.sleep(0.5)
# stop live view # stop live view
if not keep_shutter_open: if not keep_shutter_open:
epics_put("XOMNYI-XEYE-ACQ:0", 0) self.gui.on_live_view_enabled(False)
time.sleep(0.1) time.sleep(0.1)
fshclose() dev.omnyfsh.fshclose()
print("got new frame") print("Received new frame.")
else: else:
print("Staying in live view, shutter is and remains open!") print("Staying in live view, shutter is and remains open!")
def tomo_rotate(self, val: float): def tomo_rotate(self, val: float):
# pylint: disable=undefined-variable if not self.test_wo_movements:
umv(self.device_manager.devices.fsamroy, val) umv(self.device_manager.devices.fsamroy, val)
def get_tomo_angle(self): def get_tomo_angle(self):
return self.device_manager.devices.fsamroy.readback.get() return self.device_manager.devices.fsamroy.readback.get()
def update_fov(self, k: int): def update_fov(self, k: int):
self._xray_fov_xy[0] = max(epics_get(f"XOMNYI-XEYE-XWIDTH_X:{k}"), self._xray_fov_xy[0]) self._xray_fov_xy[0] = max(
self._xray_fov_xy[1] = max(0, self._xray_fov_xy[0]) getattr(dev.omny_xray_gui, f"width_x_{k}").get(), self._xray_fov_xy[0]
)
self._xray_fov_xy[1] = max(
getattr(dev.omny_xray_gui, f"width_y_{k}").get(), self._xray_fov_xy[1]
)
@property def movement_buttons_enabled(self, enablex: bool, enabley: bool):
def movement_buttons_enabled(self): self.gui.on_motors_enable(enablex, enabley)
return [epics_get("XOMNYI-XEYE-ENAMVX:0"), epics_get("XOMNYI-XEYE-ENAMVY:0")]
@movement_buttons_enabled.setter
def movement_buttons_enabled(self, enabled: bool):
enabled = int(enabled)
epics_put("XOMNYI-XEYE-ENAMVX:0", enabled)
epics_put("XOMNYI-XEYE-ENAMVY:0", enabled)
def send_message(self, msg: str): def send_message(self, msg: str):
epics_put("XOMNYI-XEYE-MESSAGE:0.DESC", msg) print(f"In alginment GUI: {msg}")
self.gui.user_message = msg
def align(self,keep_shutter_open=False): def align(self, keep_shutter_open=False):
self.flomni.flomnigui_show_xeyealign()
if not keep_shutter_open: if not keep_shutter_open:
print("This routine can be called with paramter keep_shutter_open=True to keep the shutter always open") print(
"This routine can be called with paramter keep_shutter_open=True to keep the shutter always open"
)
self.send_message("Getting things ready. Please wait...") self.send_message("Getting things ready. Please wait...")
#potential unresolved movement requests to zero self.gui.enable_submit_button(False)
epics_put("XOMNYI-XEYE-MVX:0", 0)
epics_put("XOMNYI-XEYE-MVY:0", 0) # Initialize xray align device
# clear potential pending movement requests
dev.omny_xray_gui.mvx.set(0)
dev.omny_xray_gui.mvy.set(0)
# reset submit channel
dev.omny_xray_gui.submit.set(0)
self.movement_buttons_enabled(False, False)
# reset shift xy and fov params # reset shift xy and fov params
self._reset_init_values() self._reset_init_values()
self.flomni.lights_off() self.flomni.lights_off()
self.flomni.flomnigui_show_xeyealign() # self.flomni.flomnigui_show_xeyealign()
self.flomni.flomnigui_raise() # self.flomni.flomnigui_raise()
self.tomo_rotate(0) if not self.test_wo_movements:
epics_put("XOMNYI-XEYE-ANGLE:0", 0) self.tomo_rotate(0)
self.flomni.feye_in() self.flomni.feye_in()
self.flomni.laser_tracker_on() self.flomni.laser_tracker_on()
self.flomni.feedback_enable_with_reset() self.flomni.feedback_enable_with_reset()
# disable movement buttons # disable movement buttons
self.movement_buttons_enabled = False self.movement_buttons_enabled(False, False)
sample_name = self.flomni.sample_get_name(0) sample_name = self.flomni.sample_get_name(0)
epics_put("XOMNYI-XEYE-SAMPLENAME:0.DESC", sample_name) self.gui.sample_name = sample_name
# this makes sure we are in a defined state # this makes sure we are in a defined state
self.flomni.feedback_disable() self.flomni.feedback_disable()
epics_put("XOMNYI-XEYE-PIXELSIZE:0", self.PIXEL_CALIBRATION) if not self.test_wo_movements:
self.flomni.fosa_out()
self.flomni.fosa_out() fsamx_in = self.flomni._get_user_param_safe("fsamx", "in")
umv(dev.fsamx, fsamx_in - 0.25)
fsamx_in = self.flomni._get_user_param_safe("fsamx", "in") self.flomni.ffzp_in()
umv(dev.fsamx, fsamx_in - 0.25)
self.flomni.ffzp_in()
self.update_frame(keep_shutter_open) self.update_frame(keep_shutter_open)
# enable submit buttons self.gui.enable_submit_button(True)
self.movement_buttons_enabled = False dev.omny_xray_gui.step.set(0).wait()
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
epics_put("XOMNYI-XEYE-STEP:0", 0)
self.send_message("Submit center value of FZP.") self.send_message("Submit center value of FZP.")
k = 0 k = 0
while True: while True:
if epics_get("XOMNYI-XEYE-SUBMIT:0") == 1: if dev.omny_xray_gui.submit.get() == 1:
val_x = epics_get(f"XOMNYI-XEYE-XVAL_X:{k}") / 2 * self.PIXEL_CALIBRATION # in mm
self.alignment_values[k] = val_x self.alignment_values[k] = (
getattr(dev.omny_xray_gui, f"xval_x_{k}").get() / 2 * self.PIXEL_CALIBRATION
) # in mm
print(f"Clicked position {k}: x {self.alignment_values[k]}") print(f"Clicked position {k}: x {self.alignment_values[k]}")
rtx_position = dev.rtx.readback.get() / 1000 rtx_position = dev.rtx.readback.get() / 1000
print(f"Current rtx position {rtx_position}") print(f"Current rtx position {rtx_position}")
self.alignment_values[k] -= rtx_position self.alignment_values[k] -= rtx_position
print(f"Corrected position {k}: x {self.alignment_values[k]}") print(f"Corrected position {k}: x {self.alignment_values[k]}")
# reset submit channel
dev.omny_xray_gui.submit.set(0)
if k == 0: # received center value of FZP if k == 0: # received center value of FZP
self.send_message("please wait ...") self.send_message("please wait ...")
self.movement_buttons_enabled = False self.movement_buttons_enabled(False, False)
epics_put("XOMNYI-XEYE-SUBMIT:0", -1) # disable submit button self.gui.enable_submit_button(False)
self.flomni.feedback_disable() self.flomni.feedback_disable()
fsamx_in = self.flomni._get_user_param_safe("fsamx", "in") if not self.test_wo_movements:
umv(dev.fsamx, fsamx_in) fsamx_in = self.flomni._get_user_param_safe("fsamx", "in")
umv(dev.fsamx, fsamx_in)
self.flomni.foptics_out() self.flomni.foptics_out()
self.flomni.feedback_disable() time.sleep(0.5)
umv(dev.fsamx, fsamx_in - 0.25)
if self.labview:
self.update_frame(keep_shutter_open)
epics_put("XOMNYI-XEYE-RECBG:0", 1)
while epics_get("XOMNYI-XEYE-RECBG:0") == 1:
time.sleep(0.5)
print("waiting for background frame...")
umv(dev.fsamx, fsamx_in)
time.sleep(0.5)
self.flomni.feedback_enable_with_reset() self.flomni.feedback_enable_with_reset()
self.update_frame(keep_shutter_open) self.update_frame(keep_shutter_open)
self.send_message("Adjust sample height and submit center") self.send_message("Step 1/5: Adjust sample height and submit center")
epics_put("XOMNYI-XEYE-SUBMIT:0", 0) self.gui.enable_submit_button(True)
self.movement_buttons_enabled = True self.movement_buttons_enabled(True, True)
elif 1 <= k < 5: # received sample center value at samroy 0 ... 315 elif 1 <= k < 5: # received sample center value at samroy 0 ... 315
self.send_message("please wait ...") self.send_message("please wait ...")
epics_put("XOMNYI-XEYE-SUBMIT:0", -1) self.gui.enable_submit_button(False)
self.movement_buttons_enabled = False self.movement_buttons_enabled(False, False)
umv(dev.rtx, 0) umv(dev.rtx, 0)
self.tomo_rotate(k * 45) self.tomo_rotate(k * 45)
epics_put("XOMNYI-XEYE-ANGLE:0", self.get_tomo_angle()) dev.omny_xray_gui.angle.set(self.get_tomo_angle())
self.update_frame(keep_shutter_open) self.update_frame(keep_shutter_open)
self.send_message("Submit sample center") self.send_message(f"Step {k+1}/5: Submit sample center")
epics_put("XOMNYI-XEYE-SUBMIT:0", 0) self.gui.enable_submit_button(True)
epics_put("XOMNYI-XEYE-ENAMVX:0", 1) self.movement_buttons_enabled(True, False)
self.update_fov(k) self.update_fov(k)
elif k == 5: # received sample center value at samroy 270 and done elif k == 5: # received sample center value at samroy 270 and done
self.send_message("done...") self.send_message("done...")
epics_put("XOMNYI-XEYE-SUBMIT:0", -1) # disable submit button self.gui.enable_submit_button(False)
self.movement_buttons_enabled = False self.movement_buttons_enabled(False, False)
self.update_fov(k) self.update_fov(k)
break break
k += 1 k += 1
epics_put("XOMNYI-XEYE-STEP:0", k) dev.omny_xray_gui.step.set(k)
_xrayeyalignmvx = epics_get("XOMNYI-XEYE-MVX:0") _xrayeyalignmvx = dev.omny_xray_gui.mvx.get()
if _xrayeyalignmvx != 0: if _xrayeyalignmvx != 0:
umvr(dev.rtx, _xrayeyalignmvx) umvr(dev.rtx, _xrayeyalignmvx)
print(f"Current rtx position {dev.rtx.readback.get() / 1000}") print(f"Current rtx position {dev.rtx.readback.get() / 1000}")
epics_put("XOMNYI-XEYE-MVX:0", 0)
if k > 0:
epics_put(f"XOMNYI-XEYE-STAGEPOSX:{k}", dev.rtx.readback.get() / 1000)
time.sleep(3) time.sleep(3)
dev.omny_xray_gui.mvx.set(0)
self.update_frame(keep_shutter_open) self.update_frame(keep_shutter_open)
if k < 2: if k < 2:
# allow movements, store movements to calculate center # allow movements, store movements to calculate center
_xrayeyalignmvy = epics_get("XOMNYI-XEYE-MVY:0") _xrayeyalignmvy = dev.omny_xray_gui.mvy.get()
if _xrayeyalignmvy != 0: if _xrayeyalignmvy != 0:
self.flomni.feedback_disable() self.flomni.feedback_disable()
umvr(dev.fsamy, _xrayeyalignmvy / 1000) if not self.test_wo_movements:
umvr(dev.fsamy, _xrayeyalignmvy / 1000)
time.sleep(2) time.sleep(2)
epics_put("XOMNYI-XEYE-MVY:0", 0) dev.omny_xray_gui.mvy.set(0)
self.flomni.feedback_enable_with_reset() self.flomni.feedback_enable_with_reset()
self.update_frame(keep_shutter_open) self.update_frame(keep_shutter_open)
time.sleep(0.2) time.sleep(0.1)
self.write_output() self.write_output()
fovx = self._xray_fov_xy[0] * self.PIXEL_CALIBRATION * 1000 / 2 fovx = self._xray_fov_xy[0] * self.PIXEL_CALIBRATION * 1000 / 2
@@ -246,22 +233,17 @@ class XrayEyeAlign:
umv(dev.rtx, 0) umv(dev.rtx, 0)
# free camera if keep_shutter_open:
if self.labview: if self.flomni.OMNYTools.yesno("Close the shutter now?", "y"):
epics_put("XOMNYI-XEYE-ACQ:0", 2) dev.omnyfsh.fshclose()
if keep_shutter_open and not self.labview: self.gui.on_live_view_enabled(False)
if self.flomni.OMNYTools.yesno("Close the shutter now?","y"): print("setting 'XOMNYI-XEYE-ACQ:0'")
fshclose()
epics_put("XOMNYI-XEYE-ACQ:0", 0)
if not self.labview:
self.flomni.flomnigui_idle()
print( print(
f"The largest field of view from the xrayeyealign was \nfovx = {fovx:.0f} microns, fovy" f"The largest field of view from the xrayeyealign was \nfovx = {fovx:.0f} microns, fovy"
f" = {fovy:.0f} microns" f" = {fovy:.0f} microns"
) )
print("Use the matlab routine to FIT the current alignment...") print("Check the fit in the GUI...")
print("Then LOAD ALIGNMENT PARAMETERS by running flomni.read_alignment_offset()\n") print("Then LOAD ALIGNMENT PARAMETERS by running flomni.read_alignment_offset()\n")
@@ -269,9 +251,33 @@ class XrayEyeAlign:
file = os.path.expanduser("~/Data10/specES1/internal/xrayeye_alignmentvalues") file = os.path.expanduser("~/Data10/specES1/internal/xrayeye_alignmentvalues")
if not os.path.exists(file): if not os.path.exists(file):
os.makedirs(os.path.dirname(file), exist_ok=True) os.makedirs(os.path.dirname(file), exist_ok=True)
with open(file, "w") as alignment_values_file: with open(file, "w") as alignment_values_file:
alignment_values_file.write("angle\thorizontal\n") alignment_values_file.write("angle\thorizontal\n")
# Initialize an empty list to store fovx values
fovx_list = []
fovx_offsets = np.zeros(5) # holds offsets for k = 1..5
for k in range(1, 6): for k in range(1, 6):
fovx_offset = self.alignment_values[0] - self.alignment_values[k] fovx_offset = self.alignment_values[0] - self.alignment_values[k]
fovx_offsets[k - 1] = fovx_offset # store in array
fovx_x = (k - 1) * 45
fovx_list.append([fovx_x, fovx_offset * 1000]) # Append the data to the list
print(f"Writing to file new alignment: number {k}, value x {fovx_offset}") print(f"Writing to file new alignment: number {k}, value x {fovx_offset}")
alignment_values_file.write(f"{(k-1)*45}\t{fovx_offset*1000}\n") alignment_values_file.write(f"{fovx_x}\t{fovx_offset * 1000}\n")
# Now build final numpy array:
data = np.array(
[
[0, 45, 90, 135, 180], # angles
fovx_offsets * 1000, # fovx_offset values
[0, 0, 0, 0, 0],
]
)
self.gui.submit_fit_array(data)
print(f"fit submited with {data}")
print("todo mirko: submitted data is 1000 fold in amplitude")
# self.flomni.flomnigui_show_xeyealign_fittab()

View File

@@ -1,6 +1,6 @@
import builtins import builtins
from bec_widgets.cli.client import BECDockArea from bec_widgets.cli.client import AdvancedDockArea
# from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose # from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose

View File

@@ -83,20 +83,6 @@ class XRayEye(RPCBase):
Return the currently active ROI, or None if no ROI is active. Return the currently active ROI, or None if no ROI is active.
""" """
@property
@rpc_call
def enable_live_view(self):
"""
Get or set the live view enabled state.
"""
@enable_live_view.setter
@rpc_call
def enable_live_view(self):
"""
Get or set the live view enabled state.
"""
@property @property
@rpc_call @rpc_call
def user_message(self): def user_message(self):
@@ -111,6 +97,30 @@ class XRayEye(RPCBase):
None None
""" """
@rpc_call
def on_live_view_enabled(self, enabled: "bool"):
"""
None
"""
@rpc_call
def on_motors_enable(self, x_enable: "bool", y_enable: "bool"):
"""
Enable/Disable motor controls
Args:
x_enable(bool): enable x motor controls
y_enable(bool): enable y motor controls
"""
@rpc_call
def enable_submit_button(self, enable: "int"):
"""
Enable/disable submit button.
Args:
enable(int): -1 disable else enable
"""
@property @property
@rpc_call @rpc_call
def sample_name(self): def sample_name(self):
@@ -139,6 +149,18 @@ class XRayEye(RPCBase):
None None
""" """
@rpc_call
def switch_tab(self, tab: "str"):
"""
None
"""
@rpc_call
def submit_fit_array(self, fit_array):
"""
None
"""
class XRayEye2DControl(RPCBase): class XRayEye2DControl(RPCBase):
@rpc_call @rpc_call

View File

@@ -1,140 +0,0 @@
from typing import TypedDict
from bec_widgets.utils.error_popups import SafeSlot
import os
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.ui_loader import UILoader
from qtpy.QtWidgets import QWidget, QPushButton, QLineEdit, QLabel, QVBoxLayout
from bec_qthemes import material_icon
from bec_lib.logger import bec_logger
logger = bec_logger.logger
# class OmnyAlignmentUIComponents(TypedDict):
# moveRightButton: QPushButton
# moveLeftButton: QPushButton
# moveUpButton: QPushButton
# moveDownButton: QPushButton
# image: Image
class OmnyAlignment(BECWidget, QWidget):
USER_ACCESS = ["enable_live_view", "enable_live_view.setter", "user_message", "user_message.setter","sample_name", "sample_name.setter", "enable_move_buttons", "enable_move_buttons.setter"]
PLUGIN = True
ui_file = "./omny_alignment.ui"
def __init__(self, parent=None, **kwargs):
super().__init__(parent=parent, **kwargs)
self._load_ui()
def _load_ui(self):
current_path = os.path.dirname(__file__)
self.ui = UILoader(self).loader(os.path.join(current_path, self.ui_file))
layout = QVBoxLayout()
layout.addWidget(self.ui)
self.setLayout(layout)
icon_options = {"size": (16, 16), "convert_to_pixmap": False}
self.ui.moveRightButton.setText("")
self.ui.moveRightButton.setIcon(
material_icon(icon_name="keyboard_arrow_right", **icon_options)
)
self.ui.moveLeftButton.setText("")
self.ui.moveLeftButton.setIcon(
material_icon(icon_name="keyboard_arrow_left", **icon_options)
)
self.ui.moveUpButton.setText("")
self.ui.moveUpButton.setIcon(
material_icon(icon_name="keyboard_arrow_up", **icon_options)
)
self.ui.moveDownButton.setText("")
self.ui.moveDownButton.setIcon(
material_icon(icon_name="keyboard_arrow_down", **icon_options)
)
self.ui.confirmButton.setText("OK")
self.ui.liveViewSwitch.enabled.connect(self.on_live_view_enabled)
# self.ui.moveUpButton.clicked.connect(self.on_move_up)
@property
def enable_live_view(self):
return self.ui.liveViewSwitch.checked
@enable_live_view.setter
def enable_live_view(self, enable:bool):
self.ui.liveViewSwitch.checked = enable
@property
def user_message(self):
return self.ui.messageLineEdit.text()
@user_message.setter
def user_message(self, message:str):
self.ui.messageLineEdit.setText(message)
@property
def sample_name(self):
return self.ui.sampleLineEdit.text()
@sample_name.setter
def sample_name(self, message:str):
self.ui.sampleLineEdit.setText(message)
@SafeSlot(bool)
def on_live_view_enabled(self, enabled:bool):
from bec_widgets.widgets.plots.image.image import Image
logger.info(f"Live view is enabled: {enabled}")
image: Image = self.ui.image
if enabled:
image.image("cam_xeye")
return
image.disconnect_monitor("cam_xeye")
@property
def enable_move_buttons(self):
move_up:QPushButton = self.ui.moveUpButton
move_down:QPushButton = self.ui.moveDownButton
move_left:QPushButton = self.ui.moveLeftButton
move_right:QPushButton = self.ui.moveRightButton
return move_up.isEnabled() and move_down.isEnabled() and move_left.isEnabled() and move_right.isEnabled()
@enable_move_buttons.setter
def enable_move_buttons(self, enabled:bool):
move_up:QPushButton = self.ui.moveUpButton
move_down:QPushButton = self.ui.moveDownButton
move_left:QPushButton = self.ui.moveLeftButton
move_right:QPushButton = self.ui.moveRightButton
move_up.setEnabled(enabled)
move_down.setEnabled(enabled)
move_left.setEnabled(enabled)
move_right.setEnabled(enabled)
if __name__ == "__main__":
from qtpy.QtWidgets import QApplication
import sys
app = QApplication(sys.argv)
widget = OmnyAlignment()
widget.show()
sys.exit(app.exec_())

View File

@@ -1 +0,0 @@
{'files': ['omny_alignment.py']}

View File

@@ -1,125 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>Form</class>
<widget class="QWidget" name="Form">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>988</width>
<height>821</height>
</rect>
</property>
<property name="windowTitle">
<string>Form</string>
</property>
<layout class="QGridLayout" name="gridLayout_3">
<item row="2" column="2">
<layout class="QGridLayout" name="gridLayout">
<item row="1" column="2">
<widget class="QPushButton" name="moveRightButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
<item row="1" column="0">
<widget class="QPushButton" name="moveLeftButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
<item row="0" column="1">
<widget class="QPushButton" name="moveUpButton">
<property name="text">
<string>Up</string>
</property>
</widget>
</item>
<item row="2" column="1">
<widget class="QPushButton" name="moveDownButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
<item row="1" column="1">
<widget class="QPushButton" name="confirmButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
</layout>
</item>
<item row="2" column="0">
<layout class="QGridLayout" name="gridLayout_2">
<item row="0" column="1">
<widget class="QLineEdit" name="sampleLineEdit"/>
</item>
<item row="1" column="1">
<widget class="QLineEdit" name="messageLineEdit"/>
</item>
<item row="0" column="0">
<widget class="QLabel" name="label">
<property name="text">
<string>Sample</string>
</property>
</widget>
</item>
<item row="1" column="0">
<widget class="QLabel" name="label_2">
<property name="text">
<string>Message</string>
</property>
</widget>
</item>
</layout>
</item>
<item row="1" column="0" colspan="3">
<widget class="Image" name="image">
<property name="enable_toolbar" stdset="0">
<bool>false</bool>
</property>
<property name="inner_axes" stdset="0">
<bool>false</bool>
</property>
<property name="monitor" stdset="0">
<string>cam_xeye</string>
</property>
<property name="rotation" stdset="0">
<number>3</number>
</property>
</widget>
</item>
<item row="0" column="3">
<widget class="ToggleSwitch" name="liveViewSwitch"/>
</item>
<item row="0" column="2">
<widget class="QLabel" name="label_3">
<property name="text">
<string>Live View</string>
</property>
<property name="alignment">
<set>Qt::AlignmentFlag::AlignRight|Qt::AlignmentFlag::AlignTrailing|Qt::AlignmentFlag::AlignVCenter</set>
</property>
</widget>
</item>
</layout>
</widget>
<customwidgets>
<customwidget>
<class>Image</class>
<extends>QWidget</extends>
<header>image</header>
</customwidget>
<customwidget>
<class>ToggleSwitch</class>
<extends>QWidget</extends>
<header>toggle_switch</header>
</customwidget>
</customwidgets>
<resources/>
<connections/>
</ui>

View File

@@ -1,54 +0,0 @@
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from bec_widgets.utils.bec_designer import designer_material_icon
from csaxs_bec.bec_widgets.widgets.omny_alignment.omny_alignment import OmnyAlignment
DOM_XML = """
<ui language='c++'>
<widget class='OmnyAlignment' name='omny_alignment'>
</widget>
</ui>
"""
class OmnyAlignmentPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
t = OmnyAlignment(parent)
return t
def domXml(self):
return DOM_XML
def group(self):
return ""
def icon(self):
return designer_material_icon(OmnyAlignment.ICON_NAME)
def includeFile(self):
return "omny_alignment"
def initialize(self, form_editor):
self._form_editor = form_editor
def isContainer(self):
return False
def isInitialized(self):
return self._form_editor is not None
def name(self):
return "OmnyAlignment"
def toolTip(self):
return "OmnyAlignment"
def whatsThis(self):
return self.toolTip()

View File

@@ -1,15 +0,0 @@
def main(): # pragma: no cover
from qtpy import PYSIDE6
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from csaxs_bec.bec_widgets.widgets.omny_alignment.omny_alignment_plugin import OmnyAlignmentPlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(OmnyAlignmentPlugin())
if __name__ == "__main__": # pragma: no cover
main()

View File

@@ -5,6 +5,7 @@ from bec_lib.endpoints import MessageEndpoints
from bec_qthemes import material_icon from bec_qthemes import material_icon
from bec_widgets import BECWidget, SafeProperty, SafeSlot from bec_widgets import BECWidget, SafeProperty, SafeSlot
from bec_widgets.widgets.plots.image.image import Image from bec_widgets.widgets.plots.image.image import Image
from bec_widgets.widgets.plots.waveform.waveform import Waveform
from bec_widgets.widgets.plots.image.setting_widgets.image_roi_tree import ROIPropertyTree from bec_widgets.widgets.plots.image.setting_widgets.image_roi_tree import ROIPropertyTree
from bec_widgets.widgets.plots.roi.image_roi import BaseROI, CircularROI, RectangularROI from bec_widgets.widgets.plots.roi.image_roi import BaseROI, CircularROI, RectangularROI
from bec_widgets.widgets.utility.toggle.toggle import ToggleSwitch from bec_widgets.widgets.utility.toggle.toggle import ToggleSwitch
@@ -21,7 +22,10 @@ from qtpy.QtWidgets import (
QToolButton, QToolButton,
QVBoxLayout, QVBoxLayout,
QWidget, QWidget,
QTextEdit,
QTabWidget
) )
import time
logger = bec_logger.logger logger = bec_logger.logger
CAMERA = ("cam_xeye", "image") CAMERA = ("cam_xeye", "image")
@@ -124,8 +128,8 @@ class XRayEye2DControl(BECWidget, QWidget):
class XRayEye(BECWidget, QWidget): class XRayEye(BECWidget, QWidget):
USER_ACCESS = ["active_roi", "enable_live_view", "enable_live_view.setter", "user_message", "user_message.setter", USER_ACCESS = ["active_roi", "user_message", "user_message.setter","on_live_view_enabled","on_motors_enable","enable_submit_button",
"sample_name", "sample_name.setter", "enable_move_buttons", "enable_move_buttons.setter"] "sample_name", "sample_name.setter", "enable_move_buttons", "enable_move_buttons.setter","switch_tab","submit_fit_array"]
PLUGIN = True PLUGIN = True
def __init__(self, parent=None, **kwargs): def __init__(self, parent=None, **kwargs):
@@ -136,21 +140,31 @@ class XRayEye(BECWidget, QWidget):
self._make_connections() self._make_connections()
# Connection to redis endpoints # Connection to redis endpoints
self.bec_dispatcher.connect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui")) self.bec_dispatcher.connect_slot(self.getting_shutter_status, MessageEndpoints.device_readback("omnyfsh"))
self.bec_dispatcher.connect_slot(self.getting_camera_status, MessageEndpoints.device_read_configuration(CAMERA[0]))
self.connect_motors() self.connect_motors()
self.resize(800, 600) self.resize(800, 600)
QTimer.singleShot(0, self._init_gui_trigger) QTimer.singleShot(0, self._init_gui_trigger)
def _init_ui(self): def _init_ui(self):
self.core_layout = QHBoxLayout(self) self.root_layout = QVBoxLayout(self)
self.tab_widget = QTabWidget(parent=self)
self.root_layout.addWidget(self.tab_widget)
self.image = Image(parent=self) self.alignment_tab = QWidget(parent=self)
self.core_layout = QHBoxLayout(self.alignment_tab)
self.image = Image(parent=self.alignment_tab)
self.image.color_map = "CET-L2"
self.image.enable_toolbar = False # Disable default toolbar to not allow to user set anything self.image.enable_toolbar = False # Disable default toolbar to not allow to user set anything
self.image.inner_axes = False # Disable inner axes to maximize image area self.image.inner_axes = False # Disable inner axes to maximize image area
self.image.plot_item.vb.invertY(True) # #TODO Invert y axis to match logic of LabView GUI self.image.enable_full_colorbar = True
self.image.invert_y = True # Invert y axis to match image coordinates
# Control panel on the right: vertical layout inside a fixed-width widget # Control panel on the right: vertical layout inside a fixed-width widget
self.control_panel = QWidget(parent=self) self.control_panel = QWidget(parent=self.alignment_tab)
self.control_panel_layout = QVBoxLayout(self.control_panel) self.control_panel_layout = QVBoxLayout(self.control_panel)
self.control_panel_layout.setContentsMargins(0, 0, 0, 0) self.control_panel_layout.setContentsMargins(0, 0, 0, 0)
self.control_panel_layout.setSpacing(10) self.control_panel_layout.setSpacing(10)
@@ -166,16 +180,35 @@ class XRayEye(BECWidget, QWidget):
self.live_preview_label = QLabel("Live Preview", parent=self) self.live_preview_label = QLabel("Live Preview", parent=self)
self.live_preview_toggle = ToggleSwitch(parent=self) self.live_preview_toggle = ToggleSwitch(parent=self)
self.live_preview_toggle.checked = False self.live_preview_toggle.checked = False
header_row.addWidget(self.live_preview_label, 0, Qt.AlignVCenter) header_row.addWidget(self.live_preview_label, 0, Qt.AlignmentFlag.AlignVCenter)
header_row.addWidget(self.live_preview_toggle, 0, Qt.AlignVCenter) header_row.addWidget(self.live_preview_toggle, 0, Qt.AlignmentFlag.AlignVCenter)
self.control_panel_layout.addLayout(header_row) self.control_panel_layout.addLayout(header_row)
switch_row = QHBoxLayout()
switch_row.setContentsMargins(0, 0, 0, 0)
switch_row.setSpacing(8)
switch_row.addStretch()
self.camera_running_label = QLabel("Camera running", parent=self)
self.camera_running_toggle = ToggleSwitch(parent=self)
# self.camera_running_toggle.checked = False
self.camera_running_toggle.enabled.connect(self.camera_running_enabled)
self.shutter_label = QLabel("Shutter open", parent=self)
self.shutter_toggle = ToggleSwitch(parent=self)
# self.shutter_toggle.checked = False
self.shutter_toggle.enabled.connect(self.opening_shutter)
switch_row.addWidget(self.shutter_label, 0, Qt.AlignmentFlag.AlignVCenter)
switch_row.addWidget(self.shutter_toggle, 0, Qt.AlignmentFlag.AlignVCenter)
switch_row.addWidget(self.camera_running_label, 0, Qt.AlignmentFlag.AlignVCenter)
switch_row.addWidget(self.camera_running_toggle, 0, Qt.AlignmentFlag.AlignVCenter)
self.control_panel_layout.addLayout(switch_row)
# separator # separator
self.control_panel_layout.addWidget(self._create_separator()) self.control_panel_layout.addWidget(self._create_separator())
# 2D Positioner (fixed size) # 2D Positioner (fixed size)
self.motor_control_2d = XRayEye2DControl(parent=self) self.motor_control_2d = XRayEye2DControl(parent=self)
self.control_panel_layout.addWidget(self.motor_control_2d, 0, Qt.AlignTop | Qt.AlignCenter) self.control_panel_layout.addWidget(self.motor_control_2d, 0, Qt.AlignmentFlag.AlignTop | Qt.AlignmentFlag.AlignCenter)
# separator # separator
self.control_panel_layout.addWidget(self._create_separator()) self.control_panel_layout.addWidget(self._create_separator())
@@ -190,9 +223,8 @@ class XRayEye(BECWidget, QWidget):
# Submit button # Submit button
self.submit_button = QPushButton("Submit", parent=self) self.submit_button = QPushButton("Submit", parent=self)
# Add to layout form # Add to layout form
step_size_form.addWidget(QLabel("Horizontal", parent=self), 0, 0) step_size_form.addWidget(QLabel("Step Size", parent=self), 0, 0)
step_size_form.addWidget(self.step_size, 0, 1) step_size_form.addWidget(self.step_size, 0, 1)
step_size_form.addWidget(QLabel("Vertical", parent=self), 1, 0)
step_size_form.addWidget(self.submit_button, 2, 0, 1, 2) step_size_form.addWidget(self.submit_button, 2, 0, 1, 2)
# Add form to control panel # Add form to control panel
@@ -207,7 +239,8 @@ class XRayEye(BECWidget, QWidget):
self.sample_name_line_edit.setReadOnly(True) self.sample_name_line_edit.setReadOnly(True)
form.addWidget(QLabel("Sample", parent=self), 0, 0) form.addWidget(QLabel("Sample", parent=self), 0, 0)
form.addWidget(self.sample_name_line_edit, 0, 1) form.addWidget(self.sample_name_line_edit, 0, 1)
self.message_line_edit = QLineEdit(parent=self) self.message_line_edit = QTextEdit(parent=self)
self.message_line_edit.setFixedHeight(60)
self.message_line_edit.setReadOnly(True) self.message_line_edit.setReadOnly(True)
form.addWidget(QLabel("Message", parent=self), 1, 0) form.addWidget(QLabel("Message", parent=self), 1, 0)
form.addWidget(self.message_line_edit, 1, 1) form.addWidget(self.message_line_edit, 1, 1)
@@ -217,12 +250,39 @@ class XRayEye(BECWidget, QWidget):
self.control_panel.adjustSize() self.control_panel.adjustSize()
p_hint = self.control_panel.sizeHint() p_hint = self.control_panel.sizeHint()
self.control_panel.setFixedWidth(p_hint.width()) self.control_panel.setFixedWidth(p_hint.width())
self.control_panel.setSizePolicy(QSizePolicy.Fixed, QSizePolicy.Expanding) self.control_panel.setSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Expanding)
# Core Layout: image (expanding) | control panel (fixed) # Core Layout: image (expanding) | control panel (fixed)
self.core_layout.addWidget(self.image) self.core_layout.addWidget(self.image)
self.core_layout.addWidget(self.control_panel) self.core_layout.addWidget(self.control_panel)
self.tab_widget.addTab(self.alignment_tab, "Alignment")
self.fit_tab = QWidget(parent=self)
self.fit_layout = QVBoxLayout(self.fit_tab)
self.waveform_x = Waveform(parent=self.fit_tab)
self.waveform_y = Waveform(parent=self.fit_tab)
self.waveform_x.plot(x=[0],y=[1], label="fit-x",dap="SineModel",dap_parameters={"frequency":{"value":0.0174533,"vary":False,"min":0.01,"max":0.02}},dap_oversample=5)
self.waveform_y.plot(x=[0],y=[2], label="fit-y",dap="SineModel")#,dap_oversample=5)
self.fit_x = self.waveform_x.curves[0]
self.fit_y = self.waveform_y.curves[0]
self.waveform_x.dap_params_update.connect(self.on_dap_params)
self.waveform_y.dap_params_update.connect(self.on_dap_params)
for wave in (self.waveform_x,self.waveform_y):
wave.x_label = "Angle (deg)"
wave.x_grid = True
wave.y_grid = True
wave.enable_toolbar = True
self.fit_layout.addWidget(self.waveform_x)
self.fit_layout.addWidget(self.waveform_y)
self.tab_widget.addTab(self.fit_tab, "Fit")
def _make_connections(self): def _make_connections(self):
# Fetch initial state # Fetch initial state
self.on_live_view_enabled(True) self.on_live_view_enabled(True)
@@ -235,13 +295,14 @@ class XRayEye(BECWidget, QWidget):
def _create_separator(self): def _create_separator(self):
sep = QFrame(parent=self) sep = QFrame(parent=self)
sep.setFrameShape(QFrame.HLine) sep.setFrameShape(QFrame.Shape.HLine)
sep.setFrameShadow(QFrame.Sunken) sep.setFrameShadow(QFrame.Shadow.Sunken)
sep.setLineWidth(1) sep.setLineWidth(1)
return sep return sep
def _init_gui_trigger(self): def _init_gui_trigger(self):
self.dev.omny_xray_gui.read() self.dev.omny_xray_gui.read()
self.dev.omnyfsh.read()
################################################################################ ################################################################################
# Device Connection logic # Device Connection logic
@@ -254,7 +315,7 @@ class XRayEye(BECWidget, QWidget):
for motor in possible_motors: for motor in possible_motors:
if motor in self.dev: if motor in self.dev:
self.bec_dispatcher.connect_slot(self.on_tomo_angle_readback, MessageEndpoints.device_readback(motor)) self.bec_dispatcher.connect_slot(self.on_tomo_angle_readback, MessageEndpoints.device_readback(motor))
logger.info(f"Succesfully connected to {motor}") logger.info(f"Successfully connected to {motor}")
################################################################################ ################################################################################
# Properties ported from the original OmnyAlignment, can be adjusted as needed # Properties ported from the original OmnyAlignment, can be adjusted as needed
@@ -291,6 +352,14 @@ class XRayEye(BECWidget, QWidget):
# Slots ported from the original OmnyAlignment, can be adjusted as needed # Slots ported from the original OmnyAlignment, can be adjusted as needed
################################################################################ ################################################################################
@SafeSlot(str)
def switch_tab(self,tab:str):
if tab == "fit":
self.tab_widget.setCurrentIndex(1)
else:
self.tab_widget.setCurrentIndex(0)
@SafeSlot() @SafeSlot()
def get_roi_coordinates(self) -> dict | None: def get_roi_coordinates(self) -> dict | None:
"""Get the coordinates of the currently active ROI.""" """Get the coordinates of the currently active ROI."""
@@ -307,14 +376,50 @@ class XRayEye(BECWidget, QWidget):
self.live_preview_toggle.blockSignals(True) self.live_preview_toggle.blockSignals(True)
if enabled: if enabled:
self.live_preview_toggle.checked = enabled self.live_preview_toggle.checked = enabled
self.image.image(CAMERA) self.image.image(device_name=CAMERA[0],device_entry=CAMERA[1])
self.live_preview_toggle.blockSignals(False) self.live_preview_toggle.blockSignals(False)
return return
self.image.disconnect_monitor(CAMERA) self.image.disconnect_monitor(CAMERA[0],CAMERA[1])
self.live_preview_toggle.checked = enabled self.live_preview_toggle.checked = enabled
self.live_preview_toggle.blockSignals(False) self.live_preview_toggle.blockSignals(False)
@SafeSlot(bool)
def camera_running_enabled(self, enabled: bool):
logger.info(f"Camera running: {enabled}")
self.camera_running_toggle.blockSignals(True)
self.dev.get(CAMERA[0]).live_mode_enabled.put(enabled)
self.camera_running_toggle.checked = enabled
self.camera_running_toggle.blockSignals(False)
@SafeSlot(dict,dict)
def getting_camera_status(self,data,meta):
print(f"msg:{data}")
live_mode_enabled = data.get("signals").get(f"{CAMERA[0]}_live_mode_enabled").get("value")
self.camera_running_toggle.blockSignals(True)
self.camera_running_toggle.checked = live_mode_enabled
self.camera_running_toggle.blockSignals(False)
@SafeSlot(bool)
def opening_shutter(self, enabled: bool):
logger.info(f"Shutter changed from GUI to: {enabled}")
self.shutter_toggle.blockSignals(True)
if enabled:
self.dev.omnyfsh.fshopen()
else:
self.dev.omnyfsh.fshclose()
# self.shutter_toggle.checked = enabled
self.shutter_toggle.blockSignals(False)
@SafeSlot(dict,dict)
def getting_shutter_status(self,data,meta):
shutter_open = bool(data.get("signals").get("omnyfsh_shutter").get("value"))
self.shutter_toggle.blockSignals(True)
self.shutter_toggle.checked = shutter_open
self.shutter_toggle.blockSignals(False)
@SafeSlot(bool, bool) @SafeSlot(bool, bool)
def on_motors_enable(self, x_enable: bool, y_enable: bool): def on_motors_enable(self, x_enable: bool, y_enable: bool):
""" """
@@ -327,17 +432,38 @@ class XRayEye(BECWidget, QWidget):
self.motor_control_2d.enable_controls_hor(x_enable) self.motor_control_2d.enable_controls_hor(x_enable)
self.motor_control_2d.enable_controls_ver(y_enable) self.motor_control_2d.enable_controls_ver(y_enable)
@SafeSlot(int) @SafeSlot(bool)
def enable_submit_button(self, enable: int): def enable_submit_button(self, enable: bool):
""" """
Enable/disable submit button. Enable/disable submit button.
Args: Args:
enable(int): -1 disable else enable enable(int): -1 disable else enable
""" """
if enable == -1: if enable:
self.submit_button.setEnabled(False)
else:
self.submit_button.setEnabled(True) self.submit_button.setEnabled(True)
else:
self.submit_button.setEnabled(False)
@SafeSlot(dict,dict)
def on_dap_params(self,data,meta):
print('#######################################')
print('getting dap parameters')
print(f"data: {data}")
print(f"meta: {meta}")
self.waveform_x.auto_range(True)
self.waveform_y.auto_range(True)
# self.bec_dispatcher.disconnect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
curve_id = meta.get("curve_id")
if curve_id == "fit-x-SineModel":
self.dev.omny_xray_gui.fit_params_x.set(data).wait()
print(f"setting x data to {data}")
else:
self.dev.omny_xray_gui.fit_params_y.set(data).wait()
print(f"setting y data to {data}")
# self.bec_dispatcher.connect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
@SafeSlot(bool, bool) @SafeSlot(bool, bool)
def on_tomo_angle_readback(self, data: dict, meta: dict): def on_tomo_angle_readback(self, data: dict, meta: dict):
@@ -345,40 +471,21 @@ class XRayEye(BECWidget, QWidget):
print(f"data: {data}") print(f"data: {data}")
print(f"meta: {meta}") print(f"meta: {meta}")
@SafeSlot(dict, dict) @SafeSlot()
def device_updates(self, data: dict, meta: dict): def submit_fit_array(self,fit_array):
""" self.tab_widget.setCurrentIndex(1)
Slot to handle device updates from omny_xray_gui device. # self.fix_x.title = " got fit array"
print(f"got fit array {fit_array}")
self.waveform_x.curves[0].set_data(x=fit_array[0],y=fit_array[1])
# self.fit_x.set_data(x=fit_array[0],y=fit_array[1])
# self.fit_y.set_data(x=fit_array[0],y=fit_array[2])
Args:
data(dict): data from device
meta(dict): metadata from device
"""
signals = data.get('signals')
enable_live_preview = signals.get("omny_xray_gui_update_frame_acq").get('value')
enable_x_motor = signals.get("omny_xray_gui_enable_mv_x").get('value')
enable_y_motor = signals.get("omny_xray_gui_enable_mv_y").get('value')
self.on_live_view_enabled(bool(enable_live_preview))
self.on_motors_enable(bool(enable_x_motor), bool(enable_y_motor))
# Signals from epics gui device
# send message
user_message = signals.get("omny_xray_gui_send_message").get('value')
self.user_message = user_message
# sample name
sample_message = signals.get("omny_xray_gui_sample_name").get('value')
self.sample_name = sample_message
# enable frame acquisition
update_frame_acq = signals.get("omny_xray_gui_update_frame_acq").get('value')
self.on_live_view_enabled(bool(update_frame_acq))
# enable submit button
enable_submit_button = signals.get("omny_xray_gui_submit").get('value')
self.enable_submit_button(enable_submit_button)
@SafeSlot() @SafeSlot()
def submit(self): def submit(self):
"""Execute submit action by submit button.""" """Execute submit action by submit button."""
print('submit pushed')
self.submit_button.blockSignals(True)
if self.roi_manager.single_active_roi is None: if self.roi_manager.single_active_roi is None:
logger.warning("No active ROI") logger.warning("No active ROI")
return return
@@ -400,11 +507,13 @@ class XRayEye(BECWidget, QWidget):
# submit roi coordinates # submit roi coordinates
step = int(self.dev.omny_xray_gui.step.read().get("omny_xray_gui_step").get('value')) step = int(self.dev.omny_xray_gui.step.read().get("omny_xray_gui_step").get('value'))
xval_x = getattr(self.dev.omny_xray_gui.xval_x, f"xval_x_{step}").set(roi_center_x) xval_x = getattr(self.dev.omny_xray_gui, f"xval_x_{step}").set(roi_center_x)
xval_y = getattr(self.dev.omny_xray_gui.yval_y, f"yval_y_{step}").set(roi_center_y) xval_y = getattr(self.dev.omny_xray_gui, f"yval_y_{step}").set(roi_center_y)
width_x = getattr(self.dev.omny_xray_gui.width_x, f"width_x_{step}").set(roi_width) width_x = getattr(self.dev.omny_xray_gui, f"width_x_{step}").set(roi_width)
width_y = getattr(self.dev.omny_xray_gui.width_y, f"width_y_{step}").set(roi_height) width_y = getattr(self.dev.omny_xray_gui, f"width_y_{step}").set(roi_height)
self.dev.omny_xray_gui.submit.set(1) self.dev.omny_xray_gui.submit.set(1)
print('submit done')
self.submit_button.blockSignals(False)
def cleanup(self): def cleanup(self):
"""Cleanup connections on widget close -> disconnect slots and stop live mode of camera.""" """Cleanup connections on widget close -> disconnect slots and stop live mode of camera."""
@@ -416,9 +525,12 @@ if __name__ == "__main__":
import sys import sys
from qtpy.QtWidgets import QApplication from qtpy.QtWidgets import QApplication
from bec_widgets.utils import BECDispatcher
from bec_widgets.utils.colors import apply_theme
app = QApplication(sys.argv) app = QApplication(sys.argv)
apply_theme("light")
dispatcher = BECDispatcher(gui_id='xray')
win = XRayEye() win = XRayEye()
win.resize(1000, 800) win.resize(1000, 800)

View File

@@ -435,9 +435,9 @@ cam_xeye:
# deviceConfig: # deviceConfig:
# camera_id: 203 # camera_id: 203
# bits_per_pixel: 24 # bits_per_pixel: 24
# num_rotation_90: 3 # num_rotation_90: 2
# transpose: false # transpose: false
# force_monochrome: true # force_monochrome: false
# m_n_colormode: 1 # m_n_colormode: 1
# enabled: true # enabled: true
# onFailure: buffer # onFailure: buffer
@@ -471,19 +471,10 @@ omnyfsh:
#################### GUI Signals ########################### #################### GUI Signals ###########################
############################################################ ############################################################
omny_xray_gui: omny_xray_gui:
description: Gui Epics signals description: Gui signals
deviceClass: csaxs_bec.devices.omny.xray_epics_gui.OMNYXRayEpicsGUI deviceClass: csaxs_bec.devices.omny.xray_epics_gui.OMNYXRayAlignGUI
deviceConfig: {} deviceConfig: {}
enabled: true enabled: true
onFailure: buffer onFailure: buffer
readOnly: false readOnly: false
readoutPriority: on_request readoutPriority: on_request
calculated_signal:
description: Calculated signal from alignment for fit
deviceClass: ophyd_devices.ComputedSignal
deviceConfig:
compute_method: "def just_rand():\n return 42"
enabled: true
readOnly: false
readoutPriority: baseline

View File

@@ -0,0 +1,149 @@
"""Module for the EPICS integration of the AlliedVision Camera via Vimba SDK."""
import threading
import traceback
from enum import IntEnum
import numpy as np
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt, Kind, Signal
from ophyd.areadetector import ADComponent as ADCpt
from ophyd.areadetector import DetectorBase
from ophyd_devices import PreviewSignal
from ophyd_devices.devices.areadetector.cam import VimbaDetectorCam
from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35 as ImagePlugin
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from typeguard import typechecked
logger = bec_logger.logger
class ACQUIRE_MODES(IntEnum):
"""Acquiring enums for Allied Vision Camera"""
ACQUIRING = 1
DONE = 0
class AlliedVisionCamera(PSIDeviceBase, DetectorBase):
"""
Epics Area Detector interface for the Allied Vision Alvium G1-507m camera via Vimba SDK.
The IOC runs with under the prefix: 'X12SA-GIGECAM-AV1:'.
"""
USER_ACCESS = ["start_live_mode", "stop_live_mode"]
cam = ADCpt(VimbaDetectorCam, "cam1:")
image = ADCpt(ImagePlugin, "image1:")
preview = Cpt(
PreviewSignal,
name="preview",
ndim=2,
num_rotation_90=0,
doc="Preview signal of the AlliedVision camera.",
)
live_mode_enabled = Cpt(
Signal,
name="live_mode_enabled",
value=False,
doc="Enable or disable live mode.",
kind=Kind.config,
)
def __init__(
self,
*,
name: str,
prefix: str,
poll_rate: int = 5,
scan_info=None,
device_manager=None,
**kwargs,
):
super().__init__(
name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs
)
self._poll_thread = threading.Thread(
target=self._poll_array_data, daemon=True, name=f"{self.name}_poll_thread"
)
self._poll_thread_kill_event = threading.Event()
self._poll_start_event = threading.Event()
if poll_rate > 10:
logger.warning(f"Poll rate too high for Camera {self.name}, setting to 10 Hz max.")
poll_rate = 10
self._poll_rate = poll_rate
self._unique_array_id = 0
self._pv_timeout = 2.0
self.image: ImagePlugin
self._live_mode_lock = threading.RLock()
self.live_mode_enabled.subscribe(self._on_live_mode_enabled_changed, run=False)
def start_live_mode(self) -> None:
"""Start live mode."""
self.live_mode_enabled.put(True)
def stop_live_mode(self) -> None:
"""Stop live mode."""
self.live_mode_enabled.put(False)
def _on_live_mode_enabled_changed(self, *args, value, **kwargs) -> None:
self._apply_live_mode(bool(value))
def _apply_live_mode(self, enabled: bool) -> None:
with self._live_mode_lock:
if enabled:
if not self._poll_start_event.is_set():
self._poll_start_event.set()
self.cam.acquire.put(ACQUIRE_MODES.ACQUIRING.value) # Start acquisition
else:
logger.info(f"Live mode already started for {self.name}.")
return
if self._poll_start_event.is_set():
self._poll_start_event.clear()
self.cam.acquire.put(ACQUIRE_MODES.DONE.value) # Stop acquisition
else:
logger.info(f"Live mode already stopped for {self.name}.")
def on_connected(self):
"""Reset the unique array ID on connection."""
self.cam.array_counter.set(0).wait(timeout=self._pv_timeout)
self.cam.array_callbacks.set(1).wait(timeout=self._pv_timeout)
self._poll_thread.start()
def _poll_array_data(self):
"""Poll the array data for preview updates."""
while not self._poll_thread_kill_event.wait(1 / self._poll_rate):
while self._poll_start_event.wait():
try:
# First check if there is a new image
if self.image.unique_id.get() != self._unique_array_id:
self._unique_array_id = self.image.unique_id.get()
else:
continue # No new image, skip update
# Get new image data
value = self.image.array_data.get()
if value is None:
logger.info(f"No image data available for preview of {self.name}")
continue
array_size = self.image.array_size.get()
if array_size[0] == 0: # 2D image, not color image
array_size = array_size[1:]
# Geometry correction for the image
data = np.reshape(value, array_size)
self.preview.put(data)
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
logger.error(
f"Error while polling array data for preview of {self.name}: {content}"
)
def on_destroy(self):
"""Stop the polling thread on destruction."""
self._poll_thread_kill_event.set()
self._poll_start_event.set()
if self._poll_thread.is_alive():
self._poll_thread.join(timeout=2)

View File

@@ -156,7 +156,6 @@ class Camera:
camera_id (int): The ID of the camera device. camera_id (int): The ID of the camera device.
m_n_colormode (Literal[0, 1, 2, 3]): Color mode for the camera. m_n_colormode (Literal[0, 1, 2, 3]): Color mode for the camera.
bits_per_pixel (Literal[8, 24]): Number of bits per pixel for the camera. bits_per_pixel (Literal[8, 24]): Number of bits per pixel for the camera.
live_mode (bool): Whether to enable live mode for the camera.
""" """
def __init__( def __init__(

View File

@@ -3,21 +3,19 @@
from __future__ import annotations from __future__ import annotations
import threading import threading
import time from typing import TYPE_CHECKING, Literal
from typing import TYPE_CHECKING, Literal, Tuple, TypedDict
import numpy as np import numpy as np
from ophyd import Component as Cpt, Signal, Kind
from bec_lib import messages from bec_lib import messages
from bec_lib.logger import bec_logger from bec_lib.logger import bec_logger
from ophyd import Component as Cpt from csaxs_bec.devices.ids_cameras.base_integration.camera import Camera
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.bec_signals import AsyncSignal, PreviewSignal from ophyd_devices.utils.bec_signals import AsyncSignal, PreviewSignal
from csaxs_bec.devices.ids_cameras.base_integration.camera import Camera
if TYPE_CHECKING: if TYPE_CHECKING:
from bec_lib.devicemanager import ScanInfo from bec_lib.devicemanager import ScanInfo
from pydantic import ValidationInfo
logger = bec_logger.logger logger = bec_logger.logger
@@ -45,8 +43,15 @@ class IDSCamera(PSIDeviceBase):
doc="Signal for the region of interest (ROI).", doc="Signal for the region of interest (ROI).",
async_update={"type": "add", "max_shape": [None]}, async_update={"type": "add", "max_shape": [None]},
) )
live_mode_enabled = Cpt(
Signal,
name="live_mode_enabled",
value=False,
doc="Enable or disable live mode.",
kind=Kind.config,
)
USER_ACCESS = ["live_mode", "mask", "set_rect_roi", "get_last_image"] USER_ACCESS = ["start_live_mode", "stop_live_mode", "mask", "set_rect_roi", "get_last_image"]
def __init__( def __init__(
self, self,
@@ -83,15 +88,22 @@ class IDSCamera(PSIDeviceBase):
bits_per_pixel=bits_per_pixel, bits_per_pixel=bits_per_pixel,
connect=False, connect=False,
) )
self._live_mode = False
self._inputs = {"live_mode": live_mode} self._inputs = {"live_mode": live_mode}
self._mask = np.zeros((1, 1), dtype=np.uint8) self._mask = np.zeros((1, 1), dtype=np.uint8)
self.image.num_rotation_90 = num_rotation_90 self.image.num_rotation_90 = num_rotation_90
self.image.transpose = transpose self.image.transpose = transpose
self._force_monochrome = force_monochrome self._force_monochrome = force_monochrome
self.live_mode_enabled.subscribe(self._on_live_mode_enabled_changed, run=False)
self.live_mode_enabled.put(bool(live_mode))
############## Live Mode Methods ############## ############## Live Mode Methods ##############
def start_live_mode(self) -> None:
self.live_mode_enabled.put(True)
def stop_live_mode(self) -> None:
self.live_mode_enabled.put(False)
@property @property
def mask(self) -> np.ndarray: def mask(self) -> np.ndarray:
"""Return the current region of interest (ROI) for the camera.""" """Return the current region of interest (ROI) for the camera."""
@@ -114,22 +126,15 @@ class IDSCamera(PSIDeviceBase):
) )
self._mask = value self._mask = value
@property def _on_live_mode_enabled_changed(self, *args, value, **kwargs):
def live_mode(self) -> bool: """Callback for when live mode is changed."""
"""Return whether the camera is in live mode.""" enabled = bool(value)
return self._live_mode if enabled and self.cam._connected is False: # pylint: disable=protected-access
self.cam.on_connect()
@live_mode.setter if enabled:
def live_mode(self, value: bool): self._start_live()
"""Set the live mode for the camera.""" else:
if value != self._live_mode: self._stop_live()
if self.cam._connected is False: # $ pylint: disable=protected-access
self.cam.on_connect()
self._live_mode = value
if value:
self._start_live()
else:
self._stop_live()
def set_rect_roi(self, x: int, y: int, width: int, height: int): def set_rect_roi(self, x: int, y: int, width: int, height: int):
"""Set the rectangular region of interest (ROI) for the camera.""" """Set the rectangular region of interest (ROI) for the camera."""
@@ -196,7 +201,7 @@ class IDSCamera(PSIDeviceBase):
"""Connect to the camera.""" """Connect to the camera."""
self.cam.force_monochrome = self._force_monochrome self.cam.force_monochrome = self._force_monochrome
self.cam.on_connect() self.cam.on_connect()
self.live_mode = self._inputs.get("live_mode", False) self.live_mode_enabled.put(bool(self._inputs.get("live_mode", False)))
self.set_rect_roi(0, 0, self.cam.cam.width.value, self.cam.cam.height.value) self.set_rect_roi(0, 0, self.cam.cam.width.value, self.cam.cam.height.value)
def on_destroy(self): def on_destroy(self):
@@ -206,7 +211,7 @@ class IDSCamera(PSIDeviceBase):
def on_trigger(self): def on_trigger(self):
"""Handle the trigger event.""" """Handle the trigger event."""
if not self.live_mode: if not bool(self.live_mode_enabled.get()):
return return
image = self.image.get() image = self.image.get()
if image is not None: if image is not None:

View File

@@ -13,6 +13,7 @@ which can be easily supported by changing the _NUM_DIGITAL_OUTPUT_CHANNELS varia
from __future__ import annotations from __future__ import annotations
import time
from typing import TYPE_CHECKING, Literal from typing import TYPE_CHECKING, Literal
from bec_lib.logger import bec_logger from bec_lib.logger import bec_logger
@@ -78,12 +79,38 @@ class GalilRIOAnalogSignalRO(GalilSignalBase):
""" """
_NUM_ANALOG_CHANNELS = 8 _NUM_ANALOG_CHANNELS = 8
READBACK_TIMEOUT = 0.1 # time to wait in between two readback attemps in seconds, otherwise return cached value
def __init__(self, signal_name: str, channel: int, parent: GalilRIO, **kwargs): def __init__(
self,
signal_name: str,
channel: int,
parent: GalilRIO,
readback_timeout: float = None,
**kwargs,
):
super().__init__(signal_name=signal_name, parent=parent, **kwargs) super().__init__(signal_name=signal_name, parent=parent, **kwargs)
self._channel = channel self._channel = channel
self._metadata["connected"] = False self._metadata["connected"] = False
self._readback_timeout = (
readback_timeout if readback_timeout is not None else self.READBACK_TIMEOUT
)
self._metadata["write_access"] = False self._metadata["write_access"] = False
self._last_readback = 0.0
def get(self):
current_time = time.monotonic()
if current_time - self._last_readback > self._readback_timeout:
old_value = self._readback
self._last_readback = current_time # _socket_get may rely on this value to be set.
self._readback = self._socket_get()
self._run_subs(
sub_type=self.SUB_VALUE,
old_value=old_value,
value=self._readback,
timestamp=current_time,
)
return self._readback
def _socket_set(self, val): def _socket_set(self, val):
"""Read-only signal, so set method raises an error.""" """Read-only signal, so set method raises an error."""
@@ -136,6 +163,8 @@ class GalilRIOAnalogSignalRO(GalilSignalBase):
# Run subscriptions after all readbacks have been updated # Run subscriptions after all readbacks have been updated
# on all channels except the one that triggered the update # on all channels except the one that triggered the update
# TODO for now skip running subscribers, this should be re-implemented
# once we properly handle subscriptions from bec running "read"
for walk in self.parent.walk_signals(): for walk in self.parent.walk_signals():
if walk.item.attr_name in updates: if walk.item.attr_name in updates:
new_val, old_val = updates[walk.item.attr_name] new_val, old_val = updates[walk.item.attr_name]
@@ -185,7 +214,7 @@ def _create_analog_channels(num_channels: int) -> dict[str, tuple]:
an_channels[f"ch{i}"] = ( an_channels[f"ch{i}"] = (
GalilRIOAnalogSignalRO, GalilRIOAnalogSignalRO,
f"ch{i}", f"ch{i}",
{"kind": Kind.normal, "notify_bec": True, "channel": i, "doc": f"Analog channel {i}."}, {"kind": Kind.normal, "channel": i, "doc": f"Analog channel {i}."},
) )
return an_channels return an_channels
@@ -202,12 +231,7 @@ def _create_digital_output_channels(num_channels: int) -> dict[str, tuple]:
di_out_channels[f"ch{i}"] = ( di_out_channels[f"ch{i}"] = (
GalilRIODigitalOutSignal, GalilRIODigitalOutSignal,
f"ch{i}", f"ch{i}",
{ {"kind": Kind.config, "channel": i, "doc": f"Digital output channel {i}."},
"kind": Kind.config,
"notify_bec": True,
"channel": i,
"doc": f"Digital output channel {i}.",
},
) )
return di_out_channels return di_out_channels

View File

@@ -65,10 +65,8 @@ class RtLamniController(Controller):
"_position_sampling_single_read", "_position_sampling_single_read",
"_position_sampling_single_reset_and_start_sampling", "_position_sampling_single_reset_and_start_sampling",
"show_signal_strength_interferometer", "show_signal_strength_interferometer",
"show_interferometer_positions",
"show_analog_signals", "show_analog_signals",
"show_feedback_status", "show_feedback_status",
] ]
def __init__( def __init__(

View File

@@ -2,7 +2,7 @@ import requests
import threading import threading
import cv2 import cv2
import numpy as np import numpy as np
from ophyd import Device, Component as Cpt from ophyd import Device, Component as Cpt, Kind, Signal
from ophyd_devices import PreviewSignal from ophyd_devices import PreviewSignal
import traceback import traceback
@@ -13,6 +13,13 @@ logger = bec_logger.logger
class WebcamViewer(Device): class WebcamViewer(Device):
USER_ACCESS = ["start_live_mode", "stop_live_mode"] USER_ACCESS = ["start_live_mode", "stop_live_mode"]
preview = Cpt(PreviewSignal, ndim=2, num_rotation_90=0, transpose=False) preview = Cpt(PreviewSignal, ndim=2, num_rotation_90=0, transpose=False)
live_mode_enabled = Cpt(
Signal,
name="live_mode_enabled",
value=False,
doc="Enable or disable live mode.",
kind=Kind.config,
)
def __init__(self, url:str, name:str, num_rotation_90=0, transpose=False, **kwargs) -> None: def __init__(self, url:str, name:str, num_rotation_90=0, transpose=False, **kwargs) -> None:
super().__init__(name=name, **kwargs) super().__init__(name=name, **kwargs)
@@ -21,20 +28,54 @@ class WebcamViewer(Device):
self._update_thread = None self._update_thread = None
self._buffer = b"" self._buffer = b""
self._shutdown_event = threading.Event() self._shutdown_event = threading.Event()
self._live_mode_lock = threading.RLock()
self.preview.num_rotation_90 = num_rotation_90 self.preview.num_rotation_90 = num_rotation_90
self.preview.transpose = transpose self.preview.transpose = transpose
self.live_mode_enabled.subscribe(self._on_live_mode_enabled_changed, run=False)
def start_live_mode(self) -> None: def start_live_mode(self) -> None:
if self._connection is not None: self.live_mode_enabled.put(True)
return
self._update_thread = threading.Thread(target=self._update_loop, daemon=True) def stop_live_mode(self) -> None:
self._update_thread.start() self.live_mode_enabled.put(False)
def _on_live_mode_enabled_changed(self, *args, value, **kwargs) -> None:
self._apply_live_mode(bool(value))
def _apply_live_mode(self, enabled: bool) -> None:
with self._live_mode_lock:
if enabled:
if self._update_thread is not None and self._update_thread.is_alive():
return
self._shutdown_event.clear()
self._update_thread = threading.Thread(target=self._update_loop, daemon=True)
self._update_thread.start()
return
if self._update_thread is None:
return
self._shutdown_event.set()
if self._connection is not None:
try:
self._connection.close()
except Exception: # pylint: disable=broad-except
pass
self._connection = None
self._update_thread.join(timeout=2)
if self._update_thread.is_alive():
logger.warning("Webcam live mode thread did not stop within timeout.")
return
self._update_thread = None
self._buffer = b""
self._shutdown_event.clear()
def _update_loop(self) -> None: def _update_loop(self) -> None:
while not self._shutdown_event.is_set(): while not self._shutdown_event.is_set():
try: try:
self._connection = requests.get(self.url, stream=True) self._connection = requests.get(self.url, stream=True, timeout=5)
for chunk in self._connection.iter_content(chunk_size=1024): for chunk in self._connection.iter_content(chunk_size=1024):
if self._shutdown_event.is_set():
break
self._buffer += chunk self._buffer += chunk
start = self._buffer.find(b'\xff\xd8') # JPEG start start = self._buffer.find(b'\xff\xd8') # JPEG start
end = self._buffer.find(b'\xff\xd9') # JPEG end end = self._buffer.find(b'\xff\xd9') # JPEG end
@@ -50,16 +91,3 @@ class WebcamViewer(Device):
except Exception as exc: except Exception as exc:
content = traceback.format_exc() content = traceback.format_exc()
logger.error(f"Image update loop failed: {content}") logger.error(f"Image update loop failed: {content}")
def stop_live_mode(self) -> None:
if self._connection is None:
return
self._shutdown_event.set()
if self._connection is not None:
self._connection.close()
self._connection = None
if self._update_thread is not None:
self._update_thread.join()
self._update_thread = None
self._shutdown_event.clear()

View File

@@ -1,74 +1,46 @@
import numpy as np
from ophyd import Component as Cpt from ophyd import Component as Cpt, Signal, EpicsSignal
from ophyd import Device from ophyd import Device
from ophyd import DynamicDeviceComponent as Dcpt from ophyd import DynamicDeviceComponent as Dcpt
from ophyd import EpicsSignal
class OMNYXRayAlignGUI(Device):
update_frame_acqdone = Cpt(Signal, value=0)
class OMNYXRayEpicsGUI(Device): update_frame_acq = Cpt(Signal, value=0)
enable_mv_x = Cpt(Signal, value=0)
save_frame = Cpt( enable_mv_y = Cpt(Signal, value=0)
EpicsSignal, name="save_frame", read_pv="XOMNYI-XEYE-SAVFRAME:0",auto_monitor=True send_message = Cpt(Signal, value=0)
) sample_name = Cpt(Signal, value=0)
update_frame_acqdone = Cpt( angle = Cpt(Signal, value=0)
EpicsSignal, name="update_frame_acqdone", read_pv="XOMNYI-XEYE-ACQDONE:0",auto_monitor=True pixel_size = Cpt(Signal, value=0)
)
update_frame_acq = Cpt(
EpicsSignal, name="update_frame_acq", read_pv="XOMNYI-XEYE-ACQ:0",auto_monitor=True
)
width_y_dynamic = {
f"width_y_{i}": (EpicsSignal, f"XOMNYI-XEYE-YWIDTH_Y:{i}", {"auto_monitor": True}) for i in range(0, 11)
}
width_y = Dcpt(width_y_dynamic)
width_x_dynamic = {
f"width_x_{i}": (EpicsSignal, f"XOMNYI-XEYE-XWIDTH_X:{i}", {"auto_monitor": True}) for i in range(0, 11)
}
width_x = Dcpt(width_x_dynamic)
enable_mv_x = Cpt(
EpicsSignal, name="enable_mv_x", read_pv="XOMNYI-XEYE-ENAMVX:0",auto_monitor=True
)
enable_mv_y = Cpt(
EpicsSignal, name="enable_mv_y", read_pv="XOMNYI-XEYE-ENAMVY:0",auto_monitor=True
)
send_message = Cpt(
EpicsSignal, name="send_message", read_pv="XOMNYI-XEYE-MESSAGE:0.DESC",auto_monitor=True
)
sample_name = Cpt(
EpicsSignal, name="sample_name", read_pv="XOMNYI-XEYE-SAMPLENAME:0.DESC",auto_monitor=True
)
angle = Cpt(
EpicsSignal, name="angle", read_pv="XOMNYI-XEYE-ANGLE:0",auto_monitor=True
)
pixel_size = Cpt(
EpicsSignal, name="pixel_size", read_pv="XOMNYI-XEYE-PIXELSIZE:0",auto_monitor=True
)
submit = Cpt( submit = Cpt(
EpicsSignal, name="submit", read_pv="XOMNYI-XEYE-SUBMIT:0",auto_monitor=True EpicsSignal, name="submit", read_pv="XOMNYI-XEYE-SUBMIT:0",auto_monitor=True
) )
step = Cpt( step = Cpt(Signal, value=0)
EpicsSignal, name="step", read_pv="XOMNYI-XEYE-STEP:0",auto_monitor=True recbg = Cpt(Signal, value=0)
) mvx = Cpt(Signal, value=0)
xval_x_dynamic = { mvy = Cpt(Signal, value=0)
f"xval_x_{i}": (EpicsSignal, f"XOMNYI-XEYE-XVAL_X:{i}", {"auto_monitor": True}) for i in range(0, 11)
}
xval_x = Dcpt(xval_x_dynamic)
yval_y_dynamic = {
f"yval_y_{i}": (EpicsSignal, f"XOMNYI-XEYE-YVAL_Y:{i}", {"auto_monitor": True}) for i in range(0, 11)
}
yval_y = Dcpt(yval_y_dynamic)
recbg = Cpt(
EpicsSignal, name="recbg", read_pv="XOMNYI-XEYE-RECBG:0",auto_monitor=True
)
stage_pos_x_dynamic = {
f"stage_pos_x_{i}": (EpicsSignal, f"XOMNYI-XEYE-STAGEPOSX:{i}", {"auto_monitor": True}) for i in range(1, 6)
}
stage_pos_x = Dcpt(stage_pos_x_dynamic)
mvx = Cpt(
EpicsSignal, name="mvx", read_pv="XOMNYI-XEYE-MVX:0",auto_monitor=True
)
mvy = Cpt(
EpicsSignal, name="mvy", read_pv="XOMNYI-XEYE-MVY:0",auto_monitor=True
)
fit_array = Cpt(Signal, value=np.zeros((3, 10)))
fit_params_x = Cpt(Signal, value=np.zeros((2, 3)))
fit_params_y = Cpt(Signal, value=np.zeros((2, 3)))
# Generate width_y_0 to width_y_10
for i in range(11):
locals()[f'width_y_{i}'] = Cpt(Signal, value=0)
# Generate width_x_0 to width_x_10
for i in range(11):
locals()[f'width_x_{i}'] = Cpt(Signal, value=0)
# Generate xval_x_0 to xval_x_10
for i in range(11):
locals()[f'xval_x_{i}'] = Cpt(Signal, value=0)
# Generate yval_y_0 to yval_y_10
for i in range(11):
locals()[f'yval_y_{i}'] = Cpt(Signal, value=0)
# Generate stage_pos_x_1 to stage_pos_x_5
for i in range(1, 6):
locals()[f'stage_pos_x_{i}'] = Cpt(Signal, value=0)

View File

@@ -6,7 +6,7 @@ build-backend = "hatchling.build"
name = "csaxs_bec" name = "csaxs_bec"
version = "0.0.0" version = "0.0.0"
description = "The cSAXS plugin repository for BEC" description = "The cSAXS plugin repository for BEC"
requires-python = ">=3.10" requires-python = ">=3.11"
classifiers = [ classifiers = [
"Development Status :: 3 - Alpha", "Development Status :: 3 - Alpha",
"Programming Language :: Python :: 3", "Programming Language :: Python :: 3",

View File

@@ -35,16 +35,16 @@ def test_save_frame(bec_client_mock):
lamni = LamNI(client) lamni = LamNI(client)
align = XrayEyeAlign(client, lamni) align = XrayEyeAlign(client, lamni)
with mock.patch( with mock.patch(
"csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_put" "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.epics_put"
) as epics_put_mock: ) as epics_put_mock:
align.save_frame() align.save_frame()
epics_put_mock.assert_called_once_with("XOMNYI-XEYE-SAVFRAME:0", 1) epics_put_mock.assert_called_once_with("XOMNYI-XEYE-SAVFRAME:0", 1)
def test_update_frame(bec_client_mock): def test_update_frame(bec_client_mock):
epics_put = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_put" epics_put = "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.epics_put"
epics_get = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_get" epics_get = "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.epics_get"
fshopen = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.fshopen" fshopen = "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.fshopen"
client = bec_client_mock client = bec_client_mock
client.device_manager.devices.xeye = DeviceBase( client.device_manager.devices.xeye = DeviceBase(
name="xeye", name="xeye",