Compare commits

..

11 Commits

Author SHA1 Message Date
gac-x12sa
d7290e3942 wip work at beamline 2025-04-23 10:51:14 +02:00
gac-x12sa
730926f5b3 wip jfj integration 2025-03-25 16:30:01 +01:00
gac-x12sa
18215a05b5 wip jfj, implemented first logic for computing triggers and pulse widths 2025-03-24 17:39:26 +01:00
gac-x12sa
5cd93fc5aa wip moving readout from class attribute to constant 2025-03-24 17:38:01 +01:00
gac-x12sa
2b4a13ebc2 wip eiger, improve stage procedure for BEC core scans 2025-03-24 08:23:18 +01:00
gac-x12sa
ee8fa8b962 wip fixing jfj client, improve start procedure 2025-03-24 08:23:18 +01:00
gac-x12sa
b281e458f9 wip small fixes of ddg csaxs for readout times 2025-03-24 08:23:17 +01:00
gac-x12sa
48bd7f73a8 refactor: delay generator ready for step scanning 2025-03-21 15:12:35 +01:00
gac-x12sa
b806487c54 wip: draft from working at the beamline 2025-02-26 15:45:22 +01:00
53dca4dc6f wip, fix enums for pytest 3.10 2025-02-26 11:28:04 +01:00
ccf8bb8474 refactor: client refactoring, adding tests for jfj_client models 2025-02-26 11:22:14 +01:00
40 changed files with 2002 additions and 1254 deletions

View File

@@ -1,9 +0,0 @@
# Do not edit this file!
# It is needed to track the repo template version, and editing may break things.
# This file will be overwritten by copier on template updates.
_commit: v1.0.0
_src_path: https://github.com/bec-project/plugin_copier_template.git
make_commit: false
project_name: csaxs_bec
widget_plugins_input: []

View File

@@ -1,7 +1,6 @@
BSD 3-Clause License
Copyright (c) 2025, Paul Scherrer Institute
Copyright (c) 2024, Paul Scherrer Institute
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
@@ -26,4 +25,4 @@ DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

1
bin/.gitignore vendored
View File

@@ -1 +0,0 @@
# Add anything you don't want to check in to git, e.g. very large files

View File

@@ -13,6 +13,7 @@ logger = bec_logger.logger
class PilatusConverter:
def __init__(self, host: str, port: int) -> None:
self._connector = RedisConnector(f"{host}:{port}")
self._producer = self._connector.producer()
def start(self) -> None:
"""start the consumer"""
@@ -56,9 +57,10 @@ class PilatusConverter:
"""
Start the consumer.
"""
self._connector.register(
file_consumer = self._connector.consumer(
MessageEndpoints.file_event("pilatus_2"), cb=self.on_new_message, parent=self
)
file_consumer.start()
if __name__ == "__main__":

View File

@@ -1,63 +0,0 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from bec_widgets.widgets.containers.auto_update.auto_updates import AutoUpdates
if TYPE_CHECKING: # pragma: no cover
from bec_lib.messages import ScanStatusMessage
class cSAXSUpdate(AutoUpdates):
#######################################################################
################# GUI Callbacks #######################################
#######################################################################
def on_start(self) -> None:
"""
Procedure to run when the auto updates are enabled.
"""
self.start_default_dock()
def on_stop(self) -> None:
"""
Procedure to run when the auto updates are disabled.
"""
def on_scan_open(self, msg: ScanStatusMessage) -> None:
"""
Procedure to run when a scan starts.
Args:
msg (ScanStatusMessage): The scan status message.
"""
if msg.scan_name == "line_scan" and msg.scan_report_devices:
return self.simple_line_scan(msg)
if msg.scan_name == "grid_scan" and msg.scan_report_devices:
return self.simple_grid_scan(msg)
if msg.scan_report_devices:
return self.best_effort(msg)
return None
def on_scan_closed(self, msg: ScanStatusMessage) -> None:
"""
Procedure to run when a scan ends.
Args:
msg (ScanStatusMessage): The scan status message.
"""
def on_scan_abort(self, msg: ScanStatusMessage) -> None:
"""
Procedure to run when a scan is aborted.
Args:
msg (ScanStatusMessage): The scan status message.
"""
class cSAXSUpdateAlignment(AutoUpdates): ...
class cSAXSUpdateScan(AutoUpdates): ...

View File

@@ -1,75 +0,0 @@
# This file was automatically generated by generate_cli.py
# type: ignore
from __future__ import annotations
from bec_lib.logger import bec_logger
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call
logger = bec_logger.logger
# pylint: skip-file
_Widgets = {
"OmnyAlignment": "OmnyAlignment",
}
class OmnyAlignment(RPCBase):
@property
@rpc_call
def enable_live_view(self):
"""
None
"""
@enable_live_view.setter
@rpc_call
def enable_live_view(self):
"""
None
"""
@property
@rpc_call
def user_message(self):
"""
None
"""
@user_message.setter
@rpc_call
def user_message(self):
"""
None
"""
@property
@rpc_call
def sample_name(self):
"""
None
"""
@sample_name.setter
@rpc_call
def sample_name(self):
"""
None
"""
@property
@rpc_call
def enable_move_buttons(self):
"""
None
"""
@enable_move_buttons.setter
@rpc_call
def enable_move_buttons(self):
"""
None
"""

View File

@@ -1,139 +0,0 @@
from typing import TypedDict
from bec_widgets.utils.error_popups import SafeSlot
import os
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.ui_loader import UILoader
from qtpy.QtWidgets import QWidget, QPushButton, QLineEdit, QLabel, QVBoxLayout
from bec_qthemes import material_icon
from bec_lib.logger import bec_logger
logger = bec_logger.logger
# class OmnyAlignmentUIComponents(TypedDict):
# moveRightButton: QPushButton
# moveLeftButton: QPushButton
# moveUpButton: QPushButton
# moveDownButton: QPushButton
# image: Image
class OmnyAlignment(BECWidget, QWidget):
USER_ACCESS = ["enable_live_view", "enable_live_view.setter", "user_message", "user_message.setter","sample_name", "sample_name.setter", "enable_move_buttons", "enable_move_buttons.setter"]
PLUGIN = True
ui_file = "./omny_alignment.ui"
def __init__(self, parent=None, **kwargs):
super().__init__(parent=parent, **kwargs)
self._load_ui()
def _load_ui(self):
current_path = os.path.dirname(__file__)
self.ui = UILoader(self).loader(os.path.join(current_path, self.ui_file))
layout = QVBoxLayout()
layout.addWidget(self.ui)
self.setLayout(layout)
icon_options = {"size": (16, 16), "convert_to_pixmap": False}
self.ui.moveRightButton.setText("")
self.ui.moveRightButton.setIcon(
material_icon(icon_name="keyboard_arrow_right", **icon_options)
)
self.ui.moveLeftButton.setText("")
self.ui.moveLeftButton.setIcon(
material_icon(icon_name="keyboard_arrow_left", **icon_options)
)
self.ui.moveUpButton.setText("")
self.ui.moveUpButton.setIcon(
material_icon(icon_name="keyboard_arrow_up", **icon_options)
)
self.ui.moveDownButton.setText("")
self.ui.moveDownButton.setIcon(
material_icon(icon_name="keyboard_arrow_down", **icon_options)
)
self.ui.confirmButton.setText("OK")
self.ui.liveViewSwitch.enabled.connect(self.on_live_view_enabled)
@property
def enable_live_view(self):
return self.ui.liveViewSwitch.checked
@enable_live_view.setter
def enable_live_view(self, enable:bool):
self.ui.liveViewSwitch.checked = enable
@property
def user_message(self):
return self.ui.messageLineEdit.text()
@user_message.setter
def user_message(self, message:str):
self.ui.messageLineEdit.setText(message)
@property
def sample_name(self):
return self.ui.sampleLineEdit.text()
@sample_name.setter
def sample_name(self, message:str):
self.ui.sampleLineEdit.setText(message)
@SafeSlot(bool)
def on_live_view_enabled(self, enabled:bool):
from bec_widgets.widgets.plots.image.image import Image
logger.info(f"Live view is enabled: {enabled}")
image: Image = self.ui.image
if enabled:
image.image("cam200")
return
image.disconnect_monitor("cam200")
@property
def enable_move_buttons(self):
move_up:QPushButton = self.ui.moveUpButton
move_down:QPushButton = self.ui.moveDownButton
move_left:QPushButton = self.ui.moveLeftButton
move_right:QPushButton = self.ui.moveRightButton
return move_up.isEnabled() and move_down.isEnabled() and move_left.isEnabled() and move_right.isEnabled()
@enable_move_buttons.setter
def enable_move_buttons(self, enabled:bool):
move_up:QPushButton = self.ui.moveUpButton
move_down:QPushButton = self.ui.moveDownButton
move_left:QPushButton = self.ui.moveLeftButton
move_right:QPushButton = self.ui.moveRightButton
move_up.setEnabled(enabled)
move_down.setEnabled(enabled)
move_left.setEnabled(enabled)
move_right.setEnabled(enabled)
if __name__ == "__main__":
from qtpy.QtWidgets import QApplication
import sys
app = QApplication(sys.argv)
widget = OmnyAlignment()
widget.show()
sys.exit(app.exec_())

View File

@@ -1 +0,0 @@
{'files': ['omny_alignment.py']}

View File

@@ -1,125 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>Form</class>
<widget class="QWidget" name="Form">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>988</width>
<height>821</height>
</rect>
</property>
<property name="windowTitle">
<string>Form</string>
</property>
<layout class="QGridLayout" name="gridLayout_3">
<item row="2" column="2">
<layout class="QGridLayout" name="gridLayout">
<item row="1" column="2">
<widget class="QPushButton" name="moveRightButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
<item row="1" column="0">
<widget class="QPushButton" name="moveLeftButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
<item row="0" column="1">
<widget class="QPushButton" name="moveUpButton">
<property name="text">
<string>Up</string>
</property>
</widget>
</item>
<item row="2" column="1">
<widget class="QPushButton" name="moveDownButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
<item row="1" column="1">
<widget class="QPushButton" name="confirmButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
</layout>
</item>
<item row="2" column="0">
<layout class="QGridLayout" name="gridLayout_2">
<item row="0" column="1">
<widget class="QLineEdit" name="sampleLineEdit"/>
</item>
<item row="1" column="1">
<widget class="QLineEdit" name="messageLineEdit"/>
</item>
<item row="0" column="0">
<widget class="QLabel" name="label">
<property name="text">
<string>Sample</string>
</property>
</widget>
</item>
<item row="1" column="0">
<widget class="QLabel" name="label_2">
<property name="text">
<string>Message</string>
</property>
</widget>
</item>
</layout>
</item>
<item row="1" column="0" colspan="3">
<widget class="Image" name="image">
<property name="enable_toolbar" stdset="0">
<bool>false</bool>
</property>
<property name="inner_axes" stdset="0">
<bool>false</bool>
</property>
<property name="monitor" stdset="0">
<string>cam200</string>
</property>
<property name="rotation" stdset="0">
<number>3</number>
</property>
</widget>
</item>
<item row="0" column="3">
<widget class="ToggleSwitch" name="liveViewSwitch"/>
</item>
<item row="0" column="2">
<widget class="QLabel" name="label_3">
<property name="text">
<string>Live View</string>
</property>
<property name="alignment">
<set>Qt::AlignmentFlag::AlignRight|Qt::AlignmentFlag::AlignTrailing|Qt::AlignmentFlag::AlignVCenter</set>
</property>
</widget>
</item>
</layout>
</widget>
<customwidgets>
<customwidget>
<class>Image</class>
<extends>QWidget</extends>
<header>image</header>
</customwidget>
<customwidget>
<class>ToggleSwitch</class>
<extends>QWidget</extends>
<header>toggle_switch</header>
</customwidget>
</customwidgets>
<resources/>
<connections/>
</ui>

View File

@@ -1,54 +0,0 @@
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from bec_widgets.utils.bec_designer import designer_material_icon
from csaxs_bec.bec_widgets.widgets.omny_alignment.omny_alignment import OmnyAlignment
DOM_XML = """
<ui language='c++'>
<widget class='OmnyAlignment' name='omny_alignment'>
</widget>
</ui>
"""
class OmnyAlignmentPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
t = OmnyAlignment(parent)
return t
def domXml(self):
return DOM_XML
def group(self):
return ""
def icon(self):
return designer_material_icon(OmnyAlignment.ICON_NAME)
def includeFile(self):
return "omny_alignment"
def initialize(self, form_editor):
self._form_editor = form_editor
def isContainer(self):
return False
def isInitialized(self):
return self._form_editor is not None
def name(self):
return "OmnyAlignment"
def toolTip(self):
return "OmnyAlignment"
def whatsThis(self):
return self.toolTip()

View File

@@ -1,15 +0,0 @@
def main(): # pragma: no cover
from qtpy import PYSIDE6
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from csaxs_bec.bec_widgets.widgets.omny_alignment.omny_alignment_plugin import OmnyAlignmentPlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(OmnyAlignmentPlugin())
if __name__ == "__main__": # pragma: no cover
main()

View File

@@ -0,0 +1,11 @@
import os
def setup_epics_ca():
#os.environ["EPICS_CA_AUTO_ADDR_LIST"] = "NO"
#os.environ["EPICS_CA_ADDR_LIST"] = "129.129.122.255 sls-x12sa-cagw.psi.ch:5836"
os.environ["PYTHONIOENCODING"] = "latin1"
def run():
setup_epics_ca()

View File

@@ -1,11 +0,0 @@
import os
def setup_epics_ca():
# os.environ["EPICS_CA_AUTO_ADDR_LIST"] = "NO"
# os.environ["EPICS_CA_ADDR_LIST"] = "129.129.122.255 sls-x12sa-cagw.psi.ch:5836"
os.environ["PYTHONIOENCODING"] = "latin1"
def run():
setup_epics_ca()

View File

@@ -0,0 +1,64 @@
eiger9m:
description: Eiger9m HPC area detector 9M with JungfrauJoch backend
deviceClass: csaxs_bec.devices.jungfraujoch.eiger_jfj.Eiger9MCSAXS
deviceConfig:
host: "http://sls-jfjoch-001"
port: 8080
deviceTags:
- cSAXS
- eiger9m
onFailure: buffer
enabled: true
readoutPriority: async
softwareTrigger: false
ddg_jfj:
description: DelayGenerator for triggering all detectors
deviceClass: csaxs_bec.devices.epics.delay_generator_csaxs.DelayGeneratorcSAXS
deviceConfig:
prefix: 'X12SA-CPCL-DDG3:'
# ddg_config:
# delay_burst: 40.e-3
# delta_width: 0
# additional_triggers: 0
# polarity:
# - 1 # T0 -> DDG MCS
# - 0 # eiger
# - 1 # falcon
# - 1
# - 1
# amplitude: 4.5
# offset: 0
# thres_trig_level: 2.5
# set_high_on_exposure: False
# set_high_on_stage: False
deviceTags:
- cSAXS
- ddg_detectors
onFailure: buffer
enabled: true
readoutPriority: async
softwareTrigger: True
# Two test devices from the simulation
samx:
readoutPriority: baseline
deviceClass: ophyd_devices.SimPositioner
deviceConfig:
delay: 1
limits:
- -50
- 50
tolerance: 0.01
update_frequency: 400
deviceTags:
- user motors
enabled: true
readOnly: false
bpm4i:
readoutPriority: monitored
deviceClass: ophyd_devices.SimMonitor
deviceConfig:
deviceTags:
- beamline
enabled: true
readOnly: false

File diff suppressed because it is too large Load Diff

View File

@@ -3,9 +3,12 @@ import time
import numpy as np
from ophyd import Component as Cpt
from ophyd import DeviceStatus, Kind, StatusBase
from ophyd_devices import PSIDeviceBase
from ophyd_devices.utils.bec_signals import PreviewSignal
from ophyd import Device, Kind
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin,
PSIDetectorBase,
)
from ophyd_devices.sim.sim_signals import SetableSignal
try:
from pyueye import ueye
@@ -14,10 +17,212 @@ except ImportError:
ueye = None
class IDSCamera(PSIDeviceBase):
class IDSCustomPrepare(CustomDetectorMixin):
USER_ACCESS = ["pyueye"]
pyueye = ueye
def __init__(self, *_args, parent: Device = None, **_kwargs) -> None:
super().__init__(*_args, parent=parent, **_kwargs)
self.ueye = ueye
self.h_cam = None
self.s_info = None
self.data_thread = None
self.thread_event = None
def on_connection_established(self):
self.hCam = self.ueye.HIDS(
self.parent.camera_ID
) # 0: first available camera; 1-254: The camera with the specified camera ID
self.sInfo = self.ueye.SENSORINFO()
self.cInfo = self.ueye.CAMINFO()
self.pcImageMemory = self.ueye.c_mem_p()
self.MemID = self.ueye.int()
self.rectAOI = self.ueye.IS_RECT()
self.pitch = self.ueye.INT()
self.nBitsPerPixel = self.ueye.INT(
self.parent.bits_per_pixel
) # 24: bits per pixel for color mode; take 8 bits per pixel for monochrome
self.channels = (
self.parent.channels
) # 3: channels for color mode(RGB); take 1 channel for monochrome
self.m_nColorMode = self.ueye.INT(
self.parent.m_n_colormode
) # Y8/RGB16/RGB24/REG32 (1 for our color cameras)
self.bytes_per_pixel = int(self.nBitsPerPixel / 8)
# Starts the driver and establishes the connection to the camera
nRet = self.ueye.is_InitCamera(self.hCam, None)
if nRet != self.ueye.IS_SUCCESS:
print("is_InitCamera ERROR")
# Reads out the data hard-coded in the non-volatile camera memory and writes it to the data structure that cInfo points to
nRet = self.ueye.is_GetCameraInfo(self.hCam, self.cInfo)
if nRet != self.ueye.IS_SUCCESS:
print("is_GetCameraInfo ERROR")
# You can query additional information about the sensor type used in the camera
nRet = self.ueye.is_GetSensorInfo(self.hCam, self.sInfo)
if nRet != self.ueye.IS_SUCCESS:
print("is_GetSensorInfo ERROR")
nRet = self.ueye.is_ResetToDefault(self.hCam)
if nRet != self.ueye.IS_SUCCESS:
print("is_ResetToDefault ERROR")
# Set display mode to DIB
nRet = self.ueye.is_SetDisplayMode(self.hCam, self.ueye.IS_SET_DM_DIB)
# Set the right color mode
if (
int.from_bytes(self.sInfo.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_BAYER
):
# setup the color depth to the current windows setting
self.ueye.is_GetColorDepth(self.hCam, self.nBitsPerPixel, self.m_nColorMode)
bytes_per_pixel = int(self.nBitsPerPixel / 8)
print("IS_COLORMODE_BAYER: ")
print("\tm_nColorMode: \t\t", self.m_nColorMode)
print("\tnBitsPerPixel: \t\t", self.nBitsPerPixel)
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
print()
elif (
int.from_bytes(self.sInfo.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_CBYCRY
):
# for color camera models use RGB32 mode
m_nColorMode = ueye.IS_CM_BGRA8_PACKED
nBitsPerPixel = ueye.INT(32)
bytes_per_pixel = int(self.nBitsPerPixel / 8)
print("IS_COLORMODE_CBYCRY: ")
print("\tm_nColorMode: \t\t", m_nColorMode)
print("\tnBitsPerPixel: \t\t", nBitsPerPixel)
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
print()
elif (
int.from_bytes(self.sInfo.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_MONOCHROME
):
# for color camera models use RGB32 mode
m_nColorMode = self.ueye.IS_CM_MONO8
nBitsPerPixel = self.ueye.INT(8)
bytes_per_pixel = int(nBitsPerPixel / 8)
print("IS_COLORMODE_MONOCHROME: ")
print("\tm_nColorMode: \t\t", m_nColorMode)
print("\tnBitsPerPixel: \t\t", nBitsPerPixel)
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
print()
else:
# for monochrome camera models use Y8 mode
m_nColorMode = self.ueye.IS_CM_MONO8
nBitsPerPixel = self.ueye.INT(8)
bytes_per_pixel = int(nBitsPerPixel / 8)
print("else")
# Can be used to set the size and position of an "area of interest"(AOI) within an image
nRet = self.ueye.is_AOI(
self.hCam, ueye.IS_AOI_IMAGE_GET_AOI, self.rectAOI, self.ueye.sizeof(self.rectAOI)
)
if nRet != self.ueye.IS_SUCCESS:
print("is_AOI ERROR")
self.width = self.rectAOI.s32Width
self.height = self.rectAOI.s32Height
# Prints out some information about the camera and the sensor
print("Camera model:\t\t", self.sInfo.strSensorName.decode("utf-8"))
print("Camera serial no.:\t", self.cInfo.SerNo.decode("utf-8"))
print("Maximum image width:\t", self.width)
print("Maximum image height:\t", self.height)
print()
# ---------------------------------------------------------------------------------------------------------------------------------------
# Allocates an image memory for an image having its dimensions defined by width and height and its color depth defined by nBitsPerPixel
nRet = self.ueye.is_AllocImageMem(
self.hCam, self.width, self.height, self.nBitsPerPixel, self.pcImageMemory, self.MemID
)
if nRet != self.ueye.IS_SUCCESS:
print("is_AllocImageMem ERROR")
else:
# Makes the specified image memory the active memory
nRet = self.ueye.is_SetImageMem(self.hCam, self.pcImageMemory, self.MemID)
if nRet != self.ueye.IS_SUCCESS:
print("is_SetImageMem ERROR")
else:
# Set the desired color mode
nRet = self.ueye.is_SetColorMode(self.hCam, self.m_nColorMode)
# Activates the camera's live video mode (free run mode)
nRet = self.ueye.is_CaptureVideo(self.hCam, self.ueye.IS_DONT_WAIT)
if nRet != self.ueye.IS_SUCCESS:
print("is_CaptureVideo ERROR")
# Enables the queue mode for existing image memory sequences
nRet = self.ueye.is_InquireImageMem(
self.hCam,
self.pcImageMemory,
self.MemID,
self.width,
self.height,
self.nBitsPerPixel,
self.pitch,
)
if nRet != self.ueye.IS_SUCCESS:
print("is_InquireImageMem ERROR")
else:
print("Press q to leave the programm")
startmeasureframerate = True
Gain = False
# Start live mode of camera immediately
self.parent.start_live_mode()
def _start_data_thread(self):
self.thread_event = threading.Event()
self.data_thread = threading.Thread(target=self._receive_data_from_camera, daemon=True)
self.data_thread.start()
def _receive_data_from_camera(self):
while not self.thread_event.is_set():
# In order to display the image in an OpenCV window we need to...
# ...extract the data of our image memory
array = ueye.get_data(
self.pcImageMemory,
self.width,
self.height,
self.nBitsPerPixel,
self.pitch,
copy=False,
)
# bytes_per_pixel = int(nBitsPerPixel / 8)
# ...reshape it in an numpy array...
frame = np.reshape(array, (self.height.value, self.width.value, self.bytes_per_pixel))
self.parent.image_data.put(frame)
self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=frame)
time.sleep(0.1)
def on_trigger(self):
pass
# self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=self.parent.image_data.get())
class IDSCamera(PSIDetectorBase):
USER_ACCESS = ["start_live_mode", "stop_live_mode"]
image_data = Cpt(PreviewSignal, value=np.empty((100, 100)), kind=Kind.omitted)
custom_prepare_cls = IDSCustomPrepare
image_data = Cpt(SetableSignal, value=np.empty((100, 100)), kind=Kind.omitted)
SUB_MONITOR = "device_monitor_2d"
_default_sub = SUB_MONITOR
def __init__(
self,
@@ -29,219 +234,19 @@ class IDSCamera(PSIDeviceBase):
channels: int,
m_n_colormode: int,
kind=None,
parent=None,
device_manager=None,
**kwargs,
):
super().__init__(
prefix=prefix, name=name, kind=kind, device_manager=device_manager, **kwargs
prefix, name=name, kind=kind, parent=parent, device_manager=device_manager, **kwargs
)
self.camera_ID = camera_ID
self.bits_per_pixel = bits_per_pixel
self.bytes_per_pixel = None
self.channels = channels
self._m_n_colormode_input = m_n_colormode
self.m_n_colormode = None
self.ueye = ueye
self.h_cam = None
self.s_info = None
self.data_thread = None
self.c_info = None
self.pc_image_memory = None
self.mem_id = None
self.rect_aoi = None
self.pitch = None
self.n_bits_per_pixel = None
self.width = None
self.height = None
self.thread_event = threading.Event()
self.data_thread = None
def start_backend(self):
if self.ueye is None:
raise ImportError("The pyueye library is not installed.")
self.h_cam = self.ueye.HIDS(
self.camera_ID
) # 0: first available camera; 1-254: The camera with the specified camera ID
self.s_info = self.ueye.SENSORINFO()
self.c_info = self.ueye.CAMINFO()
self.pc_image_memory = self.ueye.c_mem_p()
self.mem_id = self.ueye.int()
self.rect_aoi = self.ueye.IS_RECT()
self.pitch = self.ueye.INT()
self.n_bits_per_pixel = self.ueye.INT(
self.bits_per_pixel
) # 24: bits per pixel for color mode; take 8 bits per pixel for monochrome
self.m_n_colormode = self.ueye.INT(
self._m_n_colormode_input
) # Y8/RGB16/RGB24/REG32 (1 for our color cameras)
self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
# Starts the driver and establishes the connection to the camera
ret = self.ueye.is_InitCamera(self.h_cam, None)
if ret != self.ueye.IS_SUCCESS:
print("is_InitCamera ERROR")
# Reads out the data hard-coded in the non-volatile camera memory and writes it to the data structure that c_info points to
ret = self.ueye.is_GetCameraInfo(self.h_cam, self.c_info)
if ret != self.ueye.IS_SUCCESS:
print("is_GetCameraInfo ERROR")
# You can query additional information about the sensor type used in the camera
ret = self.ueye.is_GetSensorInfo(self.h_cam, self.s_info)
if ret != self.ueye.IS_SUCCESS:
print("is_GetSensorInfo ERROR")
ret = self.ueye.is_ResetToDefault(self.h_cam)
if ret != self.ueye.IS_SUCCESS:
print("is_ResetToDefault ERROR")
# Set display mode to DIB
ret = self.ueye.is_SetDisplayMode(self.h_cam, self.ueye.IS_SET_DM_DIB)
# Set the right color mode
if (
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_BAYER
):
# setup the color depth to the current windows setting
self.ueye.is_GetColorDepth(self.h_cam, self.n_bits_per_pixel, self.m_n_colormode)
bytes_per_pixel = int(self.n_bits_per_pixel / 8)
print("IS_COLORMODE_BAYER: ")
print("\tm_n_colormode: \t\t", self.m_n_colormode)
print("\tn_bits_per_pixel: \t\t", self.n_bits_per_pixel)
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
print()
elif (
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_CBYCRY
):
# for color camera models use RGB32 mode
m_n_colormode = self.ueye.IS_CM_BGRA8_PACKED
n_bits_per_pixel = self.ueye.INT(32)
bytes_per_pixel = int(self.n_bits_per_pixel / 8)
print("IS_COLORMODE_CBYCRY: ")
print("\tm_n_colormode: \t\t", m_n_colormode)
print("\tn_bits_per_pixel: \t\t", n_bits_per_pixel)
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
print()
elif (
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_MONOCHROME
):
# for color camera models use RGB32 mode
m_n_colormode = self.ueye.IS_CM_MONO8
n_bits_per_pixel = self.ueye.INT(8)
bytes_per_pixel = int(n_bits_per_pixel / 8)
print("IS_COLORMODE_MONOCHROME: ")
print("\tm_n_colormode: \t\t", m_n_colormode)
print("\tn_bits_per_pixel: \t\t", n_bits_per_pixel)
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
print()
else:
# for monochrome camera models use Y8 mode
m_n_colormode = self.ueye.IS_CM_MONO8
n_bits_per_pixel = self.ueye.INT(8)
bytes_per_pixel = int(n_bits_per_pixel / 8)
print("else")
# Can be used to set the size and position of an "area of interest"(AOI) within an image
ret = self.ueye.is_AOI(
self.h_cam,
self.ueye.IS_AOI_IMAGE_GET_AOI,
self.rect_aoi,
self.ueye.sizeof(self.rect_aoi),
)
if ret != self.ueye.IS_SUCCESS:
print("is_AOI ERROR")
self.width = self.rect_aoi.s32Width
self.height = self.rect_aoi.s32Height
# Prints out some information about the camera and the sensor
print("Camera model:\t\t", self.s_info.strSensorName.decode("utf-8"))
print("Camera serial no.:\t", self.c_info.SerNo.decode("utf-8"))
print("Maximum image width:\t", self.width)
print("Maximum image height:\t", self.height)
print()
# ---------------------------------------------------------------------------------------------------------------------------------------
# Allocates an image memory for an image having its dimensions defined by width and height and its color depth defined by n_bits_per_pixel
ret = self.ueye.is_AllocImageMem(
self.h_cam,
self.width,
self.height,
self.n_bits_per_pixel,
self.pc_image_memory,
self.mem_id,
)
if ret != self.ueye.IS_SUCCESS:
print("is_AllocImageMem ERROR")
else:
# Makes the specified image memory the active memory
ret = self.ueye.is_SetImageMem(self.h_cam, self.pc_image_memory, self.mem_id)
if ret != self.ueye.IS_SUCCESS:
print("is_SetImageMem ERROR")
else:
# Set the desired color mode
ret = self.ueye.is_SetColorMode(self.h_cam, self.m_n_colormode)
# Activates the camera's live video mode (free run mode)
ret = self.ueye.is_CaptureVideo(self.h_cam, self.ueye.IS_DONT_WAIT)
if ret != self.ueye.IS_SUCCESS:
print("is_CaptureVideo ERROR")
# Enables the queue mode for existing image memory sequences
ret = self.ueye.is_InquireImageMem(
self.h_cam,
self.pc_image_memory,
self.mem_id,
self.width,
self.height,
self.n_bits_per_pixel,
self.pitch,
)
if ret != self.ueye.IS_SUCCESS:
print("is_InquireImageMem ERROR")
else:
print("Press q to leave the programm")
# startmeasureframerate = True
# Gain = False
# Start live mode of camera immediately
self.start_live_mode()
def _start_data_thread(self):
self.data_thread = threading.Thread(target=self._receive_data_from_camera, daemon=True)
self.data_thread.start()
def _receive_data_from_camera(self):
while not self.thread_event.is_set():
if self.ueye is None:
print("pyueye library not available.")
return
# In order to display the image in an OpenCV window we need to...
# ...extract the data of our image memory
array = self.ueye.get_data(
self.pc_image_memory,
self.width,
self.height,
self.n_bits_per_pixel,
self.pitch,
copy=False,
)
# bytes_per_pixel = int(n_bits_per_pixel / 8)
# ...reshape it in an numpy array...
frame = np.reshape(array, (self.height.value, self.width.value, self.bytes_per_pixel))
self.image_data.put(frame)
time.sleep(0.1)
self.m_n_colormode = m_n_colormode
#TODO fix connected and wait_for_connection
self.custom_prepare.on_connection_established()
def wait_for_connection(self, all_signals=False, timeout=10):
if ueye is None:
@@ -249,64 +254,226 @@ class IDSCamera(PSIDeviceBase):
"The pyueye library is not installed or doesn't provide the necessary c libs"
)
super().wait_for_connection(all_signals, timeout)
#self.custom_prepare.on_connection_established()
def destroy(self):
"""Extend Ophyds destroy function to kill the data thread"""
self.stop_live_mode()
super().destroy()
def start_live_mode(self):
if self.data_thread is not None:
if self.custom_prepare.data_thread is not None:
self.stop_live_mode()
self._start_data_thread()
self.custom_prepare._start_data_thread()
def stop_live_mode(self):
"""Stopping the camera live mode."""
self.thread_event.set()
if self.data_thread is not None:
self.data_thread.join()
self.thread_event.clear()
self.data_thread = None
if self.custom_prepare.thread_event is not None:
self.custom_prepare.thread_event.set()
if self.custom_prepare.data_thread is not None:
self.custom_prepare.data_thread.join()
self.custom_prepare.thread_event = None
self.custom_prepare.data_thread = None
########################################
# Beamline Specific Implementations #
########################################
def on_init(self) -> None:
"""
Called when the device is initialized.
"""from pyueye import ueye
import numpy as np
import cv2
import sys
import time
No signals are connected at this point. If you like to
set default values on signals, please use on_connected instead.
"""
#---------------------------------------------------------------------------------------------------------------------------------------
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
"""
self.start_backend()
#Variables
hCam = ueye.HIDS(202) #0: first available camera; 1-254: The camera with the specified camera ID
sInfo = ueye.SENSORINFO()
cInfo = ueye.CAMINFO()
pcImageMemory = ueye.c_mem_p()
MemID = ueye.int()
rectAOI = ueye.IS_RECT()
pitch = ueye.INT()
nBitsPerPixel = ueye.INT(24) #24: bits per pixel for color mode; take 8 bits per pixel for monochrome
channels = 3 #3: channels for color mode(RGB); take 1 channel for monochrome
m_nColorMode = ueye.INT(1) # Y8/RGB16/RGB24/REG32 (1 for our color cameras)
bytes_per_pixel = int(nBitsPerPixel / 8)
def on_stage(self) -> DeviceStatus | StatusBase | None:
"""
Called while staging the device.
ids_cam
...
deviceConfig:
camera_ID: 202
bits_per_pixel: 24
channels: 3
m_n_colormode: 1
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
"""
#---------------------------------------------------------------------------------------------------------------------------------------
print("START")
print()
def on_unstage(self) -> DeviceStatus | StatusBase | None:
"""Called while unstaging the device."""
# Starts the driver and establishes the connection to the camera
nRet = ueye.is_InitCamera(hCam, None)
if nRet != ueye.IS_SUCCESS:
print("is_InitCamera ERROR")
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
"""Called right before the scan starts on all devices automatically."""
# Reads out the data hard-coded in the non-volatile camera memory and writes it to the data structure that cInfo points to
nRet = ueye.is_GetCameraInfo(hCam, cInfo)
if nRet != ueye.IS_SUCCESS:
print("is_GetCameraInfo ERROR")
def on_trigger(self) -> DeviceStatus | StatusBase | None:
"""Called when the device is triggered."""
# You can query additional information about the sensor type used in the camera
nRet = ueye.is_GetSensorInfo(hCam, sInfo)
if nRet != ueye.IS_SUCCESS:
print("is_GetSensorInfo ERROR")
def on_complete(self) -> DeviceStatus | StatusBase | None:
"""Called to inquire if a device has completed a scans."""
nRet = ueye.is_ResetToDefault( hCam)
if nRet != ueye.IS_SUCCESS:
print("is_ResetToDefault ERROR")
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
# Set display mode to DIB
nRet = ueye.is_SetDisplayMode(hCam, ueye.IS_SET_DM_DIB)
def on_stop(self) -> None:
"""Called when the device is stopped."""
def on_destroy(self) -> None:
"""Called when the device is destroyed. Cleanup resources here."""
self.stop_live_mode()
# Set the right color mode
if int.from_bytes(sInfo.nColorMode.value, byteorder='big') == ueye.IS_COLORMODE_BAYER:
# setup the color depth to the current windows setting
ueye.is_GetColorDepth(hCam, nBitsPerPixel, m_nColorMode)
bytes_per_pixel = int(nBitsPerPixel / 8)
print("IS_COLORMODE_BAYER: ", )
print("\tm_nColorMode: \t\t", m_nColorMode)
print("\tnBitsPerPixel: \t\t", nBitsPerPixel)
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
print()
elif int.from_bytes(sInfo.nColorMode.value, byteorder='big') == ueye.IS_COLORMODE_CBYCRY:
# for color camera models use RGB32 mode
m_nColorMode = ueye.IS_CM_BGRA8_PACKED
nBitsPerPixel = ueye.INT(32)
bytes_per_pixel = int(nBitsPerPixel / 8)
print("IS_COLORMODE_CBYCRY: ", )
print("\tm_nColorMode: \t\t", m_nColorMode)
print("\tnBitsPerPixel: \t\t", nBitsPerPixel)
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
print()
elif int.from_bytes(sInfo.nColorMode.value, byteorder='big') == ueye.IS_COLORMODE_MONOCHROME:
# for color camera models use RGB32 mode
m_nColorMode = ueye.IS_CM_MONO8
nBitsPerPixel = ueye.INT(8)
bytes_per_pixel = int(nBitsPerPixel / 8)
print("IS_COLORMODE_MONOCHROME: ", )
print("\tm_nColorMode: \t\t", m_nColorMode)
print("\tnBitsPerPixel: \t\t", nBitsPerPixel)
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
print()
else:
# for monochrome camera models use Y8 mode
m_nColorMode = ueye.IS_CM_MONO8
nBitsPerPixel = ueye.INT(8)
bytes_per_pixel = int(nBitsPerPixel / 8)
print("else")
# Can be used to set the size and position of an "area of interest"(AOI) within an image
nRet = ueye.is_AOI(hCam, ueye.IS_AOI_IMAGE_GET_AOI, rectAOI, ueye.sizeof(rectAOI))
if nRet != ueye.IS_SUCCESS:
print("is_AOI ERROR")
width = rectAOI.s32Width
height = rectAOI.s32Height
# Prints out some information about the camera and the sensor
print("Camera model:\t\t", sInfo.strSensorName.decode('utf-8'))
print("Camera serial no.:\t", cInfo.SerNo.decode('utf-8'))
print("Maximum image width:\t", width)
print("Maximum image height:\t", height)
print()
#---------------------------------------------------------------------------------------------------------------------------------------
# Allocates an image memory for an image having its dimensions defined by width and height and its color depth defined by nBitsPerPixel
nRet = ueye.is_AllocImageMem(hCam, width, height, nBitsPerPixel, pcImageMemory, MemID)
if nRet != ueye.IS_SUCCESS:
print("is_AllocImageMem ERROR")
else:
# Makes the specified image memory the active memory
nRet = ueye.is_SetImageMem(hCam, pcImageMemory, MemID)
if nRet != ueye.IS_SUCCESS:
print("is_SetImageMem ERROR")
else:
# Set the desired color mode
nRet = ueye.is_SetColorMode(hCam, m_nColorMode)
# Activates the camera's live video mode (free run mode)
nRet = ueye.is_CaptureVideo(hCam, ueye.IS_DONT_WAIT)
if nRet != ueye.IS_SUCCESS:
print("is_CaptureVideo ERROR")
# Enables the queue mode for existing image memory sequences
nRet = ueye.is_InquireImageMem(hCam, pcImageMemory, MemID, width, height, nBitsPerPixel, pitch)
if nRet != ueye.IS_SUCCESS:
print("is_InquireImageMem ERROR")
else:
print("Press q to leave the programm")
startmeasureframerate=True
Gain = False
#---------------------------------------------------------------------------------------------------------------------------------------
# Continuous image display
while(nRet == ueye.IS_SUCCESS):
# In order to display the image in an OpenCV window we need to...
# ...extract the data of our image memory
array = ueye.get_data(pcImageMemory, width, height, nBitsPerPixel, pitch, copy=False)
# bytes_per_pixel = int(nBitsPerPixel / 8)
# ...reshape it in an numpy array...
frame = np.reshape(array,(height.value, width.value, bytes_per_pixel))
# ...resize the image by a half
frame = cv2.resize(frame,(0,0),fx=0.5, fy=0.5)
#---------------------------------------------------------------------------------------------------------------------------------------
#Include image data processing here
#---------------------------------------------------------------------------------------------------------------------------------------
#...and finally display it
cv2.imshow("SimpleLive_Python_uEye_OpenCV", frame)
if startmeasureframerate:
starttime = time.time()
startmeasureframerate=False
framenumber=0
if time.time() > starttime+5:
print(f"Caught {framenumber/5} frames per second")
startmeasureframerate=True
Gain = ~Gain
if Gain:
nRet = ueye.is_SetGainBoost(hCam, 1)
else:
nRet = ueye.is_SetGainBoost(hCam, 0)
print(f"Gain setting status {nRet}")
#...and finally display it
cv2.imshow("SimpleLive_Python_uEye_OpenCV", frame)
framenumber+=1
time.sleep(0.1)
# Press q if you want to end the loop
if (cv2.waitKey(1) & 0xFF) == ord('q'):
break
#---------------------------------------------------------------------------------------------------------------------------------------
# Releases an image memory that was allocated using is_AllocImageMem() and removes it from the driver management
ueye.is_FreeImageMem(hCam, pcImageMemory, MemID)
# Disables the hCam camera handle and releases the data structures and memory areas taken up by the uEye camera
ueye.is_ExitCamera(hCam)
# Destroys the OpenCv windows
cv2.destroyAllWindows()
print()
print("END")
"""

View File

@@ -0,0 +1,83 @@
import time
import numpy as np
from bec_lib import bec_logger
from ophyd import Kind, Signal
from ophyd.utils import ReadOnlyError
from ophyd_devices.utils.bec_device_base import BECDeviceBase
logger = bec_logger.logger
# Readout precision for Setable/ReadOnlySignal signals
PRECISION = 3
class ReadOnlySignal(Signal):
"""Setable signal for simulated devices.
The signal will store the value in sim_state of the SimulatedData class of the parent device.
It will also return the value from sim_state when get is called. Compared to the ReadOnlySignal,
this signal can be written to.
The setable signal inherits from the Signal class of ophyd, thus the class attribute needs to be
initiated as a Component (class from ophyd).
>>> signal = SetableSignal(name="signal", parent=parent, value=0)
Parameters
----------
name (string) : Name of the signal
parent (object) : Parent object of the signal, default none.
value (any) : Initial value of the signal, default 0.
kind (int) : Kind of the signal, default Kind.normal.
precision (float) : Precision of the signal, default PRECISION.
"""
def __init__(
self,
name: str,
*args,
fcn: callable,
kind: int = Kind.normal,
precision: float = PRECISION,
**kwargs,
):
super().__init__(*args, name=name, value=value, kind=kind, **kwargs)
self._metadata.update(connected=True, write_access=False)
self._value = None
self.precision = precision
self.fcn = fcn
# pylint: disable=arguments-differ
def get(self):
"""Get the current position of the simulated device.
Core function for signal.
"""
self._value = self.fcn()
return self._value
# pylint: disable=arguments-differ
def put(self, value):
"""Put the value to the simulated device.
Core function for signal.
"""
self._update_sim_state(value)
self._value = value
def describe(self):
"""Describe the readback signal.
Core function for signal.
"""
res = super().describe()
if self.precision is not None:
res[self.name]["precision"] = self.precision
return res
@property
def timestamp(self):
"""Timestamp of the readback value"""
return self._get_timestamp()

View File

@@ -0,0 +1,221 @@
"""Eiger detector for cSAXS beamline at the Swiss Light Source.
16bit mode supports 8e7 counts/s per pixel,
you will never have more than 12bit subframes, which means 4000 counts per subframe.
32bit mode supports 2e7 counts/s per pixel,
you will never have more than 24bit subframe, which means 16.7 million counts per subframe.
"""
import enum
from typing import TYPE_CHECKING
from bec_lib.devicemanager import ScanInfo
from bec_lib.logger import bec_logger
from jfjoch_client.models.dataset_settings import DatasetSettings
from jfjoch_client.models.detector_settings import DetectorSettings
from jfjoch_client.models.detector_timing import DetectorTiming
from ophyd import DeviceStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from csaxs_bec.devices.jungfraujoch.jungfrau_joch_client import JungfrauJochClient
from csaxs_bec.devices.jungfraujoch.readout_constants import EIGER9M_READOUT_TIME_32BIT
if TYPE_CHECKING: # pragma no cover
from bec_lib.devicemanager import ScanInfo
from bec_server.device_server.device_server import DeviceManagerDS
logger = bec_logger.logger
class EigerCSAXSBitDepth(int, enum.Enum):
"""Bit depth for EIGER detector at cSAXS beamline."""
BIT_DEPTH_16 = 16
BIT_DEPTH_32 = 32
class Eiger9MCSAXS(PSIDeviceBase):
"""
-----------
JungfrauJoch - one needs to connect to the jfj-server (sls-jfjoch-001)
Relevant commands for debugging:
sudo systemctl restart jfjoch_broker
sudo systemctl status jfjoch_broker
Some additional notes:
------------
- If energy on JFJ is set via DetectorSettings, the one in DatasetSettings will be ignored.
- One can set this initially, and then set it to none in DetectorSettings such that any update in DatasetSettings will be considered.
- IMPORTANT: Any change in energy will be detector. It will be best to have a check ourselves with a certain tolerance of ~ % to not constantly update the energy.
- in 'gating' mode, frame_time_us and count_time_us are not used.
- The image_time_us of the DatasetSettings and the frame_time_us of the DetectorSettings need to be the same.
- The difference between frame_time_us and count_time_us is the readout time.
- 16bit and 32bit, when do we switch?
- If switching is desired, the readout time needs to be adapted as a function of internal exposure time, and bit depth.
This can be up to ~400us for 32bit, and long exposures. This needs to be discussed!
- 16bit mode supports 8e7 counts/s per pixel,
It needs to be se to None.
------------
Eiger - if power cycling is needed. Use a combination of commands that connect to the chip, and the conda package.
The package is available via:
cd /sls/X12SA/data/gac-x12sa/erik/micromamba
source setup_9m.sh
------------
Nice to set high voltage low first, from conda package (sls_detector_package)
p highvoltage 0 or 150 (operational)
g highvoltage
# Put high voltage to 0 before power cylcing it.
telnet bchip500
cd power_control_user/
./on
./off
"""
########################################
# Beamline Specific Implementations #
########################################
USER_ACCESS = ["jfj_client"]
def __init__(
self,
name: str,
host: str = "http://sls-jfjoch-001",
port: int = 8080,
scan_info: ScanInfo = None,
device_manager=None,
**kwargs,
):
"""
Initialize the PSI Device Base class.
Args:
name (str) : Name of the device
scan_info (ScanInfo): The scan info to use.
"""
super().__init__(name=name, scan_info=scan_info, **kwargs)
self._host = f"{host}:{port}"
self.jfj_client = JungfrauJochClient(host=self._host, parent=self)
self.device_manager = device_manager
if self.device_manager is not None:
self.device_manager: DeviceManagerDS
self._bit_depth = 16
self.frame_time = 500e-6 # 500us, will be ignored in DetectorTiming.Gated
self.count_time = 300e-6 # 480us, will be ignored in DetectorTiming.Gated
# If not gated, frame_time and count_time will be used and logic has to be adjusted
self._timing = DetectorTiming.GATED
def on_init(self) -> None:
"""
Called when the device is initialized.
No siganls are connected at this point,
thus should not be set here but in on_connected instead.
"""
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
"""
# Stop first in case it was in an uncertain state (i.e. measuring)
logger.info(f"On connected for {self.name}")
self.jfj_client.stop()
# Try to connect, needs to be in Inactive or Error state
self.jfj_client.connect_and_initialise(timeout=5)
# Set energy threshold for EIGER detector
threshold_ke_v = 6.200 # Grab this from mono energy pseudo device
# Energy threshold provided in DetectorSettings, than it is ignored in DatasetSettings
# This sets the energy threshold for the EIGER detector
settings = DetectorSettings(
frame_time_us=int(self.frame_time * 1e6),
count_time_us=int(self.count_time * 1e6),
eiger_bit_depth=self._bit_depth,
eiger_threshold_ke_v=threshold_ke_v,
timing=self._timing,
)
self.jfj_client.set_detector_settings(settings)
# Second call is needed to ensure that eiger_threshold_ke_v is set to None
# if not, DatasetSettings for eiger_threshold_ke_v will be ignored
# settings = DetectorSettings(
# frame_time_us=int(self.frame_time * 1e6),
# count_time_us=int(self.count_time * 1e6),
# eiger_bit_depth=self._bit_depth,
# timing=self._timing,
# )
# self.jfj_client.set_detector_settings(settings)
def on_stage(self) -> DeviceStatus | None:
"""
Called while staging the device.
Information about the upcoming scan can be accessed from the scan_info object.
"""
# Delay generator ddg_jfj needs to be activate
ddg = self.device_manager.devices.get("ddg_jfj", None)
if ddg is None:
logger.warning("ddg_jfj not found in device manager")
raise ValueError("ddg_jfj not found in device manager")
ntrigger = ddg.compute_num_trigger()
if self.scan_info.msg.scan_type == "step":
# Energy threshold provided in DetectorSettings, than it is ignored in DatasetSettings
print()
data_settings = DatasetSettings(
image_time_us=int(self.frame_time * 1e6), # this is frame_time
ntrigger=ntrigger,
beam_x_pxl=0,
beam_y_pxl=0,
detector_distance_mm=100,
incident_energy_ke_v=10.00,
# file_prefix = full_path_to_file,
)
# status = self.task_handler.submit_task(
# self.jfj_client.start, task_args=(data_settings,), run=True
# )
# return status
self.jfj_client.start(settings=data_settings)
# This method computes trigger_pulse_width, ntriggers and bit_depth
# trigger_pulse_width -> image_time in s (image_time_us)
# ntriggers -> number of images per trigger
# bit_depth -> 16 or 32
def on_unstage(self) -> DeviceStatus | None:
"""Called while unstaging the device."""
def on_pre_scan(self) -> DeviceStatus | None:
"""Called right before the scan starts on all devices automatically."""
def on_trigger(self) -> DeviceStatus | None:
"""Called when the device is triggered."""
def on_complete(self) -> DeviceStatus | None:
"""Called to inquire if a device has completed a scans."""
def wait_for_complete():
timeout = 10
for _ in range(timeout):
try:
self.jfj_client.wait_till_done(timeout=1)
except TimeoutError:
continue
except Exception as e:
raise ValueError(f"Error in complete for {self.name}, exception: {e}") from e
else:
break
status = self.task_handler.submit_task(wait_for_complete, run=True)
return status
def on_kickoff(self) -> DeviceStatus | None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
def on_stop(self) -> None:
"""Called when the device is stopped."""
self.jfj_client.stop()
self.task_handler.shutdown()

View File

@@ -1,8 +1,18 @@
"""Module with client interface for the Jungfrau Joch detector API"""
import enum
import math
import time
import jfjoch_client
from bec_lib.logger import bec_logger
from jfjoch_client.api.default_api import DefaultApi
from jfjoch_client.api_client import ApiClient
from jfjoch_client.api_response import ApiResponse
from jfjoch_client.configuration import Configuration
from jfjoch_client.models.broker_status import BrokerStatus
from jfjoch_client.models.dataset_settings import DatasetSettings
from jfjoch_client.models.detector_settings import DetectorSettings
from ophyd import Device
logger = bec_logger.logger
@@ -11,10 +21,8 @@ class JungfrauJochClientError(Exception):
"""Base class for exceptions in this module."""
class DetectorState(enum.StrEnum):
"""Detector states for Jungfrau Joch detector
['Inactive', 'Idle', 'Busy', 'Measuring', 'Pedestal', 'Error']
"""
class DetectorState(str, enum.Enum):
"""Possible Detector states for Jungfrau Joch detector"""
INACTIVE = "Inactive"
IDLE = "Idle"
@@ -24,24 +32,46 @@ class DetectorState(enum.StrEnum):
ERROR = "Error"
class ResponseWaitDone(enum.IntEnum):
"""Response state for Jungfrau Joch detector wait till done"""
class ResponseWaitDone(int, enum.Enum):
"""Response state for Jungfrau Joch detector wait till done call"""
DETECTOR_IDLE = 200
TIMEOUT_PARAM_OUT_OF_RANGE = 400
TIMEOUT_PARAM_OUT_OF_BOUNDS = 400
JUNGFRAU_ERROR = 500
DETECTOR_INACTIVE = 502
TIMEOUT_REACHED = 504
class JungfrauJochClient:
"""Thin wrapper around the Jungfrau Joch API client"""
class ResponseCancelDone(int, enum.Enum):
"""HTTP Response for cancel post"""
def __init__(self, host: str = "http://sls-jfjoch-001:8080") -> None:
CANCEL_SENT_TO_FPGA = 200
class JungfrauJochClient:
"""Thin wrapper around the Jungfrau Joch API client.
sudo systemctl restart jfjoch_broker
sudo systemctl status jfjoch_broker
It looks as if the detector is not being stopped properly.
One module remains running, how can we restart the detector?
"""
def __init__(
self, host: str = "http://sls-jfjoch-001:8080", parent: Device | None = None
) -> None:
self._initialised = False
configuration = jfjoch_client.Configuration(host=host)
api_client = jfjoch_client.ApiClient(configuration)
self.api = jfjoch_client.DefaultApi(api_client)
configuration = Configuration(host=host)
api_client = ApiClient(configuration)
self.api = DefaultApi(api_client)
self._parent_name = parent.name if parent else self.__class__.__name__
@property
def jjf_state(self) -> BrokerStatus:
"""Get the status of JungfrauJoch"""
response = self.api.status_get()
return BrokerStatus(**response.to_dict())
@property
def initialised(self) -> bool:
@@ -53,19 +83,19 @@ class JungfrauJochClient:
"""Set the connected status"""
self._initialised = value
def get_jungfrau_joch_status(self) -> DetectorState:
def get_detector_state(self) -> DetectorState:
"""Get the status of JungfrauJoch"""
return self.api.status_get().state
return DetectorState(self.jjf_state.state)
def connect_and_initialise(self, timeout: int = 5) -> None:
"""Check if JungfrauJoch is connected and ready to receive commands"""
status = self.api.status_get().state
status = self.get_detector_state()
if status != DetectorState.IDLE:
self.api.initialize_post()
self.wait_till_done(timeout)
self.initialised = True
self.wait_till_done(timeout) # Blocking call
self.initialised = True
def set_detector_settings(self, settings: dict | jfjoch_client.DatasetSettings) -> None:
def set_detector_settings(self, settings: dict | DetectorSettings) -> None:
"""Set the detector settings. JungfrauJoch must be in IDLE, Error or Inactive state.
Note, the full settings have to be provided, otherwise the settings will be overwritten with default values.
@@ -73,81 +103,91 @@ class JungfrauJochClient:
settings (dict): dictionary of settings
"""
state = self.api.status_get().state
if state not in [DetectorState.IDLE, DetectorState.ERROR, DetectorState.INACTIVE]:
time.sleep(1) # This can be improved.... #TODO
state = self.api.status_get().state
if state not in [DetectorState.IDLE, DetectorState.ERROR, DetectorState.INACTIVE]:
raise JungfrauJochClientError(
f"Detector must be in IDLE, ERROR or INACTIVE state to set settings. Current state: {state}"
f"Error in {self._parent_name}. Detector must be in IDLE, ERROR or INACTIVE state to set settings. Current state: {state}"
)
if isinstance(settings, dict):
settings = jfjoch_client.DatasetSettings(**settings)
self.api.config_detector_put(settings)
settings = DetectorSettings(**settings)
self.api.config_detector_put(detector_settings=settings)
# Check with Filip if this call is blocking! also check if put_with_http is better
def set_mesaurement_settings(self, settings: dict | jfjoch_client.DatasetSettings) -> None:
"""Set the measurement settings. JungfrauJoch must be in IDLE state.
def start(self, settings: dict | DatasetSettings) -> None:
"""Start the mesaurement. DatasetSettings must be provided, and JungfrauJoch must be in IDLE state.
The method call is blocking and JungfrauJoch will be ready to measure after the call resolves.
Please check the DataSettings class for the available settings.
The minimum required settings are:
beam_x_pxl: StrictFloat | StrictInt,
beam_y_pxl: StrictFloat | StrictInt,
detector_distance_mm: float | int,
incident_energy_keV: float | int,
Args:
settings (dict): dictionary of settings
Please check the DataSettings class for the available settings. Minimum required settings are
beam_x_pxl, beam_y_pxl, detector_distance_mm, incident_energy_keV.
"""
state = self.api.status_get().state
state = self.get_detector_state()
if state != DetectorState.IDLE:
raise JungfrauJochClientError(
f"Detector must be in IDLE state to set settings. Current state: {state}"
f"Error in {self._parent_name}. Detector must be in IDLE state to set settings. Current state: {state}"
)
if isinstance(settings, dict):
settings = jfjoch_client.DatasetSettings(**settings)
settings = DatasetSettings(**settings)
try:
res = self.api.start_post_with_http_info(dataset_settings=settings)
res: ApiResponse = self.api.start_post_with_http_info(dataset_settings=settings)
if res.status_code != 200:
logger.error(
f"Error while setting measurement settings {settings}, response: {res}"
)
raise JungfrauJochClientError(
f"Error while setting measurement settings {settings}, response: {res}"
)
response = f"Error in {self._parent_name}, while setting measurement settings {settings}, response: {res}"
raise JungfrauJochClientError(response)
except JungfrauJochClientError as e:
logger.error(e)
raise e
except Exception as e:
logger.error(
f"Error while setting measurement settings {settings}. Exception raised {e}"
)
raise JungfrauJochClientError(
f"Error while setting measurement settings {settings}. Exception raised {e}"
) from e
response = f"Error in {self._parent_name}, while setting measurement settings {settings}, exception: {e}"
logger.error(response)
raise JungfrauJochClientError(response) from e
def stop(self) -> None:
"""Stop the acquisition"""
try:
res: ApiResponse = self.api.cancel_post_with_http_info() # Should we use a timeout?
if res.status_code != ResponseCancelDone.CANCEL_SENT_TO_FPGA:
response = f"Error in device {self._parent_name} while stopping the measurement. API Response: {res}"
raise JungfrauJochClientError(response)
except JungfrauJochClientError as e:
raise e
except Exception as exc:
raise JungfrauJochClientError from exc
def wait_till_done(self, timeout: int = 5) -> None:
"""Wait until JungfrauJoch is done.
"""Wait for JungfrauJoch to be in Idle state. Blocking call with timeout.
Args:
timeout (int): timeout in seconds
"""
success = False
try:
response = self.api.wait_till_done_post_with_http_info(math.ceil(timeout / 2))
if response.status_code != ResponseWaitDone.DETECTOR_IDLE:
logger.info(
f"Waitin for JungfrauJoch to be done, status: {ResponseWaitDone(response.status_code)}; response msg {response}"
)
response = self.api.wait_till_done_post_with_http_info(math.floor(timeout / 2))
if response.status_code == ResponseWaitDone.DETECTOR_IDLE:
success = True
return
logger.info(
f"Waiting for device {self._parent_name}, jungfrau joch to become IDLE, "
f"status: {ResponseWaitDone(response.status_code)}; response msg {response}"
)
response = self.api.wait_till_done_post_with_http_info(math.floor(timeout / 2))
if response.status_code == ResponseWaitDone.DETECTOR_IDLE:
return
except Exception as e:
logger.error(f"Error while waiting for JungfrauJoch to initialise: {e}")
logger.error(
f"Error in device {self._parent_name} while waiting for JungfrauJoch to initialise: {e}"
)
raise JungfrauJochClientError(
f"Error while waiting for JungfrauJoch to initialise: {e}"
f"Error in device {self._parent_name} while waiting for JungfrauJoch to initialise. Exception: {e}"
) from e
else:
if success is False:
logger.error(
f"Failed to initialise JungfrauJoch with status: {response.status_code}; response msg {response}"
)
raise JungfrauJochClientError(
f"Failed to initialise JungfrauJoch with status: {response.status_code}; response msg {response}"
)
# If the response is IDLE, this part is never reached. We will raise a TimeoutError.
msg = (
f"TimeoutError in device {self._parent_name}, failed to initialise JungfrauJoch with status:"
f"{response.status_code}; response msg {response}"
)
logger.error(msg)
raise TimeoutError(msg)

View File

@@ -0,0 +1,4 @@
"""Readout constants for all relevant detectors at cSAXS beamline."""
# -> should 20e-6, 20us : parallel vs nonparallel, exact values to be checked
EIGER9M_READOUT_TIME_32BIT = 100e-6 # s

View File

@@ -116,12 +116,13 @@ class FlomniFermatScan(SyncFlyScanBase):
shorten the movement time. In order to keep the last state, even if the
server is restarted, the state is stored in a global variable in redis.
"""
msg = self.connector.get(MessageEndpoints.global_vars("reverse_flomni_trajectory"))
producer = self.device_manager.producer
msg = producer.get(MessageEndpoints.global_vars("reverse_flomni_trajectory"))
if msg:
val = msg.content.get("value", False)
else:
val = False
self.connector.set(
producer.set(
MessageEndpoints.global_vars("reverse_flomni_trajectory"),
messages.VariableMessage(value=(not val)),
)
@@ -279,7 +280,7 @@ class FlomniFermatScan(SyncFlyScanBase):
yield from self.stubs.kickoff(device="rtx")
while True:
yield from self.stubs.read(group="monitored")
status = self.connector.get(MessageEndpoints.device_status("rt_scan"))
status = self.device_manager.producer.get(MessageEndpoints.device_status("rt_scan"))
if status:
status_id = status.content.get("status", 1)
request_id = status.metadata.get("RID")

View File

@@ -1,12 +0,0 @@
# from .metadata_schema_template import ExampleSchema
METADATA_SCHEMA_REGISTRY = {
# Add models which should be used to validate scan metadata here.
# Make a model according to the template, and import it as above
# Then associate it with a scan like so:
# "example_scan": ExampleSchema
}
# Define a default schema type which should be used as the fallback for everything:
DEFAULT_SCHEMA = None

View File

@@ -1,34 +0,0 @@
# # By inheriting from BasicScanMetadata you can define a schema by which metadata
# # supplied to a scan must be validated.
# # This schema is a Pydantic model: https://docs.pydantic.dev/latest/concepts/models/
# # but by default it will still allow you to add any arbitrary information to it.
# # That is to say, when you run a scan with which such a model has been associated in the
# # metadata_schema_registry, you can supply any python dictionary with strings as keys
# # and built-in python types (strings, integers, floats) as values, and these will be
# # added to the experiment metadata, but it *must* contain the keys and values of the
# # types defined in the schema class.
# #
# #
# # For example, say that you would like to enforce recording information about sample
# # pretreatment, you could define the following:
# #
#
# from bec_lib.metadata_schema import BasicScanMetadata
#
#
# class ExampleSchema(BasicScanMetadata):
# treatment_description: str
# treatment_temperature_k: int
#
#
# # If this was used according to the example in metadata_schema_registry.py,
# # then when calling the scan, the user would need to write something like:
# >>> scans.example_scan(
# >>> motor,
# >>> 1,
# >>> 2,
# >>> 3,
# >>> metadata={"treatment_description": "oven overnight", "treatment_temperature_k": 575},
# >>> )
#
# # And the additional metadata would be saved in the HDF5 file created for the scan.

View File

@@ -116,12 +116,13 @@ class OMNYFermatScan(SyncFlyScanBase):
shorten the movement time. In order to keep the last state, even if the
server is restarted, the state is stored in a global variable in redis.
"""
msg = self.connector.get(MessageEndpoints.global_vars("reverse_omny_trajectory"))
producer = self.device_manager.producer
msg = producer.get(MessageEndpoints.global_vars("reverse_omny_trajectory"))
if msg:
val = msg.content.get("value", False)
else:
val = False
self.connector.set(
producer.set(
MessageEndpoints.global_vars("reverse_omny_trajectory"),
messages.VariableMessage(value=(not val)),
)
@@ -264,7 +265,7 @@ class OMNYFermatScan(SyncFlyScanBase):
yield from self.stubs.kickoff(device="rtx")
while True:
yield from self.stubs.read(group="monitored")
status = self.connector.get(MessageEndpoints.device_status("rt_scan"))
status = self.device_manager.producer.get(MessageEndpoints.device_status("rt_scan"))
if status:
status_id = status.content.get("status", 1)
request_id = status.metadata.get("RID")

View File

@@ -5,7 +5,7 @@ build-backend = "hatchling.build"
[project]
name = "csaxs_bec"
version = "0.0.0"
description = "The cSAXS plugin repository for BEC"
description = "Custom device implementations based on the ophyd hardware abstraction layer"
requires-python = ">=3.10"
classifiers = [
"Development Status :: 3 - Alpha",
@@ -28,21 +28,19 @@ dependencies = [
[project.optional-dependencies]
dev = [
"black",
"copier",
"isort",
"coverage",
"pylint",
"pytest",
"pytest-random-order",
"ophyd_devices",
"bec_server",
"pytest-redis",
]
[project.entry-points."bec"]
plugin_bec = "csaxs_bec"
[project.entry-points."bec.deployment.device_server"]
plugin_ds_startup = "csaxs_bec.deployments.device_server.startup:run"
plugin_ds_startup = "csaxs_bec.deployment.device_server.startup:run"
[project.entry-points."bec.file_writer"]
plugin_file_writer = "csaxs_bec.file_writer"
@@ -50,18 +48,12 @@ plugin_file_writer = "csaxs_bec.file_writer"
[project.entry-points."bec.scans"]
plugin_scans = "csaxs_bec.scans"
[project.entry-points."bec.scans.metadata_schema"]
plugin_metadata_schema = "csaxs_bec.scans.metadata_schema"
[project.entry-points."bec.ipython_client_startup"]
plugin_ipython_client_pre = "csaxs_bec.bec_ipython_client.startup.pre_startup"
plugin_ipython_client_post = "csaxs_bec.bec_ipython_client.startup"
[project.entry-points."bec.widgets.auto_updates"]
plugin_widgets_update = "csaxs_bec.bec_widgets.auto_updates"
[project.entry-points."bec.widgets.user_widgets"]
plugin_widgets = "csaxs_bec.bec_widgets.widgets"
[project.entry-points."bec.widgets"]
plugin_widgets = "csaxs_bec.bec_widgets"
[tool.hatch.build.targets.wheel]
include = ["*"]

View File

@@ -1,34 +1,31 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
It can be installed via
```bash
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
pip install pytest
```
in your _python environment_.
in your *python environment*.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
It is mandatory for test files to begin with `test_` for pytest to discover them.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
To run all tests, navigate to the directory of the plugin from the command line, and run the command
```bash
``` bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -28,10 +28,7 @@ class RTMock(DeviceBase):
def test_save_frame(bec_client_mock):
client = bec_client_mock
client.device_manager.devices.xeye = DeviceBase(
name="xeye",
config={"enabled": True, "deviceClass": "test_class", "readoutPriority": "baseline"},
)
client.device_manager.devices.xeye = DeviceBase(name="xeye", config={})
lamni = LamNI(client)
align = XrayEyeAlign(client, lamni)
with mock.patch(
@@ -46,10 +43,7 @@ def test_update_frame(bec_client_mock):
epics_get = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_get"
fshopen = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.fshopen"
client = bec_client_mock
client.device_manager.devices.xeye = DeviceBase(
name="xeye",
config={"enabled": True, "deviceClass": "test_class", "readoutPriority": "baseline"},
)
client.device_manager.devices.xeye = DeviceBase(name="xeye", config={})
lamni = LamNI(client)
align = XrayEyeAlign(client, lamni)
with mock.patch(epics_put) as epics_put_mock:
@@ -70,16 +64,10 @@ def test_update_frame(bec_client_mock):
def test_disable_rt_feedback(bec_client_mock):
client = bec_client_mock
client.device_manager.devices.xeye = DeviceBase(
name="xeye",
config={"enabled": True, "deviceClass": "test_class", "readoutPriority": "baseline"},
)
client.device_manager.devices.xeye = DeviceBase(name="xeye", config={})
lamni = LamNI(client)
align = XrayEyeAlign(client, lamni)
client.device_manager.devices.rtx = RTMock(
name="rtx",
config={"enabled": True, "deviceClass": "test_class", "readoutPriority": "baseline"},
)
client.device_manager.devices.rtx = RTMock(name="rtx", config={})
with mock.patch.object(
align.device_manager.devices.rtx.controller, "feedback_disable"
) as fdb_disable:
@@ -89,16 +77,10 @@ def test_disable_rt_feedback(bec_client_mock):
def test_enable_rt_feedback(bec_client_mock):
client = bec_client_mock
client.device_manager.devices.xeye = DeviceBase(
name="xeye",
config={"enabled": True, "deviceClass": "test_class", "readoutPriority": "baseline"},
)
client.device_manager.devices.xeye = DeviceBase(name="xeye", config={})
lamni = LamNI(client)
align = XrayEyeAlign(client, lamni)
client.device_manager.devices.rtx = RTMock(
name="rtx",
config={"enabled": True, "deviceClass": "test_class", "readoutPriority": "baseline"},
)
client.device_manager.devices.rtx = RTMock(name="rtx", config={})
with mock.patch.object(
align.device_manager.devices.rtx.controller, "feedback_enable_with_reset"
) as fdb_enable:
@@ -114,16 +96,10 @@ def test_tomo_rotate(bec_client_mock):
client._update_namespace_callback = mock.MagicMock()
client.callbacks = mock.MagicMock()
client.load_high_level_interface("bec_hli")
client.device_manager.devices.xeye = DeviceBase(
name="xeye",
config={"enabled": True, "deviceClass": "test_class", "readoutPriority": "baseline"},
)
client.device_manager.devices.xeye = DeviceBase(name="xeye", config={})
lamni = LamNI(client)
align = XrayEyeAlign(client, lamni)
client.device_manager.devices.lsamrot = RTMock(
name="lsamrot",
config={"enabled": True, "deviceClass": "test_class", "readoutPriority": "baseline"},
)
client.device_manager.devices.lsamrot = RTMock(name="lsamrot", config={})
with mock.patch.object(builtins, "umv") as umv:
align.tomo_rotate(5)
umv.assert_called_once_with(client.device_manager.devices.lsamrot, 5)

View File

@@ -1,34 +1,31 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
It can be installed via
```bash
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
pip install pytest
```
in your _python environment_.
in your *python environment*.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
It is mandatory for test files to begin with `test_` for pytest to discover them.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
To run all tests, navigate to the directory of the plugin from the command line, and run the command
```bash
``` bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -1,34 +1,31 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
It can be installed via
```bash
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
pip install pytest
```
in your _python environment_.
in your *python environment*.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
It is mandatory for test files to begin with `test_` for pytest to discover them.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
To run all tests, navigate to the directory of the plugin from the command line, and run the command
```bash
``` bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -1,34 +1,31 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
It can be installed via
```bash
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
pip install pytest
```
in your _python environment_.
in your *python environment*.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
It is mandatory for test files to begin with `test_` for pytest to discover them.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
To run all tests, navigate to the directory of the plugin from the command line, and run the command
```bash
``` bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -0,0 +1,54 @@
import pytest
from jfjoch_client.api_response import ApiResponse
from jfjoch_client.models.broker_status import BrokerStatus
from jfjoch_client.models.dataset_settings import DatasetSettings
from jfjoch_client.models.detector_settings import DetectorSettings
from csaxs_bec.devices.jungfraujoch.jungfrau_joch_client import DetectorState, ResponseWaitDone
def test_jungfrau_joch_client_models_broker_status():
"""Test BrokerStatus model from JJF client"""
# Test broker status model
broker_status = BrokerStatus
assert "state" in broker_status.model_fields
assert "progress" in broker_status.model_fields
# Test that all DetectorStates are valid BrokerStatus states. This will not raise if
for state in DetectorState:
broker_status = BrokerStatus(state=state.value)
# Test an invalid state
with pytest.raises(ValueError):
broker_status = BrokerStatus(state="wrong")
def test_jungfrau_joch_client_models_dataset_settings():
"""Test DatasetSettings model from JJF client"""
# Test detector state model
settings = {
"beam_x_pxl": 0,
"beam_y_pxl": 0,
"detector_distance_mm": 100,
"incident_energy_keV": 10.00,
}
# Try creating DatasetSettings object with minimal required settigns
dataset_settings = DatasetSettings(**settings)
# Test that image_time_ns and ntrigger are still available
settings["image_time_us"] = 1000
settings["ntrigger"] = 100
dataset_settings = DatasetSettings(**settings)
def test_jungfrau_joch_client_models_api_response():
"""Test APIResponse model from JJF client.
We can only check that all http status code responses are valid.
"""
# Check if all ResponseWaitDone http status codes are valid for the APIResponse model
for state in ResponseWaitDone:
response = ApiResponse(status_code=state.value, data="", headers=None, raw_data=b"")
def test_jungfrau_joch_client_models_detector_settigns():
"""Test DetectorSettings model from JJF client"""
# Must be initialized with frame_time_us
settings = {"frame_time_us": 450}
DetectorSettings(**settings) # type:ignore

View File

@@ -1,34 +0,0 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
It can be installed via
```bash
pip install pytest
```
in your _python environment_.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
It is mandatory for test files to begin with `test_` for pytest to discover them.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
```bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -1,34 +1,31 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
It can be installed via
```bash
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
pip install pytest
```
in your _python environment_.
in your *python environment*.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
It is mandatory for test files to begin with `test_` for pytest to discover them.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
To run all tests, navigate to the directory of the plugin from the command line, and run the command
```bash
``` bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).