8 Commits

Author SHA1 Message Date
60d1dfc5af Update repo with template version v1.2.8
All checks were successful
CI for debye_bec / test (pull_request) Successful in 1m4s
CI for debye_bec / test (push) Successful in 1m2s
2026-02-27 15:49:26 +01:00
3e2e37908b Update repo with template version v1.2.7
Some checks failed
CI for debye_bec / test (push) Failing after 0s
CI for debye_bec / test (pull_request) Failing after 0s
2026-02-27 12:11:40 +01:00
804a731181 test(pilatus): Fix on_complete callback for pilatus
All checks were successful
CI for debye_bec / test (pull_request) Successful in 1m0s
CI for debye_bec / test (push) Successful in 1m4s
2025-12-05 14:18:46 +01:00
99f6192f37 refactor: deprecate duplicate Status implementation.
Some checks failed
CI for debye_bec / test (push) Failing after 1m2s
CI for debye_bec / test (pull_request) Failing after 1m9s
2025-11-30 22:29:23 +01:00
0a8272685d fix(status): cleanup and remove of old status usage 2025-11-30 22:28:34 +01:00
c6ed27966c fix(status): fix compare and transition status occurences
Some checks failed
CI for debye_bec / test (push) Failing after 1m9s
CI for debye_bec / test (pull_request) Failing after 3m33s
2025-11-26 13:46:53 +01:00
6a8f6c7988 fix: remove enums from typehints
Some checks failed
CI for debye_bec / test (pull_request) Failing after 2s
CI for debye_bec / test (push) Successful in 1m16s
2025-09-18 07:17:48 +02:00
6bfc8999f7 refactor: fix set_exception for AndStatusWithList 2025-09-18 07:14:39 +02:00
12 changed files with 102 additions and 299 deletions

View File

@@ -2,7 +2,7 @@
# It is needed to track the repo template version, and editing may break things. # It is needed to track the repo template version, and editing may break things.
# This file will be overwritten by copier on template updates. # This file will be overwritten by copier on template updates.
_commit: v1.2.2 _commit: v1.2.8
_src_path: https://github.com/bec-project/plugin_copier_template.git _src_path: https://github.com/bec-project/plugin_copier_template.git
make_commit: false make_commit: false
project_name: debye_bec project_name: debye_bec

View File

@@ -28,7 +28,7 @@ on:
description: "Python version to use" description: "Python version to use"
required: false required: false
type: string type: string
default: "3.11" default: "3.12"
permissions: permissions:
pull-requests: write pull-requests: write
@@ -44,7 +44,19 @@ jobs:
- name: Setup Python - name: Setup Python
uses: actions/setup-python@v5 uses: actions/setup-python@v5
with: with:
python-version: "${{ inputs.PYTHON_VERSION || '3.11' }}" python-version: "${{ inputs.PYTHON_VERSION || '3.12' }}"
- name: Checkout BEC Plugin Repository
uses: actions/checkout@v4
with:
repository: bec/debye_bec
ref: "${{ inputs.BEC_PLUGIN_REPO_BRANCH || github.head_ref || github.sha }}"
path: ./debye_bec
- name: Lint for merge conflicts from template updates
shell: bash
# Find all Copier conflicts except this line
run: '! grep -r "<<<<<<< before updating" | grep -v "grep -r \"<<<<<<< before updating"'
- name: Checkout BEC Core - name: Checkout BEC Core
uses: actions/checkout@v4 uses: actions/checkout@v4
@@ -67,13 +79,6 @@ jobs:
ref: "${{ inputs.BEC_WIDGETS_BRANCH || 'main' }}" ref: "${{ inputs.BEC_WIDGETS_BRANCH || 'main' }}"
path: ./bec_widgets path: ./bec_widgets
- name: Checkout BEC Plugin Repository
uses: actions/checkout@v4
with:
repository: bec/debye_bec
ref: "${{ inputs.BEC_PLUGIN_REPO_BRANCH || github.head_ref || github.sha }}"
path: ./debye_bec
- name: Install dependencies - name: Install dependencies
shell: bash shell: bash
run: | run: |

View File

@@ -0,0 +1,62 @@
name: Create template upgrade PR for debye_bec
on:
workflow_dispatch:
permissions:
pull-requests: write
jobs:
create_update_branch_and_pr:
runs-on: ubuntu-latest
permissions:
contents: write
pull-requests: write
steps:
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Install tools
run: |
pip install copier PySide6
- name: Checkout
uses: actions/checkout@v4
- name: Perform update
run: |
git config --global user.email "bec_ci_staging@psi.ch"
git config --global user.name "BEC automated CI"
branch="chore/update-template-$(python -m uuid)"
echo "switching to branch $branch"
git checkout -b $branch
echo "Running copier update..."
output="$(copier update --trust --defaults --conflict inline 2>&1)"
echo "$output"
msg="$(printf '%s\n' "$output" | head -n 1)"
if ! grep -q "make_commit: true" .copier-answers.yml ; then
echo "Autocommit not made, committing..."
git add -A
git commit -a -m "$msg"
fi
if diff-index --quiet HEAD ; then
echo "No changes detected"
exit 0
fi
git push -u origin $branch
curl -X POST "https://gitea.psi.ch/api/v1/repos/${{ gitea.repository }}/pulls" \
-H "Authorization: token ${{ secrets.CI_REPO_WRITE }}" \
-H "Content-Type: application/json" \
-d "{
\"title\": \"Template: $(echo $msg)\",
\"body\": \"This PR was created by Gitea Actions\",
\"head\": \"$(echo $branch)\",
\"base\": \"main\"
}"

View File

@@ -1,7 +0,0 @@
include:
- file: /templates/plugin-repo-template.yml
inputs:
name: debye_bec
target: debye_bec
branch: $CHILD_PIPELINE_BRANCH
project: bec/awi_utils

View File

@@ -9,8 +9,7 @@ from ophyd import Component as Cpt
from ophyd import Device from ophyd import Device
from ophyd import DynamicDeviceComponent as Dcpt from ophyd import DynamicDeviceComponent as Dcpt
from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
from ophyd.status import DeviceStatus, SubscriptionStatus from ophyd_devices import CompareStatus, DeviceStatus, SubscriptionStatus, TransitionStatus
from ophyd_devices import CompareStatus, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from typeguard import typechecked from typeguard import typechecked
@@ -105,7 +104,7 @@ class IonizationChamber0(PSIDeviceBase):
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs) super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
@typechecked @typechecked
def set_gain(self, gain: Literal["1e6", "1e7", "5e7", "1e8", "1e9"] | AmplifierGain) -> None: def set_gain(self, gain: Literal["1e6", "1e7", "5e7", "1e8", "1e9"]) -> None:
"""Configure the gain setting of the specified channel """Configure the gain setting of the specified channel
Args: Args:
@@ -131,10 +130,7 @@ class IonizationChamber0(PSIDeviceBase):
self.amp.cGain_ENUM.put(AmplifierGain.G1E9) self.amp.cGain_ENUM.put(AmplifierGain.G1E9)
def set_filter( def set_filter(
self, self, value: Literal["1us", "3us", "10us", "30us", "100us", "300us", "1ms", "3ms"]
value: (
Literal["1us", "3us", "10us", "30us", "100us", "300us", "1ms", "3ms"] | AmplifierFilter
),
) -> None: ) -> None:
"""Configure the filter setting of the specified channel """Configure the filter setting of the specified channel

View File

@@ -9,16 +9,15 @@ used to ensure that the action is executed completely. This is believed
to allow for a more stable execution of the action.""" to allow for a more stable execution of the action."""
import time import time
from typing import Any, Literal from typing import Literal
from bec_lib.devicemanager import ScanInfo from bec_lib.devicemanager import ScanInfo
from bec_lib.logger import bec_logger from bec_lib.logger import bec_logger
from ophyd import Component as Cpt from ophyd import Component as Cpt
from ophyd import DeviceStatus, Signal, StatusBase from ophyd import DeviceStatus, StatusBase
from ophyd.status import SubscriptionStatus, WaitTimeoutError from ophyd.status import WaitTimeoutError
from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.errors import DeviceStopError
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from typeguard import typechecked from typeguard import typechecked
@@ -281,7 +280,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
self.scan_control.scan_status, self.scan_control.scan_status,
transitions=[ScanControlScanStatus.READY, ScanControlScanStatus.RUNNING], transitions=[ScanControlScanStatus.READY, ScanControlScanStatus.RUNNING],
strict=True, strict=True,
raise_states=[ScanControlScanStatus.PARAMETER_WRONG], failure_states=[ScanControlScanStatus.PARAMETER_WRONG],
) )
self.cancel_on_stop(status) self.cancel_on_stop(status)
start_func(1) start_func(1)

View File

@@ -1,12 +1,11 @@
from __future__ import annotations from __future__ import annotations
import time from typing import TYPE_CHECKING, Literal
from typing import TYPE_CHECKING, Literal, cast
from bec_lib.logger import bec_logger from bec_lib.logger import bec_logger
from ophyd import Component as Cpt from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Kind, StatusBase from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Kind, StatusBase
from ophyd.status import SubscriptionStatus, WaitTimeoutError from ophyd.status import WaitTimeoutError
from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.sim.sim_signals import SetableSignal from ophyd_devices.sim.sim_signals import SetableSignal

View File

@@ -17,12 +17,10 @@ from ophyd.areadetector.cam import ADBase, PilatusDetectorCam
from ophyd.areadetector.plugins import HDF5Plugin_V22 as HDF5Plugin from ophyd.areadetector.plugins import HDF5Plugin_V22 as HDF5Plugin
from ophyd.areadetector.plugins import ImagePlugin_V22 as ImagePlugin from ophyd.areadetector.plugins import ImagePlugin_V22 as ImagePlugin
from ophyd.status import WaitTimeoutError from ophyd.status import WaitTimeoutError
from ophyd_devices import CompareStatus, DeviceStatus, FileEventSignal, PreviewSignal from ophyd_devices import AndStatus, CompareStatus, DeviceStatus, FileEventSignal, PreviewSignal
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from debye_bec.devices.pilatus.utils import AndStatusWithList
if TYPE_CHECKING: # pragma: no cover if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo from bec_lib.devicemanager import ScanInfo
from bec_lib.messages import DevicePreviewMessage, ScanStatusMessage from bec_lib.messages import DevicePreviewMessage, ScanStatusMessage
@@ -360,14 +358,12 @@ class Pilatus(PSIDeviceBase, ADBase):
# f"Live Mode on detector {self.name} did not stop: {content} after 10s." # f"Live Mode on detector {self.name} did not stop: {content} after 10s."
# ) # )
def check_detector_stop_running_acquisition(self) -> AndStatusWithList: def check_detector_stop_running_acquisition(self) -> AndStatus:
"""Check if the detector is still running an acquisition.""" """Check if the detector is still running an acquisition."""
status_acquire = CompareStatus(self.cam.acquire, ACQUIREMODE.DONE.value) status_acquire = CompareStatus(self.cam.acquire, ACQUIREMODE.DONE.value)
status_writing = CompareStatus(self.hdf.capture, ACQUIREMODE.DONE.value) status_writing = CompareStatus(self.hdf.capture, ACQUIREMODE.DONE.value)
status_cam_server = CompareStatus(self.cam.armed, DETECTORSTATE.UNARMED.value) status_cam_server = CompareStatus(self.cam.armed, DETECTORSTATE.UNARMED.value)
status = AndStatusWithList( status = status_acquire & status_writing & status_cam_server
device=self, status_list=[status_acquire, status_writing, status_cam_server]
)
return status return status
def _calculate_trigger(self, scan_msg: ScanStatusMessage): def _calculate_trigger(self, scan_msg: ScanStatusMessage):
@@ -562,9 +558,7 @@ class Pilatus(PSIDeviceBase, ADBase):
status_hdf = CompareStatus(self.hdf.capture, ACQUIREMODE.ACQUIRING.value) status_hdf = CompareStatus(self.hdf.capture, ACQUIREMODE.ACQUIRING.value)
status_cam = CompareStatus(self.cam.acquire, ACQUIREMODE.ACQUIRING.value) status_cam = CompareStatus(self.cam.acquire, ACQUIREMODE.ACQUIRING.value)
status_cam_server = CompareStatus(self.cam.armed, DETECTORSTATE.ARMED.value) status_cam_server = CompareStatus(self.cam.armed, DETECTORSTATE.ARMED.value)
status = AndStatusWithList( status = status_hdf & status_cam & status_cam_server
device=self, status_list=[status_hdf, status_cam, status_cam_server]
)
self.cam.acquire.put(1) self.cam.acquire.put(1)
self.hdf.capture.put(1) self.hdf.capture.put(1)
self.cancel_on_stop(status) self.cancel_on_stop(status)
@@ -593,15 +587,15 @@ class Pilatus(PSIDeviceBase, ADBase):
scan_msg.scan_name in self.xas_xrd_scan_names or scan_msg.scan_type == "step" scan_msg.scan_name in self.xas_xrd_scan_names or scan_msg.scan_type == "step"
): # TODO how to deal with fly scans? ): # TODO how to deal with fly scans?
if status.success: if status.success:
status.device.file_event.put( self.file_event.put(
file_path=status.device._full_path, # pylint: disable:protected-access file_path=self._full_path,
done=True, done=True,
successful=True, successful=True,
hinted_h5_entries={"data": "/entry/data/data"}, hinted_h5_entries={"data": "/entry/data/data"},
) )
else: else:
status.device.file_event.put( self.file_event.put(
file_path=status.device._full_path, # pylint: disable:protected-access file_path=self._full_path,
done=True, done=True,
successful=False, successful=False,
hinted_h5_entries={"data": "/entry/data/data"}, hinted_h5_entries={"data": "/entry/data/data"},
@@ -622,15 +616,12 @@ class Pilatus(PSIDeviceBase, ADBase):
# For long scans, it can be that the mono will execute one cycle more, # For long scans, it can be that the mono will execute one cycle more,
# meaning a few more XRD triggers will be sent # meaning a few more XRD triggers will be sent
status_img_written = CompareStatus( status_img_written = CompareStatus(
self.hdf.num_captured, self.n_images, operation=">=" self.hdf.num_captured, self.n_images, operation_success=">="
) )
else: else:
status_img_written = CompareStatus(self.hdf.num_captured, self.n_images) status_img_written = CompareStatus(self.hdf.num_captured, self.n_images)
status_img_written = CompareStatus(self.hdf.num_captured, self.n_images) status_img_written = CompareStatus(self.hdf.num_captured, self.n_images)
status = AndStatusWithList( status = status_hdf & status_cam & status_img_written & status_cam_server
device=self,
status_list=[status_hdf, status_cam, status_img_written, status_cam_server],
)
status.add_callback(self._complete_callback) # Callback that writing was successful status.add_callback(self._complete_callback) # Callback that writing was successful
self.cancel_on_stop(status) self.cancel_on_stop(status)
return status return status

View File

@@ -1,238 +0,0 @@
"""Temporary utility module for Status Object implementations."""
from __future__ import annotations
from typing import TYPE_CHECKING
from ophyd import Device, DeviceStatus, StatusBase
class AndStatusWithList(DeviceStatus):
"""
Custom implementation of the AndStatus that combines the
option to add multiple statuses as a list, and in addition
allows for adding the Device as an object to access its
methods.
Args"""
def __init__(
self,
device: Device,
status_list: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus],
**kwargs,
):
self.all_statuses = status_list if isinstance(status_list, list) else [status_list]
super().__init__(device=device, **kwargs)
self._trace_attributes["all"] = [st._trace_attributes for st in self.all_statuses]
def inner(status):
with self._lock:
if self._externally_initiated_completion:
return
if self.done: # Return if status is already done.. It must be resolved already
return
for st in self.all_statuses:
with st._lock:
if st.done and not st.success:
self.set_exception(st.exception()) # st._exception
return
if all(st.done for st in self.all_statuses) and all(
st.success for st in self.all_statuses
):
self.set_finished()
for st in self.all_statuses:
with st._lock:
st.add_callback(inner)
# TODO improve __repr__ and __str__
def __repr__(self):
return "<AndStatusWithList({self.all_statuses!r})>".format(self=self)
def __str__(self):
return "<AndStatusWithList(done={self.done}, success={self.success})>".format(self=self)
def __contains__(self, status: StatusBase | DeviceStatus) -> bool:
for child in self.all_statuses:
if child == status:
return True
if isinstance(child, AndStatusWithList):
if status in child:
return True
return False
# TODO Check if this actually works....
def set_exception(self, exc):
super().set_exception(exc)
# Propagate the exception to all sub-statuses that are not done yet.
with self._lock:
for st in self.all_statuses:
with st._lock:
if not st.done:
st.set_exception(exc)
def _run_callbacks(self):
"""
Set the Event and run the callbacks.
"""
if self.timeout is None:
timeout = None
else:
timeout = self.timeout + self.settle_time
if not self._settled_event.wait(timeout):
self.log.warning("%r has timed out", self)
with self._externally_initiated_completion_lock:
if self._exception is None:
exc = TimeoutError(
f"AndStatus from device {self.device.name} failed to complete in specified timeout of {self.timeout + self.settle_time}."
)
self._exception = exc
# Mark this as "settled".
try:
self._settled()
except Exception:
self.log.exception("%r encountered error during _settled()", self)
with self._lock:
self._event.set()
if self._exception is not None:
try:
self._handle_failure()
except Exception:
self.log.exception("%r encountered an error during _handle_failure()", self)
for cb in self._callbacks:
try:
cb(self)
except Exception:
self.log.exception(
"An error was raised on a background thread while "
"running the callback %r(%r).",
cb,
self,
)
self._callbacks.clear()
class AndStatus(StatusBase):
"""Custom AndStatus for TimePix detector."""
def __init__(
self,
left: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus] | None,
name: str | Device | None = None,
right: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus] | None = None,
**kwargs,
):
self.left = left if isinstance(left, list) else [left]
if right is not None:
self.right = right if isinstance(right, list) else [right]
else:
self.right = []
self.all_statuses = self.left + self.right
if name is None:
name = "unname_status"
elif isinstance(name, Device):
name = name.name
else:
name = name
self.name = name
super().__init__(**kwargs)
self._trace_attributes["left"] = [st._trace_attributes for st in self.left]
self._trace_attributes["right"] = [st._trace_attributes for st in self.right]
def inner(status):
with self._lock:
if self._externally_initiated_completion:
return
if self.done: # Return if status is already done.. It must be resolved already
return
for st in self.all_statuses:
with st._lock:
if st.done and not st.success:
self.set_exception(st.exception()) # st._exception
return
if all(st.done for st in self.all_statuses) and all(
st.success for st in self.all_statuses
):
self.set_finished()
for st in self.all_statuses:
with st._lock:
st.add_callback(inner)
def __repr__(self):
return "({self.left!r} & {self.right!r})".format(self=self)
def __str__(self):
return "{0}(done={1.done}, " "success={1.success})" "".format(self.__class__.__name__, self)
def __contains__(self, status: StatusBase) -> bool:
for child in [self.left, self.right]:
if child == status:
return True
if isinstance(child, AndStatus):
if status in child:
return True
return False
def _run_callbacks(self):
"""
Set the Event and run the callbacks.
"""
if self.timeout is None:
timeout = None
else:
timeout = self.timeout + self.settle_time
if not self._settled_event.wait(timeout):
# We have timed out. It's possible that set_finished() has already
# been called but we got here before the settle_time timer expired.
# And it's possible that in this space be between the above
# statement timing out grabbing the lock just below,
# set_exception(exc) has been called. Both of these possibilties
# are accounted for.
self.log.warning("%r has timed out", self)
with self._externally_initiated_completion_lock:
# Set the exception and mark the Status as done, unless
# set_exception(exc) was called externally before we grabbed
# the lock.
if self._exception is None:
exc = TimeoutError(
f"Status with name {self.name} failed to complete in specified timeout of {self.timeout + self.settle_time}."
)
self._exception = exc
# Mark this as "settled".
try:
self._settled()
except Exception:
# No alternative but to log this. We can't supersede set_exception,
# and we have to continue and run the callbacks.
self.log.exception("%r encountered error during _settled()", self)
# Now we know whether or not we have succeed or failed, either by
# timeout above or by set_exception(exc), so we can set the Event that
# will mark this Status as done.
with self._lock:
self._event.set()
if self._exception is not None:
try:
self._handle_failure()
except Exception:
self.log.exception("%r encountered an error during _handle_failure()", self)
# The callbacks have access to self, from which they can distinguish
# success or failure.
for cb in self._callbacks:
try:
cb(self)
except Exception:
self.log.exception(
"An error was raised on a background thread while "
"running the callback %r(%r).",
cb,
self,
)
self._callbacks.clear()

View File

@@ -10,8 +10,6 @@ from ophyd import EpicsSignal, EpicsSignalRO
from ophyd_devices import CompareStatus, DeviceStatus from ophyd_devices import CompareStatus, DeviceStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from debye_bec.devices.pilatus.utils import AndStatusWithList
if TYPE_CHECKING: if TYPE_CHECKING:
from bec_lib.devicemanager import ScanInfo from bec_lib.devicemanager import ScanInfo
@@ -83,8 +81,8 @@ class PilatusCurtain(PSIDeviceBase):
self.open_cover.put(1) self.open_cover.put(1)
# TODO timeout ok? # TODO timeout ok?
status_open = CompareStatus(self.cover_is_open, COVER.OPEN, timeout=5) status_open = CompareStatus(self.cover_is_open, COVER.OPEN, timeout=5)
status_error = CompareStatus(self.cover_error, COVER.ERROR, operation="!=") status_error = CompareStatus(self.cover_error, COVER.ERROR, operation_success="!=")
status = AndStatusWithList(device=self, status_list=[status_open, status_error]) status = status_open & status_error
return status return status
else: else:
return None return None
@@ -95,8 +93,8 @@ class PilatusCurtain(PSIDeviceBase):
self.close_cover.put(1) self.close_cover.put(1)
# TODO timeout ok? # TODO timeout ok?
status_close = CompareStatus(self.cover_is_closed, COVER.CLOSED, timeout=5) status_close = CompareStatus(self.cover_is_closed, COVER.CLOSED, timeout=5)
status_error = CompareStatus(self.cover_error, COVER.ERROR, operation="!=") status_error = CompareStatus(self.cover_error, COVER.ERROR, operation_success="!=")
status = AndStatusWithList(device=self, status_list=[status_close, status_error]) status = status_close & status_error
return status return status
else: else:
return None return None

View File

@@ -6,7 +6,7 @@ build-backend = "hatchling.build"
name = "debye_bec" name = "debye_bec"
version = "0.0.0" version = "0.0.0"
description = "A plugin repository for BEC" description = "A plugin repository for BEC"
requires-python = ">=3.10" requires-python = ">=3.11"
classifiers = [ classifiers = [
"Development Status :: 3 - Alpha", "Development Status :: 3 - Alpha",
"Programming Language :: Python :: 3", "Programming Language :: Python :: 3",

View File

@@ -1,10 +1,8 @@
# pylint: skip-file # pylint: skip-file
import os
import threading import threading
from typing import TYPE_CHECKING, Generator from typing import TYPE_CHECKING, Generator
from unittest import mock from unittest import mock
import numpy as np
import ophyd import ophyd
import pytest import pytest
from bec_lib.messages import ScanStatusMessage from bec_lib.messages import ScanStatusMessage
@@ -177,7 +175,7 @@ def test_pilatus_on_trigger_cancel_on_stop(pilatus):
status.wait(timeout=5) status.wait(timeout=5)
def test_pilatus_on_complete(pilatus): def test_pilatus_on_complete(pilatus: Pilatus):
"""Test the on_complete logic of the Pilatus detector.""" """Test the on_complete logic of the Pilatus detector."""
if pilatus.scan_info.msg.scan_name.startswith("xas"): if pilatus.scan_info.msg.scan_name.startswith("xas"):