Compare commits

...

6 Commits

34 changed files with 940 additions and 584 deletions

9
.copier-answers.yml Normal file
View File

@@ -0,0 +1,9 @@
# Do not edit this file!
# It is needed to track the repo template version, and editing may break things.
# This file will be overwritten by copier on template updates.
_commit: v1.0.0
_src_path: https://github.com/bec-project/plugin_copier_template.git
make_commit: false
project_name: csaxs_bec
widget_plugins_input: []

View File

@@ -1,6 +1,7 @@
BSD 3-Clause License
Copyright (c) 2024, Paul Scherrer Institute
Copyright (c) 2025, Paul Scherrer Institute
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
@@ -25,4 +26,4 @@ DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

1
bin/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
# Add anything you don't want to check in to git, e.g. very large files

View File

@@ -13,7 +13,6 @@ logger = bec_logger.logger
class PilatusConverter:
def __init__(self, host: str, port: int) -> None:
self._connector = RedisConnector(f"{host}:{port}")
self._producer = self._connector.producer()
def start(self) -> None:
"""start the consumer"""
@@ -57,10 +56,9 @@ class PilatusConverter:
"""
Start the consumer.
"""
file_consumer = self._connector.consumer(
self._connector.register(
MessageEndpoints.file_event("pilatus_2"), cb=self.on_new_message, parent=self
)
file_consumer.start()
if __name__ == "__main__":

View File

@@ -0,0 +1,63 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from bec_widgets.widgets.containers.auto_update.auto_updates import AutoUpdates
if TYPE_CHECKING: # pragma: no cover
from bec_lib.messages import ScanStatusMessage
class cSAXSUpdate(AutoUpdates):
#######################################################################
################# GUI Callbacks #######################################
#######################################################################
def on_start(self) -> None:
"""
Procedure to run when the auto updates are enabled.
"""
self.start_default_dock()
def on_stop(self) -> None:
"""
Procedure to run when the auto updates are disabled.
"""
def on_scan_open(self, msg: ScanStatusMessage) -> None:
"""
Procedure to run when a scan starts.
Args:
msg (ScanStatusMessage): The scan status message.
"""
if msg.scan_name == "line_scan" and msg.scan_report_devices:
return self.simple_line_scan(msg)
if msg.scan_name == "grid_scan" and msg.scan_report_devices:
return self.simple_grid_scan(msg)
if msg.scan_report_devices:
return self.best_effort(msg)
return None
def on_scan_closed(self, msg: ScanStatusMessage) -> None:
"""
Procedure to run when a scan ends.
Args:
msg (ScanStatusMessage): The scan status message.
"""
def on_scan_abort(self, msg: ScanStatusMessage) -> None:
"""
Procedure to run when a scan is aborted.
Args:
msg (ScanStatusMessage): The scan status message.
"""
class cSAXSUpdateAlignment(AutoUpdates): ...
class cSAXSUpdateScan(AutoUpdates): ...

View File

@@ -0,0 +1,75 @@
# This file was automatically generated by generate_cli.py
# type: ignore
from __future__ import annotations
from bec_lib.logger import bec_logger
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call
logger = bec_logger.logger
# pylint: skip-file
_Widgets = {
"OmnyAlignment": "OmnyAlignment",
}
class OmnyAlignment(RPCBase):
@property
@rpc_call
def enable_live_view(self):
"""
None
"""
@enable_live_view.setter
@rpc_call
def enable_live_view(self):
"""
None
"""
@property
@rpc_call
def user_message(self):
"""
None
"""
@user_message.setter
@rpc_call
def user_message(self):
"""
None
"""
@property
@rpc_call
def sample_name(self):
"""
None
"""
@sample_name.setter
@rpc_call
def sample_name(self):
"""
None
"""
@property
@rpc_call
def enable_move_buttons(self):
"""
None
"""
@enable_move_buttons.setter
@rpc_call
def enable_move_buttons(self):
"""
None
"""

View File

@@ -0,0 +1,139 @@
from typing import TypedDict
from bec_widgets.utils.error_popups import SafeSlot
import os
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.ui_loader import UILoader
from qtpy.QtWidgets import QWidget, QPushButton, QLineEdit, QLabel, QVBoxLayout
from bec_qthemes import material_icon
from bec_lib.logger import bec_logger
logger = bec_logger.logger
# class OmnyAlignmentUIComponents(TypedDict):
# moveRightButton: QPushButton
# moveLeftButton: QPushButton
# moveUpButton: QPushButton
# moveDownButton: QPushButton
# image: Image
class OmnyAlignment(BECWidget, QWidget):
USER_ACCESS = ["enable_live_view", "enable_live_view.setter", "user_message", "user_message.setter","sample_name", "sample_name.setter", "enable_move_buttons", "enable_move_buttons.setter"]
PLUGIN = True
ui_file = "./omny_alignment.ui"
def __init__(self, parent=None, **kwargs):
super().__init__(parent=parent, **kwargs)
self._load_ui()
def _load_ui(self):
current_path = os.path.dirname(__file__)
self.ui = UILoader(self).loader(os.path.join(current_path, self.ui_file))
layout = QVBoxLayout()
layout.addWidget(self.ui)
self.setLayout(layout)
icon_options = {"size": (16, 16), "convert_to_pixmap": False}
self.ui.moveRightButton.setText("")
self.ui.moveRightButton.setIcon(
material_icon(icon_name="keyboard_arrow_right", **icon_options)
)
self.ui.moveLeftButton.setText("")
self.ui.moveLeftButton.setIcon(
material_icon(icon_name="keyboard_arrow_left", **icon_options)
)
self.ui.moveUpButton.setText("")
self.ui.moveUpButton.setIcon(
material_icon(icon_name="keyboard_arrow_up", **icon_options)
)
self.ui.moveDownButton.setText("")
self.ui.moveDownButton.setIcon(
material_icon(icon_name="keyboard_arrow_down", **icon_options)
)
self.ui.confirmButton.setText("OK")
self.ui.liveViewSwitch.enabled.connect(self.on_live_view_enabled)
@property
def enable_live_view(self):
return self.ui.liveViewSwitch.checked
@enable_live_view.setter
def enable_live_view(self, enable:bool):
self.ui.liveViewSwitch.checked = enable
@property
def user_message(self):
return self.ui.messageLineEdit.text()
@user_message.setter
def user_message(self, message:str):
self.ui.messageLineEdit.setText(message)
@property
def sample_name(self):
return self.ui.sampleLineEdit.text()
@sample_name.setter
def sample_name(self, message:str):
self.ui.sampleLineEdit.setText(message)
@SafeSlot(bool)
def on_live_view_enabled(self, enabled:bool):
from bec_widgets.widgets.plots.image.image import Image
logger.info(f"Live view is enabled: {enabled}")
image: Image = self.ui.image
if enabled:
image.image("cam200")
return
image.disconnect_monitor("cam200")
@property
def enable_move_buttons(self):
move_up:QPushButton = self.ui.moveUpButton
move_down:QPushButton = self.ui.moveDownButton
move_left:QPushButton = self.ui.moveLeftButton
move_right:QPushButton = self.ui.moveRightButton
return move_up.isEnabled() and move_down.isEnabled() and move_left.isEnabled() and move_right.isEnabled()
@enable_move_buttons.setter
def enable_move_buttons(self, enabled:bool):
move_up:QPushButton = self.ui.moveUpButton
move_down:QPushButton = self.ui.moveDownButton
move_left:QPushButton = self.ui.moveLeftButton
move_right:QPushButton = self.ui.moveRightButton
move_up.setEnabled(enabled)
move_down.setEnabled(enabled)
move_left.setEnabled(enabled)
move_right.setEnabled(enabled)
if __name__ == "__main__":
from qtpy.QtWidgets import QApplication
import sys
app = QApplication(sys.argv)
widget = OmnyAlignment()
widget.show()
sys.exit(app.exec_())

View File

@@ -0,0 +1 @@
{'files': ['omny_alignment.py']}

View File

@@ -0,0 +1,125 @@
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>Form</class>
<widget class="QWidget" name="Form">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>988</width>
<height>821</height>
</rect>
</property>
<property name="windowTitle">
<string>Form</string>
</property>
<layout class="QGridLayout" name="gridLayout_3">
<item row="2" column="2">
<layout class="QGridLayout" name="gridLayout">
<item row="1" column="2">
<widget class="QPushButton" name="moveRightButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
<item row="1" column="0">
<widget class="QPushButton" name="moveLeftButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
<item row="0" column="1">
<widget class="QPushButton" name="moveUpButton">
<property name="text">
<string>Up</string>
</property>
</widget>
</item>
<item row="2" column="1">
<widget class="QPushButton" name="moveDownButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
<item row="1" column="1">
<widget class="QPushButton" name="confirmButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
</layout>
</item>
<item row="2" column="0">
<layout class="QGridLayout" name="gridLayout_2">
<item row="0" column="1">
<widget class="QLineEdit" name="sampleLineEdit"/>
</item>
<item row="1" column="1">
<widget class="QLineEdit" name="messageLineEdit"/>
</item>
<item row="0" column="0">
<widget class="QLabel" name="label">
<property name="text">
<string>Sample</string>
</property>
</widget>
</item>
<item row="1" column="0">
<widget class="QLabel" name="label_2">
<property name="text">
<string>Message</string>
</property>
</widget>
</item>
</layout>
</item>
<item row="1" column="0" colspan="3">
<widget class="Image" name="image">
<property name="enable_toolbar" stdset="0">
<bool>false</bool>
</property>
<property name="inner_axes" stdset="0">
<bool>false</bool>
</property>
<property name="monitor" stdset="0">
<string>cam200</string>
</property>
<property name="rotation" stdset="0">
<number>3</number>
</property>
</widget>
</item>
<item row="0" column="3">
<widget class="ToggleSwitch" name="liveViewSwitch"/>
</item>
<item row="0" column="2">
<widget class="QLabel" name="label_3">
<property name="text">
<string>Live View</string>
</property>
<property name="alignment">
<set>Qt::AlignmentFlag::AlignRight|Qt::AlignmentFlag::AlignTrailing|Qt::AlignmentFlag::AlignVCenter</set>
</property>
</widget>
</item>
</layout>
</widget>
<customwidgets>
<customwidget>
<class>Image</class>
<extends>QWidget</extends>
<header>image</header>
</customwidget>
<customwidget>
<class>ToggleSwitch</class>
<extends>QWidget</extends>
<header>toggle_switch</header>
</customwidget>
</customwidgets>
<resources/>
<connections/>
</ui>

View File

@@ -0,0 +1,54 @@
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from bec_widgets.utils.bec_designer import designer_material_icon
from csaxs_bec.bec_widgets.widgets.omny_alignment.omny_alignment import OmnyAlignment
DOM_XML = """
<ui language='c++'>
<widget class='OmnyAlignment' name='omny_alignment'>
</widget>
</ui>
"""
class OmnyAlignmentPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
t = OmnyAlignment(parent)
return t
def domXml(self):
return DOM_XML
def group(self):
return ""
def icon(self):
return designer_material_icon(OmnyAlignment.ICON_NAME)
def includeFile(self):
return "omny_alignment"
def initialize(self, form_editor):
self._form_editor = form_editor
def isContainer(self):
return False
def isInitialized(self):
return self._form_editor is not None
def name(self):
return "OmnyAlignment"
def toolTip(self):
return "OmnyAlignment"
def whatsThis(self):
return self.toolTip()

View File

@@ -0,0 +1,15 @@
def main(): # pragma: no cover
from qtpy import PYSIDE6
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from csaxs_bec.bec_widgets.widgets.omny_alignment.omny_alignment_plugin import OmnyAlignmentPlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(OmnyAlignmentPlugin())
if __name__ == "__main__": # pragma: no cover
main()

View File

@@ -1,11 +0,0 @@
import os
def setup_epics_ca():
#os.environ["EPICS_CA_AUTO_ADDR_LIST"] = "NO"
#os.environ["EPICS_CA_ADDR_LIST"] = "129.129.122.255 sls-x12sa-cagw.psi.ch:5836"
os.environ["PYTHONIOENCODING"] = "latin1"
def run():
setup_epics_ca()

View File

View File

@@ -0,0 +1,11 @@
import os
def setup_epics_ca():
# os.environ["EPICS_CA_AUTO_ADDR_LIST"] = "NO"
# os.environ["EPICS_CA_ADDR_LIST"] = "129.129.122.255 sls-x12sa-cagw.psi.ch:5836"
os.environ["PYTHONIOENCODING"] = "latin1"
def run():
setup_epics_ca()

View File

@@ -3,12 +3,9 @@ import time
import numpy as np
from ophyd import Component as Cpt
from ophyd import Device, Kind
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin,
PSIDetectorBase,
)
from ophyd_devices.sim.sim_signals import SetableSignal
from ophyd import DeviceStatus, Kind, StatusBase
from ophyd_devices import PSIDeviceBase
from ophyd_devices.utils.bec_signals import PreviewSignal
try:
from pyueye import ueye
@@ -17,212 +14,10 @@ except ImportError:
ueye = None
class IDSCustomPrepare(CustomDetectorMixin):
USER_ACCESS = ["pyueye"]
pyueye = ueye
def __init__(self, *_args, parent: Device = None, **_kwargs) -> None:
super().__init__(*_args, parent=parent, **_kwargs)
self.ueye = ueye
self.h_cam = None
self.s_info = None
self.data_thread = None
self.thread_event = None
def on_connection_established(self):
self.hCam = self.ueye.HIDS(
self.parent.camera_ID
) # 0: first available camera; 1-254: The camera with the specified camera ID
self.sInfo = self.ueye.SENSORINFO()
self.cInfo = self.ueye.CAMINFO()
self.pcImageMemory = self.ueye.c_mem_p()
self.MemID = self.ueye.int()
self.rectAOI = self.ueye.IS_RECT()
self.pitch = self.ueye.INT()
self.nBitsPerPixel = self.ueye.INT(
self.parent.bits_per_pixel
) # 24: bits per pixel for color mode; take 8 bits per pixel for monochrome
self.channels = (
self.parent.channels
) # 3: channels for color mode(RGB); take 1 channel for monochrome
self.m_nColorMode = self.ueye.INT(
self.parent.m_n_colormode
) # Y8/RGB16/RGB24/REG32 (1 for our color cameras)
self.bytes_per_pixel = int(self.nBitsPerPixel / 8)
# Starts the driver and establishes the connection to the camera
nRet = self.ueye.is_InitCamera(self.hCam, None)
if nRet != self.ueye.IS_SUCCESS:
print("is_InitCamera ERROR")
# Reads out the data hard-coded in the non-volatile camera memory and writes it to the data structure that cInfo points to
nRet = self.ueye.is_GetCameraInfo(self.hCam, self.cInfo)
if nRet != self.ueye.IS_SUCCESS:
print("is_GetCameraInfo ERROR")
# You can query additional information about the sensor type used in the camera
nRet = self.ueye.is_GetSensorInfo(self.hCam, self.sInfo)
if nRet != self.ueye.IS_SUCCESS:
print("is_GetSensorInfo ERROR")
nRet = self.ueye.is_ResetToDefault(self.hCam)
if nRet != self.ueye.IS_SUCCESS:
print("is_ResetToDefault ERROR")
# Set display mode to DIB
nRet = self.ueye.is_SetDisplayMode(self.hCam, self.ueye.IS_SET_DM_DIB)
# Set the right color mode
if (
int.from_bytes(self.sInfo.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_BAYER
):
# setup the color depth to the current windows setting
self.ueye.is_GetColorDepth(self.hCam, self.nBitsPerPixel, self.m_nColorMode)
bytes_per_pixel = int(self.nBitsPerPixel / 8)
print("IS_COLORMODE_BAYER: ")
print("\tm_nColorMode: \t\t", self.m_nColorMode)
print("\tnBitsPerPixel: \t\t", self.nBitsPerPixel)
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
print()
elif (
int.from_bytes(self.sInfo.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_CBYCRY
):
# for color camera models use RGB32 mode
m_nColorMode = ueye.IS_CM_BGRA8_PACKED
nBitsPerPixel = ueye.INT(32)
bytes_per_pixel = int(self.nBitsPerPixel / 8)
print("IS_COLORMODE_CBYCRY: ")
print("\tm_nColorMode: \t\t", m_nColorMode)
print("\tnBitsPerPixel: \t\t", nBitsPerPixel)
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
print()
elif (
int.from_bytes(self.sInfo.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_MONOCHROME
):
# for color camera models use RGB32 mode
m_nColorMode = self.ueye.IS_CM_MONO8
nBitsPerPixel = self.ueye.INT(8)
bytes_per_pixel = int(nBitsPerPixel / 8)
print("IS_COLORMODE_MONOCHROME: ")
print("\tm_nColorMode: \t\t", m_nColorMode)
print("\tnBitsPerPixel: \t\t", nBitsPerPixel)
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
print()
else:
# for monochrome camera models use Y8 mode
m_nColorMode = self.ueye.IS_CM_MONO8
nBitsPerPixel = self.ueye.INT(8)
bytes_per_pixel = int(nBitsPerPixel / 8)
print("else")
# Can be used to set the size and position of an "area of interest"(AOI) within an image
nRet = self.ueye.is_AOI(
self.hCam, ueye.IS_AOI_IMAGE_GET_AOI, self.rectAOI, self.ueye.sizeof(self.rectAOI)
)
if nRet != self.ueye.IS_SUCCESS:
print("is_AOI ERROR")
self.width = self.rectAOI.s32Width
self.height = self.rectAOI.s32Height
# Prints out some information about the camera and the sensor
print("Camera model:\t\t", self.sInfo.strSensorName.decode("utf-8"))
print("Camera serial no.:\t", self.cInfo.SerNo.decode("utf-8"))
print("Maximum image width:\t", self.width)
print("Maximum image height:\t", self.height)
print()
# ---------------------------------------------------------------------------------------------------------------------------------------
# Allocates an image memory for an image having its dimensions defined by width and height and its color depth defined by nBitsPerPixel
nRet = self.ueye.is_AllocImageMem(
self.hCam, self.width, self.height, self.nBitsPerPixel, self.pcImageMemory, self.MemID
)
if nRet != self.ueye.IS_SUCCESS:
print("is_AllocImageMem ERROR")
else:
# Makes the specified image memory the active memory
nRet = self.ueye.is_SetImageMem(self.hCam, self.pcImageMemory, self.MemID)
if nRet != self.ueye.IS_SUCCESS:
print("is_SetImageMem ERROR")
else:
# Set the desired color mode
nRet = self.ueye.is_SetColorMode(self.hCam, self.m_nColorMode)
# Activates the camera's live video mode (free run mode)
nRet = self.ueye.is_CaptureVideo(self.hCam, self.ueye.IS_DONT_WAIT)
if nRet != self.ueye.IS_SUCCESS:
print("is_CaptureVideo ERROR")
# Enables the queue mode for existing image memory sequences
nRet = self.ueye.is_InquireImageMem(
self.hCam,
self.pcImageMemory,
self.MemID,
self.width,
self.height,
self.nBitsPerPixel,
self.pitch,
)
if nRet != self.ueye.IS_SUCCESS:
print("is_InquireImageMem ERROR")
else:
print("Press q to leave the programm")
startmeasureframerate = True
Gain = False
# Start live mode of camera immediately
self.parent.start_live_mode()
def _start_data_thread(self):
self.thread_event = threading.Event()
self.data_thread = threading.Thread(target=self._receive_data_from_camera, daemon=True)
self.data_thread.start()
def _receive_data_from_camera(self):
while not self.thread_event.is_set():
# In order to display the image in an OpenCV window we need to...
# ...extract the data of our image memory
array = ueye.get_data(
self.pcImageMemory,
self.width,
self.height,
self.nBitsPerPixel,
self.pitch,
copy=False,
)
# bytes_per_pixel = int(nBitsPerPixel / 8)
# ...reshape it in an numpy array...
frame = np.reshape(array, (self.height.value, self.width.value, self.bytes_per_pixel))
self.parent.image_data.put(frame)
self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=frame)
time.sleep(0.1)
def on_trigger(self):
pass
# self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=self.parent.image_data.get())
class IDSCamera(PSIDetectorBase):
class IDSCamera(PSIDeviceBase):
USER_ACCESS = ["start_live_mode", "stop_live_mode"]
custom_prepare_cls = IDSCustomPrepare
image_data = Cpt(SetableSignal, value=np.empty((100, 100)), kind=Kind.omitted)
SUB_MONITOR = "device_monitor_2d"
_default_sub = SUB_MONITOR
image_data = Cpt(PreviewSignal, value=np.empty((100, 100)), kind=Kind.omitted)
def __init__(
self,
@@ -234,19 +29,219 @@ class IDSCamera(PSIDetectorBase):
channels: int,
m_n_colormode: int,
kind=None,
parent=None,
device_manager=None,
**kwargs,
):
super().__init__(
prefix, name=name, kind=kind, parent=parent, device_manager=device_manager, **kwargs
prefix=prefix, name=name, kind=kind, device_manager=device_manager, **kwargs
)
self.camera_ID = camera_ID
self.bits_per_pixel = bits_per_pixel
self.bytes_per_pixel = None
self.channels = channels
self.m_n_colormode = m_n_colormode
#TODO fix connected and wait_for_connection
self.custom_prepare.on_connection_established()
self._m_n_colormode_input = m_n_colormode
self.m_n_colormode = None
self.ueye = ueye
self.h_cam = None
self.s_info = None
self.data_thread = None
self.c_info = None
self.pc_image_memory = None
self.mem_id = None
self.rect_aoi = None
self.pitch = None
self.n_bits_per_pixel = None
self.width = None
self.height = None
self.thread_event = threading.Event()
self.data_thread = None
def start_backend(self):
if self.ueye is None:
raise ImportError("The pyueye library is not installed.")
self.h_cam = self.ueye.HIDS(
self.camera_ID
) # 0: first available camera; 1-254: The camera with the specified camera ID
self.s_info = self.ueye.SENSORINFO()
self.c_info = self.ueye.CAMINFO()
self.pc_image_memory = self.ueye.c_mem_p()
self.mem_id = self.ueye.int()
self.rect_aoi = self.ueye.IS_RECT()
self.pitch = self.ueye.INT()
self.n_bits_per_pixel = self.ueye.INT(
self.bits_per_pixel
) # 24: bits per pixel for color mode; take 8 bits per pixel for monochrome
self.m_n_colormode = self.ueye.INT(
self._m_n_colormode_input
) # Y8/RGB16/RGB24/REG32 (1 for our color cameras)
self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
# Starts the driver and establishes the connection to the camera
ret = self.ueye.is_InitCamera(self.h_cam, None)
if ret != self.ueye.IS_SUCCESS:
print("is_InitCamera ERROR")
# Reads out the data hard-coded in the non-volatile camera memory and writes it to the data structure that c_info points to
ret = self.ueye.is_GetCameraInfo(self.h_cam, self.c_info)
if ret != self.ueye.IS_SUCCESS:
print("is_GetCameraInfo ERROR")
# You can query additional information about the sensor type used in the camera
ret = self.ueye.is_GetSensorInfo(self.h_cam, self.s_info)
if ret != self.ueye.IS_SUCCESS:
print("is_GetSensorInfo ERROR")
ret = self.ueye.is_ResetToDefault(self.h_cam)
if ret != self.ueye.IS_SUCCESS:
print("is_ResetToDefault ERROR")
# Set display mode to DIB
ret = self.ueye.is_SetDisplayMode(self.h_cam, self.ueye.IS_SET_DM_DIB)
# Set the right color mode
if (
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_BAYER
):
# setup the color depth to the current windows setting
self.ueye.is_GetColorDepth(self.h_cam, self.n_bits_per_pixel, self.m_n_colormode)
bytes_per_pixel = int(self.n_bits_per_pixel / 8)
print("IS_COLORMODE_BAYER: ")
print("\tm_n_colormode: \t\t", self.m_n_colormode)
print("\tn_bits_per_pixel: \t\t", self.n_bits_per_pixel)
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
print()
elif (
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_CBYCRY
):
# for color camera models use RGB32 mode
m_n_colormode = self.ueye.IS_CM_BGRA8_PACKED
n_bits_per_pixel = self.ueye.INT(32)
bytes_per_pixel = int(self.n_bits_per_pixel / 8)
print("IS_COLORMODE_CBYCRY: ")
print("\tm_n_colormode: \t\t", m_n_colormode)
print("\tn_bits_per_pixel: \t\t", n_bits_per_pixel)
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
print()
elif (
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_MONOCHROME
):
# for color camera models use RGB32 mode
m_n_colormode = self.ueye.IS_CM_MONO8
n_bits_per_pixel = self.ueye.INT(8)
bytes_per_pixel = int(n_bits_per_pixel / 8)
print("IS_COLORMODE_MONOCHROME: ")
print("\tm_n_colormode: \t\t", m_n_colormode)
print("\tn_bits_per_pixel: \t\t", n_bits_per_pixel)
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
print()
else:
# for monochrome camera models use Y8 mode
m_n_colormode = self.ueye.IS_CM_MONO8
n_bits_per_pixel = self.ueye.INT(8)
bytes_per_pixel = int(n_bits_per_pixel / 8)
print("else")
# Can be used to set the size and position of an "area of interest"(AOI) within an image
ret = self.ueye.is_AOI(
self.h_cam,
self.ueye.IS_AOI_IMAGE_GET_AOI,
self.rect_aoi,
self.ueye.sizeof(self.rect_aoi),
)
if ret != self.ueye.IS_SUCCESS:
print("is_AOI ERROR")
self.width = self.rect_aoi.s32Width
self.height = self.rect_aoi.s32Height
# Prints out some information about the camera and the sensor
print("Camera model:\t\t", self.s_info.strSensorName.decode("utf-8"))
print("Camera serial no.:\t", self.c_info.SerNo.decode("utf-8"))
print("Maximum image width:\t", self.width)
print("Maximum image height:\t", self.height)
print()
# ---------------------------------------------------------------------------------------------------------------------------------------
# Allocates an image memory for an image having its dimensions defined by width and height and its color depth defined by n_bits_per_pixel
ret = self.ueye.is_AllocImageMem(
self.h_cam,
self.width,
self.height,
self.n_bits_per_pixel,
self.pc_image_memory,
self.mem_id,
)
if ret != self.ueye.IS_SUCCESS:
print("is_AllocImageMem ERROR")
else:
# Makes the specified image memory the active memory
ret = self.ueye.is_SetImageMem(self.h_cam, self.pc_image_memory, self.mem_id)
if ret != self.ueye.IS_SUCCESS:
print("is_SetImageMem ERROR")
else:
# Set the desired color mode
ret = self.ueye.is_SetColorMode(self.h_cam, self.m_n_colormode)
# Activates the camera's live video mode (free run mode)
ret = self.ueye.is_CaptureVideo(self.h_cam, self.ueye.IS_DONT_WAIT)
if ret != self.ueye.IS_SUCCESS:
print("is_CaptureVideo ERROR")
# Enables the queue mode for existing image memory sequences
ret = self.ueye.is_InquireImageMem(
self.h_cam,
self.pc_image_memory,
self.mem_id,
self.width,
self.height,
self.n_bits_per_pixel,
self.pitch,
)
if ret != self.ueye.IS_SUCCESS:
print("is_InquireImageMem ERROR")
else:
print("Press q to leave the programm")
# startmeasureframerate = True
# Gain = False
# Start live mode of camera immediately
self.start_live_mode()
def _start_data_thread(self):
self.data_thread = threading.Thread(target=self._receive_data_from_camera, daemon=True)
self.data_thread.start()
def _receive_data_from_camera(self):
while not self.thread_event.is_set():
if self.ueye is None:
print("pyueye library not available.")
return
# In order to display the image in an OpenCV window we need to...
# ...extract the data of our image memory
array = self.ueye.get_data(
self.pc_image_memory,
self.width,
self.height,
self.n_bits_per_pixel,
self.pitch,
copy=False,
)
# bytes_per_pixel = int(n_bits_per_pixel / 8)
# ...reshape it in an numpy array...
frame = np.reshape(array, (self.height.value, self.width.value, self.bytes_per_pixel))
self.image_data.put(frame)
time.sleep(0.1)
def wait_for_connection(self, all_signals=False, timeout=10):
if ueye is None:
@@ -254,226 +249,64 @@ class IDSCamera(PSIDetectorBase):
"The pyueye library is not installed or doesn't provide the necessary c libs"
)
super().wait_for_connection(all_signals, timeout)
#self.custom_prepare.on_connection_established()
def destroy(self):
"""Extend Ophyds destroy function to kill the data thread"""
self.stop_live_mode()
super().destroy()
def start_live_mode(self):
if self.custom_prepare.data_thread is not None:
if self.data_thread is not None:
self.stop_live_mode()
self.custom_prepare._start_data_thread()
self._start_data_thread()
def stop_live_mode(self):
"""Stopping the camera live mode."""
if self.custom_prepare.thread_event is not None:
self.custom_prepare.thread_event.set()
if self.custom_prepare.data_thread is not None:
self.custom_prepare.data_thread.join()
self.custom_prepare.thread_event = None
self.custom_prepare.data_thread = None
self.thread_event.set()
if self.data_thread is not None:
self.data_thread.join()
self.thread_event.clear()
self.data_thread = None
########################################
# Beamline Specific Implementations #
########################################
"""from pyueye import ueye
import numpy as np
import cv2
import sys
import time
def on_init(self) -> None:
"""
Called when the device is initialized.
#---------------------------------------------------------------------------------------------------------------------------------------
No signals are connected at this point. If you like to
set default values on signals, please use on_connected instead.
"""
#Variables
hCam = ueye.HIDS(202) #0: first available camera; 1-254: The camera with the specified camera ID
sInfo = ueye.SENSORINFO()
cInfo = ueye.CAMINFO()
pcImageMemory = ueye.c_mem_p()
MemID = ueye.int()
rectAOI = ueye.IS_RECT()
pitch = ueye.INT()
nBitsPerPixel = ueye.INT(24) #24: bits per pixel for color mode; take 8 bits per pixel for monochrome
channels = 3 #3: channels for color mode(RGB); take 1 channel for monochrome
m_nColorMode = ueye.INT(1) # Y8/RGB16/RGB24/REG32 (1 for our color cameras)
bytes_per_pixel = int(nBitsPerPixel / 8)
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
"""
self.start_backend()
ids_cam
...
deviceConfig:
camera_ID: 202
bits_per_pixel: 24
channels: 3
m_n_colormode: 1
def on_stage(self) -> DeviceStatus | StatusBase | None:
"""
Called while staging the device.
#---------------------------------------------------------------------------------------------------------------------------------------
print("START")
print()
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
"""
# Starts the driver and establishes the connection to the camera
nRet = ueye.is_InitCamera(hCam, None)
if nRet != ueye.IS_SUCCESS:
print("is_InitCamera ERROR")
def on_unstage(self) -> DeviceStatus | StatusBase | None:
"""Called while unstaging the device."""
# Reads out the data hard-coded in the non-volatile camera memory and writes it to the data structure that cInfo points to
nRet = ueye.is_GetCameraInfo(hCam, cInfo)
if nRet != ueye.IS_SUCCESS:
print("is_GetCameraInfo ERROR")
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
"""Called right before the scan starts on all devices automatically."""
# You can query additional information about the sensor type used in the camera
nRet = ueye.is_GetSensorInfo(hCam, sInfo)
if nRet != ueye.IS_SUCCESS:
print("is_GetSensorInfo ERROR")
def on_trigger(self) -> DeviceStatus | StatusBase | None:
"""Called when the device is triggered."""
nRet = ueye.is_ResetToDefault( hCam)
if nRet != ueye.IS_SUCCESS:
print("is_ResetToDefault ERROR")
def on_complete(self) -> DeviceStatus | StatusBase | None:
"""Called to inquire if a device has completed a scans."""
# Set display mode to DIB
nRet = ueye.is_SetDisplayMode(hCam, ueye.IS_SET_DM_DIB)
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
def on_stop(self) -> None:
"""Called when the device is stopped."""
# Set the right color mode
if int.from_bytes(sInfo.nColorMode.value, byteorder='big') == ueye.IS_COLORMODE_BAYER:
# setup the color depth to the current windows setting
ueye.is_GetColorDepth(hCam, nBitsPerPixel, m_nColorMode)
bytes_per_pixel = int(nBitsPerPixel / 8)
print("IS_COLORMODE_BAYER: ", )
print("\tm_nColorMode: \t\t", m_nColorMode)
print("\tnBitsPerPixel: \t\t", nBitsPerPixel)
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
print()
elif int.from_bytes(sInfo.nColorMode.value, byteorder='big') == ueye.IS_COLORMODE_CBYCRY:
# for color camera models use RGB32 mode
m_nColorMode = ueye.IS_CM_BGRA8_PACKED
nBitsPerPixel = ueye.INT(32)
bytes_per_pixel = int(nBitsPerPixel / 8)
print("IS_COLORMODE_CBYCRY: ", )
print("\tm_nColorMode: \t\t", m_nColorMode)
print("\tnBitsPerPixel: \t\t", nBitsPerPixel)
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
print()
elif int.from_bytes(sInfo.nColorMode.value, byteorder='big') == ueye.IS_COLORMODE_MONOCHROME:
# for color camera models use RGB32 mode
m_nColorMode = ueye.IS_CM_MONO8
nBitsPerPixel = ueye.INT(8)
bytes_per_pixel = int(nBitsPerPixel / 8)
print("IS_COLORMODE_MONOCHROME: ", )
print("\tm_nColorMode: \t\t", m_nColorMode)
print("\tnBitsPerPixel: \t\t", nBitsPerPixel)
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
print()
else:
# for monochrome camera models use Y8 mode
m_nColorMode = ueye.IS_CM_MONO8
nBitsPerPixel = ueye.INT(8)
bytes_per_pixel = int(nBitsPerPixel / 8)
print("else")
# Can be used to set the size and position of an "area of interest"(AOI) within an image
nRet = ueye.is_AOI(hCam, ueye.IS_AOI_IMAGE_GET_AOI, rectAOI, ueye.sizeof(rectAOI))
if nRet != ueye.IS_SUCCESS:
print("is_AOI ERROR")
width = rectAOI.s32Width
height = rectAOI.s32Height
# Prints out some information about the camera and the sensor
print("Camera model:\t\t", sInfo.strSensorName.decode('utf-8'))
print("Camera serial no.:\t", cInfo.SerNo.decode('utf-8'))
print("Maximum image width:\t", width)
print("Maximum image height:\t", height)
print()
#---------------------------------------------------------------------------------------------------------------------------------------
# Allocates an image memory for an image having its dimensions defined by width and height and its color depth defined by nBitsPerPixel
nRet = ueye.is_AllocImageMem(hCam, width, height, nBitsPerPixel, pcImageMemory, MemID)
if nRet != ueye.IS_SUCCESS:
print("is_AllocImageMem ERROR")
else:
# Makes the specified image memory the active memory
nRet = ueye.is_SetImageMem(hCam, pcImageMemory, MemID)
if nRet != ueye.IS_SUCCESS:
print("is_SetImageMem ERROR")
else:
# Set the desired color mode
nRet = ueye.is_SetColorMode(hCam, m_nColorMode)
# Activates the camera's live video mode (free run mode)
nRet = ueye.is_CaptureVideo(hCam, ueye.IS_DONT_WAIT)
if nRet != ueye.IS_SUCCESS:
print("is_CaptureVideo ERROR")
# Enables the queue mode for existing image memory sequences
nRet = ueye.is_InquireImageMem(hCam, pcImageMemory, MemID, width, height, nBitsPerPixel, pitch)
if nRet != ueye.IS_SUCCESS:
print("is_InquireImageMem ERROR")
else:
print("Press q to leave the programm")
startmeasureframerate=True
Gain = False
#---------------------------------------------------------------------------------------------------------------------------------------
# Continuous image display
while(nRet == ueye.IS_SUCCESS):
# In order to display the image in an OpenCV window we need to...
# ...extract the data of our image memory
array = ueye.get_data(pcImageMemory, width, height, nBitsPerPixel, pitch, copy=False)
# bytes_per_pixel = int(nBitsPerPixel / 8)
# ...reshape it in an numpy array...
frame = np.reshape(array,(height.value, width.value, bytes_per_pixel))
# ...resize the image by a half
frame = cv2.resize(frame,(0,0),fx=0.5, fy=0.5)
#---------------------------------------------------------------------------------------------------------------------------------------
#Include image data processing here
#---------------------------------------------------------------------------------------------------------------------------------------
#...and finally display it
cv2.imshow("SimpleLive_Python_uEye_OpenCV", frame)
if startmeasureframerate:
starttime = time.time()
startmeasureframerate=False
framenumber=0
if time.time() > starttime+5:
print(f"Caught {framenumber/5} frames per second")
startmeasureframerate=True
Gain = ~Gain
if Gain:
nRet = ueye.is_SetGainBoost(hCam, 1)
else:
nRet = ueye.is_SetGainBoost(hCam, 0)
print(f"Gain setting status {nRet}")
#...and finally display it
cv2.imshow("SimpleLive_Python_uEye_OpenCV", frame)
framenumber+=1
time.sleep(0.1)
# Press q if you want to end the loop
if (cv2.waitKey(1) & 0xFF) == ord('q'):
break
#---------------------------------------------------------------------------------------------------------------------------------------
# Releases an image memory that was allocated using is_AllocImageMem() and removes it from the driver management
ueye.is_FreeImageMem(hCam, pcImageMemory, MemID)
# Disables the hCam camera handle and releases the data structures and memory areas taken up by the uEye camera
ueye.is_ExitCamera(hCam)
# Destroys the OpenCv windows
cv2.destroyAllWindows()
print()
print("END")
"""
def on_destroy(self) -> None:
"""Called when the device is destroyed. Cleanup resources here."""
self.stop_live_mode()

View File

@@ -1,83 +0,0 @@
import time
import numpy as np
from bec_lib import bec_logger
from ophyd import Kind, Signal
from ophyd.utils import ReadOnlyError
from ophyd_devices.utils.bec_device_base import BECDeviceBase
logger = bec_logger.logger
# Readout precision for Setable/ReadOnlySignal signals
PRECISION = 3
class ReadOnlySignal(Signal):
"""Setable signal for simulated devices.
The signal will store the value in sim_state of the SimulatedData class of the parent device.
It will also return the value from sim_state when get is called. Compared to the ReadOnlySignal,
this signal can be written to.
The setable signal inherits from the Signal class of ophyd, thus the class attribute needs to be
initiated as a Component (class from ophyd).
>>> signal = SetableSignal(name="signal", parent=parent, value=0)
Parameters
----------
name (string) : Name of the signal
parent (object) : Parent object of the signal, default none.
value (any) : Initial value of the signal, default 0.
kind (int) : Kind of the signal, default Kind.normal.
precision (float) : Precision of the signal, default PRECISION.
"""
def __init__(
self,
name: str,
*args,
fcn: callable,
kind: int = Kind.normal,
precision: float = PRECISION,
**kwargs,
):
super().__init__(*args, name=name, value=value, kind=kind, **kwargs)
self._metadata.update(connected=True, write_access=False)
self._value = None
self.precision = precision
self.fcn = fcn
# pylint: disable=arguments-differ
def get(self):
"""Get the current position of the simulated device.
Core function for signal.
"""
self._value = self.fcn()
return self._value
# pylint: disable=arguments-differ
def put(self, value):
"""Put the value to the simulated device.
Core function for signal.
"""
self._update_sim_state(value)
self._value = value
def describe(self):
"""Describe the readback signal.
Core function for signal.
"""
res = super().describe()
if self.precision is not None:
res[self.name]["precision"] = self.precision
return res
@property
def timestamp(self):
"""Timestamp of the readback value"""
return self._get_timestamp()

View File

@@ -116,13 +116,12 @@ class FlomniFermatScan(SyncFlyScanBase):
shorten the movement time. In order to keep the last state, even if the
server is restarted, the state is stored in a global variable in redis.
"""
producer = self.device_manager.producer
msg = producer.get(MessageEndpoints.global_vars("reverse_flomni_trajectory"))
msg = self.connector.get(MessageEndpoints.global_vars("reverse_flomni_trajectory"))
if msg:
val = msg.content.get("value", False)
else:
val = False
producer.set(
self.connector.set(
MessageEndpoints.global_vars("reverse_flomni_trajectory"),
messages.VariableMessage(value=(not val)),
)
@@ -280,7 +279,7 @@ class FlomniFermatScan(SyncFlyScanBase):
yield from self.stubs.kickoff(device="rtx")
while True:
yield from self.stubs.read(group="monitored")
status = self.device_manager.producer.get(MessageEndpoints.device_status("rt_scan"))
status = self.connector.get(MessageEndpoints.device_status("rt_scan"))
if status:
status_id = status.content.get("status", 1)
request_id = status.metadata.get("RID")

View File

@@ -0,0 +1,12 @@
# from .metadata_schema_template import ExampleSchema
METADATA_SCHEMA_REGISTRY = {
# Add models which should be used to validate scan metadata here.
# Make a model according to the template, and import it as above
# Then associate it with a scan like so:
# "example_scan": ExampleSchema
}
# Define a default schema type which should be used as the fallback for everything:
DEFAULT_SCHEMA = None

View File

@@ -0,0 +1,34 @@
# # By inheriting from BasicScanMetadata you can define a schema by which metadata
# # supplied to a scan must be validated.
# # This schema is a Pydantic model: https://docs.pydantic.dev/latest/concepts/models/
# # but by default it will still allow you to add any arbitrary information to it.
# # That is to say, when you run a scan with which such a model has been associated in the
# # metadata_schema_registry, you can supply any python dictionary with strings as keys
# # and built-in python types (strings, integers, floats) as values, and these will be
# # added to the experiment metadata, but it *must* contain the keys and values of the
# # types defined in the schema class.
# #
# #
# # For example, say that you would like to enforce recording information about sample
# # pretreatment, you could define the following:
# #
#
# from bec_lib.metadata_schema import BasicScanMetadata
#
#
# class ExampleSchema(BasicScanMetadata):
# treatment_description: str
# treatment_temperature_k: int
#
#
# # If this was used according to the example in metadata_schema_registry.py,
# # then when calling the scan, the user would need to write something like:
# >>> scans.example_scan(
# >>> motor,
# >>> 1,
# >>> 2,
# >>> 3,
# >>> metadata={"treatment_description": "oven overnight", "treatment_temperature_k": 575},
# >>> )
#
# # And the additional metadata would be saved in the HDF5 file created for the scan.

View File

@@ -116,13 +116,12 @@ class OMNYFermatScan(SyncFlyScanBase):
shorten the movement time. In order to keep the last state, even if the
server is restarted, the state is stored in a global variable in redis.
"""
producer = self.device_manager.producer
msg = producer.get(MessageEndpoints.global_vars("reverse_omny_trajectory"))
msg = self.connector.get(MessageEndpoints.global_vars("reverse_omny_trajectory"))
if msg:
val = msg.content.get("value", False)
else:
val = False
producer.set(
self.connector.set(
MessageEndpoints.global_vars("reverse_omny_trajectory"),
messages.VariableMessage(value=(not val)),
)
@@ -265,7 +264,7 @@ class OMNYFermatScan(SyncFlyScanBase):
yield from self.stubs.kickoff(device="rtx")
while True:
yield from self.stubs.read(group="monitored")
status = self.device_manager.producer.get(MessageEndpoints.device_status("rt_scan"))
status = self.connector.get(MessageEndpoints.device_status("rt_scan"))
if status:
status_id = status.content.get("status", 1)
request_id = status.metadata.get("RID")

View File

View File

@@ -5,7 +5,7 @@ build-backend = "hatchling.build"
[project]
name = "csaxs_bec"
version = "0.0.0"
description = "Custom device implementations based on the ophyd hardware abstraction layer"
description = "The cSAXS plugin repository for BEC"
requires-python = ">=3.10"
classifiers = [
"Development Status :: 3 - Alpha",
@@ -28,19 +28,21 @@ dependencies = [
[project.optional-dependencies]
dev = [
"black",
"copier",
"isort",
"coverage",
"pylint",
"pytest",
"pytest-random-order",
"pytest-redis",
"ophyd_devices",
"bec_server",
]
[project.entry-points."bec"]
plugin_bec = "csaxs_bec"
[project.entry-points."bec.deployment.device_server"]
plugin_ds_startup = "csaxs_bec.deployment.device_server.startup:run"
plugin_ds_startup = "csaxs_bec.deployments.device_server.startup:run"
[project.entry-points."bec.file_writer"]
plugin_file_writer = "csaxs_bec.file_writer"
@@ -48,12 +50,18 @@ plugin_file_writer = "csaxs_bec.file_writer"
[project.entry-points."bec.scans"]
plugin_scans = "csaxs_bec.scans"
[project.entry-points."bec.scans.metadata_schema"]
plugin_metadata_schema = "csaxs_bec.scans.metadata_schema"
[project.entry-points."bec.ipython_client_startup"]
plugin_ipython_client_pre = "csaxs_bec.bec_ipython_client.startup.pre_startup"
plugin_ipython_client_post = "csaxs_bec.bec_ipython_client.startup"
[project.entry-points."bec.widgets"]
plugin_widgets = "csaxs_bec.bec_widgets"
[project.entry-points."bec.widgets.auto_updates"]
plugin_widgets_update = "csaxs_bec.bec_widgets.auto_updates"
[project.entry-points."bec.widgets.user_widgets"]
plugin_widgets = "csaxs_bec.bec_widgets.widgets"
[tool.hatch.build.targets.wheel]
include = ["*"]

View File

@@ -1,31 +1,34 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
It can be installed via
```bash
pip install pytest
```
in your *python environment*.
in your _python environment_.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
It is mandatory for test files to begin with `test_` for pytest to discover them.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
```bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -28,7 +28,10 @@ class RTMock(DeviceBase):
def test_save_frame(bec_client_mock):
client = bec_client_mock
client.device_manager.devices.xeye = DeviceBase(name="xeye", config={})
client.device_manager.devices.xeye = DeviceBase(
name="xeye",
config={"enabled": True, "deviceClass": "test_class", "readoutPriority": "baseline"},
)
lamni = LamNI(client)
align = XrayEyeAlign(client, lamni)
with mock.patch(
@@ -43,7 +46,10 @@ def test_update_frame(bec_client_mock):
epics_get = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_get"
fshopen = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.fshopen"
client = bec_client_mock
client.device_manager.devices.xeye = DeviceBase(name="xeye", config={})
client.device_manager.devices.xeye = DeviceBase(
name="xeye",
config={"enabled": True, "deviceClass": "test_class", "readoutPriority": "baseline"},
)
lamni = LamNI(client)
align = XrayEyeAlign(client, lamni)
with mock.patch(epics_put) as epics_put_mock:
@@ -64,10 +70,16 @@ def test_update_frame(bec_client_mock):
def test_disable_rt_feedback(bec_client_mock):
client = bec_client_mock
client.device_manager.devices.xeye = DeviceBase(name="xeye", config={})
client.device_manager.devices.xeye = DeviceBase(
name="xeye",
config={"enabled": True, "deviceClass": "test_class", "readoutPriority": "baseline"},
)
lamni = LamNI(client)
align = XrayEyeAlign(client, lamni)
client.device_manager.devices.rtx = RTMock(name="rtx", config={})
client.device_manager.devices.rtx = RTMock(
name="rtx",
config={"enabled": True, "deviceClass": "test_class", "readoutPriority": "baseline"},
)
with mock.patch.object(
align.device_manager.devices.rtx.controller, "feedback_disable"
) as fdb_disable:
@@ -77,10 +89,16 @@ def test_disable_rt_feedback(bec_client_mock):
def test_enable_rt_feedback(bec_client_mock):
client = bec_client_mock
client.device_manager.devices.xeye = DeviceBase(name="xeye", config={})
client.device_manager.devices.xeye = DeviceBase(
name="xeye",
config={"enabled": True, "deviceClass": "test_class", "readoutPriority": "baseline"},
)
lamni = LamNI(client)
align = XrayEyeAlign(client, lamni)
client.device_manager.devices.rtx = RTMock(name="rtx", config={})
client.device_manager.devices.rtx = RTMock(
name="rtx",
config={"enabled": True, "deviceClass": "test_class", "readoutPriority": "baseline"},
)
with mock.patch.object(
align.device_manager.devices.rtx.controller, "feedback_enable_with_reset"
) as fdb_enable:
@@ -96,10 +114,16 @@ def test_tomo_rotate(bec_client_mock):
client._update_namespace_callback = mock.MagicMock()
client.callbacks = mock.MagicMock()
client.load_high_level_interface("bec_hli")
client.device_manager.devices.xeye = DeviceBase(name="xeye", config={})
client.device_manager.devices.xeye = DeviceBase(
name="xeye",
config={"enabled": True, "deviceClass": "test_class", "readoutPriority": "baseline"},
)
lamni = LamNI(client)
align = XrayEyeAlign(client, lamni)
client.device_manager.devices.lsamrot = RTMock(name="lsamrot", config={})
client.device_manager.devices.lsamrot = RTMock(
name="lsamrot",
config={"enabled": True, "deviceClass": "test_class", "readoutPriority": "baseline"},
)
with mock.patch.object(builtins, "umv") as umv:
align.tomo_rotate(5)
umv.assert_called_once_with(client.device_manager.devices.lsamrot, 5)

View File

@@ -1,31 +1,34 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
It can be installed via
```bash
pip install pytest
```
in your *python environment*.
in your _python environment_.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
It is mandatory for test files to begin with `test_` for pytest to discover them.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
```bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -1,31 +1,34 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
It can be installed via
```bash
pip install pytest
```
in your *python environment*.
in your _python environment_.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
It is mandatory for test files to begin with `test_` for pytest to discover them.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
```bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -1,31 +1,34 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
It can be installed via
```bash
pip install pytest
```
in your *python environment*.
in your _python environment_.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
It is mandatory for test files to begin with `test_` for pytest to discover them.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
```bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -0,0 +1,34 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
It can be installed via
```bash
pip install pytest
```
in your _python environment_.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
It is mandatory for test files to begin with `test_` for pytest to discover them.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
```bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -1,31 +1,34 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
It can be installed via
```bash
pip install pytest
```
in your *python environment*.
in your _python environment_.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
It is mandatory for test files to begin with `test_` for pytest to discover them.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
```bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).