2 Commits

Author SHA1 Message Date
2e428140e8 fix(samcam): move to psi device base
All checks were successful
CI for pxiii_bec / test (pull_request) Successful in 1m16s
CI for pxiii_bec / test (push) Successful in 29s
2025-11-14 10:52:21 +01:00
eb5a9c89ca feat: update repo with copier template for gitea migration 2025-09-11 17:02:57 +02:00
9 changed files with 201 additions and 96 deletions

View File

@@ -2,7 +2,7 @@
# It is needed to track the repo template version, and editing may break things.
# This file will be overwritten by copier on template updates.
_commit: v1.0.0
_commit: v1.2.2
_src_path: https://github.com/bec-project/plugin_copier_template.git
make_commit: false
project_name: pxiii_bec

97
.gitea/workflows/ci.yml Normal file
View File

@@ -0,0 +1,97 @@
name: CI for pxiii_bec
on:
push:
pull_request:
workflow_dispatch:
inputs:
BEC_WIDGETS_BRANCH:
description: "Branch of BEC Widgets to install"
required: false
type: string
default: "main"
BEC_CORE_BRANCH:
description: "Branch of BEC Core to install"
required: false
type: string
default: "main"
OPHYD_DEVICES_BRANCH:
description: "Branch of Ophyd Devices to install"
required: false
type: string
default: "main"
BEC_PLUGIN_REPO_BRANCH:
description: "Branch of the BEC Plugin Repository to install"
required: false
type: string
default: "main"
PYTHON_VERSION:
description: "Python version to use"
required: false
type: string
default: "3.11"
permissions:
pull-requests: write
jobs:
test:
runs-on: ubuntu-latest
env:
QTWEBENGINE_DISABLE_SANDBOX: 1
QT_QPA_PLATFORM: "offscreen"
steps:
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: "${{ inputs.PYTHON_VERSION || '3.11' }}"
- name: Checkout BEC Core
uses: actions/checkout@v4
with:
repository: bec/bec
ref: "${{ inputs.BEC_CORE_BRANCH || 'main' }}"
path: ./bec
- name: Checkout Ophyd Devices
uses: actions/checkout@v4
with:
repository: bec/ophyd_devices
ref: "${{ inputs.OPHYD_DEVICES_BRANCH || 'main' }}"
path: ./ophyd_devices
- name: Checkout BEC Widgets
uses: actions/checkout@v4
with:
repository: bec/bec_widgets
ref: "${{ inputs.BEC_WIDGETS_BRANCH || 'main' }}"
path: ./bec_widgets
- name: Checkout BEC Plugin Repository
uses: actions/checkout@v4
with:
repository: bec/pxiii_bec
ref: "${{ inputs.BEC_PLUGIN_REPO_BRANCH || github.head_ref || github.sha }}"
path: ./pxiii_bec
- name: Install dependencies
shell: bash
run: |
sudo apt-get update
sudo apt-get install -y libgl1 libegl1 x11-utils libxkbcommon-x11-0 libdbus-1-3 xvfb
sudo apt-get -y install libnss3 libxdamage1 libasound2t64 libatomic1 libxcursor1
- name: Install Python dependencies
shell: bash
run: |
pip install uv
uv pip install --system -e ./ophyd_devices
uv pip install --system -e ./bec/bec_lib[dev]
uv pip install --system -e ./bec/bec_ipython_client
uv pip install --system -e ./bec/bec_server[dev]
uv pip install --system -e ./bec_widgets[dev,pyside6]
uv pip install --system -e ./pxiii_bec
- name: Run Pytest with Coverage
id: coverage
run: pytest --random-order --cov=./pxiii_bec --cov-config=./pxiii_bec/pyproject.toml --cov-branch --cov-report=xml --no-cov-on-fail ./pxiii_bec/tests/ || test $? -eq 5

View File

@@ -1,10 +1,14 @@
"""
Pre-startup script for BEC client. This script is executed before the BEC client
is started. It can be used to add additional command line arguments.
is started. It can be used to add additional command line arguments.
"""
import os
from bec_lib.service_config import ServiceConfig
import pxiii_bec
def extend_command_line_args(parser):
"""
@@ -18,6 +22,11 @@ def extend_command_line_args(parser):
def get_config() -> ServiceConfig:
"""
Create and return the service configuration.
Create and return the ServiceConfig for the plugin repository
"""
return ServiceConfig(redis={"host": "x06da-bec-001", "port": 6379})
deployment_path = os.path.dirname(os.path.dirname(os.path.dirname(pxiii_bec.__file__)))
files = os.listdir(deployment_path)
if "bec_config.yaml" in files:
return ServiceConfig(config_path=os.path.join(deployment_path, "bec_config.yaml"))
else:
return ServiceConfig(redis={"host": "localhost", "port": 6379})

View File

@@ -28,49 +28,30 @@ class PlotUpdate(AutoUpdates):
Args:
msg (ScanStatusMessage): The scan status message.
"""
if msg.scan_name == "line_scan" and msg.scan_report_devices:
return self.simple_line_scan(msg)
if msg.scan_name == "grid_scan" and msg.scan_report_devices:
return self.simple_grid_scan(msg)
dev_x = msg.scan_report_devices[0]
if "kwargs" in msg.request_inputs:
dev_y = msg.request_inputs["kwargs"].get("plot", None)
if dev_y is not None:
# Set the dock to the waveform widget
wf = self.set_dock_to_widget("Waveform")
# if "kwargs" in msg.request_inputs:
# dev_plt = msg.request_inputs["kwargs"].get("plot", None)
# if dev_plt is not None:
# # Handle depending on scan dimension
# if len(msg.scan_report_devices) == 1:
# dev_x = msg.scan_report_devices[0]
# # Set the dock to the waveform widget
# wf = self.set_dock_to_widget("Waveform")
# # Clear the waveform widget and plot the data
# wf.clear_all()
# wf.plot(
# x_name=dev_x,
# y_name=dev_plt,
# label=f"Scan {msg.info.scan_number} - {dev_plt}",
# title=f"Scan {msg.info.scan_number}",
# x_label=dev_x,
# y_label=dev_plt,
# )
# if len(msg.scan_report_devices) == 2:
# dev_x = msg.scan_report_devices[0]
# dev_y = msg.scan_report_devices[1]
# # Set the dock to the waveform widget
# wf = self.set_dock_to_widget("Waveform")
# # Clear the waveform widget and plot the data
# wf.clear_all()
# wf.plot(
# x_name=dev_x,
# y_name=dev_y,
# z_name=dev_plt,
# label=f"Scan {msg.info.scan_number} - {dev_plt}",
# title=f"Scan {msg.info.scan_number} - {dev_plt}",
# x_label=dev_x,
# y_label=dev_y,
# z_label=dev_plt,
# )
# elif msg.scan_name == "line_scan" and msg.scan_report_devices:
# return self.simple_line_scan(msg)
# elif msg.scan_name == "grid_scan" and msg.scan_report_devices:
# return self.simple_grid_scan(msg)
# elif msg.scan_report_devices:
# return self.best_effort(msg)
# Clear the waveform widget and plot the data
wf.clear_all()
wf.plot(
x_name=dev_x,
y_name=dev_y,
label=f"Scan {msg.info.scan_number} - {dev_y}",
title=f"Scan {msg.info.scan_number}",
x_label=dev_x,
y_label=dev_y,
)
elif msg.scan_report_devices:
return self.best_effort(msg)
return None
def on_scan_closed(self, msg: ScanStatusMessage) -> None:

View File

@@ -691,26 +691,6 @@ abr:
readoutPriority: monitored
readOnly: false
softwareTrigger: false
coll_x:
description: Collimator X
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-COL:TRX1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
coll_y:
description: Collimator Y
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-COL:TRY1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
shx:
description: SmarGon X axis
deviceClass: pxiii_bec.devices.SmarGonAxisB

View File

@@ -97,10 +97,10 @@ class AerotechAbrStage(PSIDeviceBase, Device):
# )
# Status flags for all axes
omega_done = Component(EpicsSignalRO, "-DF1:OMEGA-DONE", auto_monitor=True, kind=Kind.normal)
gmx_done = Component(EpicsSignalRO, "-DF1:GMX-DONE", auto_monitor=True, kind=Kind.normal)
gmy_done = Component(EpicsSignalRO, "-DF1:GMY-DONE", auto_monitor=True, kind=Kind.normal)
gmz_done = Component(EpicsSignalRO, "-DF1:GMZ-DONE", auto_monitor=True, kind=Kind.normal)
omega_done = Component(EpicsSignalRO, "-DF1:OMEGA-DONE", kind=Kind.normal)
gmx_done = Component(EpicsSignalRO, "-DF1:GMX-DONE", kind=Kind.normal)
gmy_done = Component(EpicsSignalRO, "-DF1:GMY-DONE", kind=Kind.normal)
gmz_done = Component(EpicsSignalRO, "-DF1:GMZ-DONE", kind=Kind.normal)
# For some reason the task interface is called PSO...
scan_command = Component(EpicsSignal, "-PSO:CMD", put_complete=True, kind=Kind.omitted)
@@ -128,7 +128,7 @@ class AerotechAbrStage(PSIDeviceBase, Device):
task2 = Component(EpicsSignalRO, "-AERO:TSK2-DONE", auto_monitor=True)
task3 = Component(EpicsSignalRO, "-AERO:TSK3-DONE", auto_monitor=True)
task4 = Component(EpicsSignalRO, "-AERO:TSK4-DONE", auto_monitor=True)
scan_done = Component(EpicsSignal, "-GRD:SCAN-DONE", auto_monitor=True, kind=Kind.config)
scan_done = Component(EpicsSignal, "-GRD:SCAN-DONE", kind=Kind.config)
def __init__(
self,

View File

@@ -11,42 +11,74 @@ Created on Thu Jan 30 2025
@author: mohacsi_i
"""
from ophyd import ADComponent
from ophyd_devices.devices.areadetector.cam import GenICam
from __future__ import annotations
# from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
PSIDetectorBase,
CustomDetectorMixin,
)
from typing import TYPE_CHECKING
from bec_lib import bec_logger
from ophyd import ADComponent
from ophyd_devices.devices.areadetector.cam import GenICam
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
if TYPE_CHECKING:
from ophyd import DeviceStatus, StatusBase
logger = bec_logger.logger
class SamCamSetup(CustomDetectorMixin):
"""Simple camera mixin class, the SAMCAM is usually streaming"""
def on_stage(self):
"""Just make sure it's running continously"""
self.parent.cam.acquire.put(1, wait=True)
def on_unstage(self):
"""Should run continously"""
def on_stop(self):
"""Should run continously"""
class SamCamDetector(PSIDetectorBase):
class SamCamDetector(PSIDeviceBase):
"""Sample camera device
The SAMCAM continously streams images to the GUI and sample alignment
scripts via ZMQ.
"""
custom_prepare_cls = SamCamSetup
cam = ADComponent(GenICam, "cam1:")
# image = ADComponent(ImagePlugin_V35, "image1:")
########################################
# Beamline Specific Implementations #
########################################
def on_init(self) -> None:
"""
Called when the device is initialized.
No signals are connected at this point. If you like to
set default values on signals, please use on_connected instead.
"""
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
"""
def on_stage(self) -> DeviceStatus | StatusBase | None:
"""
Called while staging the device.
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
"""
self.parent.cam.acquire.put(1, wait=True)
def on_unstage(self) -> DeviceStatus | StatusBase | None:
"""Called while unstaging the device."""
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
"""Called right before the scan starts on all devices automatically."""
def on_trigger(self) -> DeviceStatus | StatusBase | None:
"""Called when the device is triggered."""
def on_complete(self) -> DeviceStatus | StatusBase | None:
"""Called to inquire if a device has completed a scans."""
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
def on_stop(self) -> None:
"""Called when the device is stopped."""
def on_destroy(self) -> None:
"""Called when the device is destroyed. Cleanup resources here."""

View File

@@ -0,0 +1,6 @@
# Macros
This directory is intended to store macros which will be loaded automatically when starting BEC.
Macros are small functions to make repetitive tasks easier. Functions defined in python files in this directory will be accessible from the BEC console.
Please do not put any code outside of function definitions here. If you wish for code to be automatically run when starting BEC, see the startup script at pxiii_bec/bec_ipython_client/startup/post_startup.py
For a guide on writing macros, please see: https://bec.readthedocs.io/en/latest/user/command_line_interface.html#how-to-write-a-macro

View File