feat(timepix): Timepix integration with TimepixFly backend #10
@@ -12,7 +12,9 @@ classifiers = [
|
||||
"Programming Language :: Python :: 3",
|
||||
"Topic :: Scientific/Engineering",
|
||||
]
|
||||
dependencies = []
|
||||
dependencies = [
|
||||
"websockets",
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
dev = [
|
||||
|
||||
27
superxas_bec/device_configs/sample_manipulator.yaml
Normal file
27
superxas_bec/device_configs/sample_manipulator.yaml
Normal file
@@ -0,0 +1,27 @@
|
||||
manip_new_trx:
|
||||
description: Sample Manipulator X-Translation
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-ES1-MAN:TRX
|
||||
enabled: true
|
||||
onFailure: retry
|
||||
readoutPriority: baseline
|
||||
softwareTrigger: false
|
||||
manip_new_try:
|
||||
description: Sample Manipulator Y-Translation
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-ES1-MAN:TRY
|
||||
enabled: true
|
||||
onFailure: retry
|
||||
readoutPriority: baseline
|
||||
softwareTrigger: false
|
||||
manip_new_trz:
|
||||
description: Sample Manipulator Z - Along beam
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: X10DA-ES1-MAN:TRZ
|
||||
enabled: true
|
||||
onFailure: retry
|
||||
readoutPriority: baseline
|
||||
softwareTrigger: false
|
||||
71
superxas_bec/device_configs/timepix.yaml
Normal file
71
superxas_bec/device_configs/timepix.yaml
Normal file
@@ -0,0 +1,71 @@
|
||||
sample_manipulator:
|
||||
- !include ./sample_manipulator.yaml
|
||||
|
||||
### Timepix Detector
|
||||
timepix:
|
||||
readoutPriority: async
|
||||
description: ASI Serval Timepix Detector
|
||||
deviceClass: superxas_bec.devices.timepix.timepix.Timepix
|
||||
deviceConfig:
|
||||
prefix: "X10DA-ES-TPX1:"
|
||||
backend_rest_url: "P6-0008.psi.ch:8452"
|
||||
hostname: "x10da-bec-001.psi.ch"
|
||||
enable_xes: false
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
readOnly: false
|
||||
softwareTrigger: true
|
||||
|
||||
### Ionization Chambers
|
||||
ic1:
|
||||
readoutPriority: monitored
|
||||
description: Ionization Chamber 1
|
||||
deviceClass: ophyd.EpicsSignalRO
|
||||
deviceConfig:
|
||||
read_pv: X10DA-ES1-SAI_01:MEAN
|
||||
auto_monitor: True
|
||||
onFailure: raise
|
||||
enabled: True
|
||||
softwareTrigger: False
|
||||
ic2:
|
||||
readoutPriority: monitored
|
||||
description: Ionization Chamber 2
|
||||
deviceClass: ophyd.EpicsSignalRO
|
||||
deviceConfig:
|
||||
read_pv: X10DA-ES1-SAI_02:MEAN
|
||||
auto_monitor: True
|
||||
onFailure: raise
|
||||
enabled: True
|
||||
softwareTrigger: False
|
||||
ic3:
|
||||
readoutPriority: monitored
|
||||
description: Ionization Chamber 3
|
||||
deviceClass: ophyd.EpicsSignalRO
|
||||
deviceConfig:
|
||||
read_pv: X10DA-ES1-SAI_03:MEAN
|
||||
auto_monitor: True
|
||||
onFailure: raise
|
||||
enabled: True
|
||||
softwareTrigger: False
|
||||
|
||||
### Monochromator Axis
|
||||
mono_energy:
|
||||
description: Axis for the QEXAFS monochromator
|
||||
deviceClass: ophyd_devices.devices.psi_motor.EpicsMotor
|
||||
deviceConfig:
|
||||
prefix: "X10DA-MO12-QEXAFS:E_TEST"
|
||||
enabled: true
|
||||
onFailure: retry
|
||||
readoutPriority: baseline
|
||||
softwareTrigger: false
|
||||
|
||||
### Trigger Card #####
|
||||
trigger:
|
||||
readoutPriority: baseline
|
||||
description: Trigger Card
|
||||
deviceClass: superxas_bec.devices.trigger.Trigger
|
||||
deviceConfig:
|
||||
prefix: 'X10DA-ES1:'
|
||||
onFailure: raise
|
||||
enabled: True
|
||||
softwareTrigger: True
|
||||
0
superxas_bec/devices/timepix/__init__.py
Normal file
0
superxas_bec/devices/timepix/__init__.py
Normal file
@@ -0,0 +1,13 @@
|
||||
{"type": "PixelMap",
|
||||
"chips": [
|
||||
{"i": 0, "p": [0, 1], "f": [0.5, 0.5]},
|
||||
{"i": 0, "p": [1, 2], "f": [0.5, 0.5]},
|
||||
{"i": 0, "p": [1, 2], "f": [0.5, 0.5]},
|
||||
{"i": 0, "p": [0, 1], "f": [0.5, 0.5]},
|
||||
{"i": 0, "p": [1, 2], "f": [0.5, 0.5]},
|
||||
{"i": 0, "p": [1, 2], "f": [0.5, 0.5]},
|
||||
{"i": 0, "p": [0, 1], "f": [0.5, 0.5]},
|
||||
{"i": 0, "p": [1, 2], "f": [0.5, 0.5]}
|
||||
]
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
933
superxas_bec/devices/timepix/timepix.py
Normal file
933
superxas_bec/devices/timepix/timepix.py
Normal file
@@ -0,0 +1,933 @@
|
||||
"""
|
||||
TimePix Detector class for interfacing with the TimePix detector. The timepix_signals module
|
||||
implements the HTTP communication to the REST API for the tpx3app app. The implementation
|
||||
of the backend is stored in the timepix_fly_client module. This is combined with the control
|
||||
interface in EPICS, which is implemented via the 'ASItpxCam' class.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import enum
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
from typing import TYPE_CHECKING, Any, Literal
|
||||
|
||||
import numpy as np
|
||||
from bec_lib.file_utils import get_full_path
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import ADBase
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import EpicsSignalRO, EpicsSignalWithRBV
|
||||
from ophyd_devices import (
|
||||
AsyncSignal,
|
||||
CompareStatus,
|
||||
DeviceStatus,
|
||||
FileEventSignal,
|
||||
PreviewSignal,
|
||||
StatusBase,
|
||||
TransitionStatus,
|
||||
)
|
||||
from ophyd_devices.devices.areadetector.cam import ASItpxCam
|
||||
from ophyd_devices.devices.areadetector.plugins import HDF5Plugin_V35, ImagePlugin_V35
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
from typeguard import typechecked
|
||||
|
||||
import superxas_bec.devices.timepix.default_pixel_maps as _default_pixel_maps
|
||||
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_backend import TimepixFlyBackend
|
||||
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client import TimePixFlyStatus
|
||||
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface import (
|
||||
OtherConfigModel,
|
||||
PixelMap,
|
||||
)
|
||||
from superxas_bec.devices.timepix.utils import AndStatusWithList
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from bec_lib.messages import DevicePreviewMessage, ScanStatusMessage
|
||||
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
# pylint: disable=redefined-outer-name
|
||||
|
||||
|
||||
class TDCEdge(int, enum.Enum):
|
||||
"""TDC Edge enum options for TimePix detector."""
|
||||
|
||||
RISING = 0
|
||||
FALLING = 1
|
||||
BOTH = 2
|
||||
|
||||
|
||||
class TDCOuput(int, enum.Enum):
|
||||
"""TDC Output enum options for TimePix detector."""
|
||||
|
||||
ALL_CHANNELS = 0
|
||||
CHANNEL_0 = 1
|
||||
CHANNEL_1 = 2
|
||||
CHANNEL_2 = 3
|
||||
CHANNEL_3 = 4
|
||||
|
||||
|
||||
class ACQUIRESTATUS(int, enum.Enum):
|
||||
"""Acquire status enum options for TimePix detector."""
|
||||
|
||||
DONE = 0
|
||||
ACQUIRING = 1 # or CAPTURING
|
||||
|
||||
|
||||
class DETECTORSTATE(int, enum.Enum):
|
||||
"""Detector state enum options for TimePix detector."""
|
||||
|
||||
IDLE = 0
|
||||
ACQUIRE = 1
|
||||
READOUT = 2
|
||||
CORRECT = 3
|
||||
SAVING = 4
|
||||
ABORTING = 5
|
||||
ERROR = 6
|
||||
WAITING = 7
|
||||
INITIALIZING = 8
|
||||
DISCONNECTED = 9
|
||||
ABORTED = 10
|
||||
|
||||
|
||||
class TRIGGERMODE(int, enum.Enum):
|
||||
"""Trigger mode enum options for TimePix detector."""
|
||||
|
||||
INTERNAL = 0
|
||||
EXTERNAL = 1
|
||||
SOFTWARE = 2
|
||||
|
||||
|
||||
class TRIGGERSOURCE(int, enum.Enum):
|
||||
"""Trigger source enum options for TimePix detector."""
|
||||
|
||||
HDMI1_1 = 0
|
||||
HDMI1_2 = 1
|
||||
HDMI1_3 = 2
|
||||
HDMI2_1 = 3
|
||||
HDMI2_2 = 4
|
||||
HDMI2_3 = 5
|
||||
|
||||
|
||||
class EXPOSUREMODE(int, enum.Enum):
|
||||
"""Exposure mode enum options for TimePix detector."""
|
||||
|
||||
TIMED = 0
|
||||
TRIGGER_WIDTH = 1
|
||||
|
||||
|
||||
class DATASOURCE(int, enum.Enum):
|
||||
"""Data source for AD Epics backend for Timepix."""
|
||||
|
||||
NONE = 0
|
||||
PREVIEW = 1
|
||||
IMAGE = 2
|
||||
|
||||
|
||||
class FILEWRITEMODE(int, enum.Enum):
|
||||
"""HDF5 Plugin FileWrite Mode"""
|
||||
|
||||
SINGLE = 0
|
||||
CAPTURE = 1
|
||||
STREAM = 2
|
||||
|
||||
|
||||
def load_pixel_map_from_json(file_path: str) -> PixelMap:
|
||||
"""Load a pixel map from a JSON file.
|
||||
|
||||
Args:
|
||||
file_path (str): Path to the JSON file containing the pixel map.
|
||||
|
||||
Returns:
|
||||
PixelMap: The loaded pixel map.
|
||||
"""
|
||||
# Check if path exists
|
||||
if not os.path.exists(file_path):
|
||||
raise FileNotFoundError(f"Pixel map file not found: {file_path}")
|
||||
try:
|
||||
with open(file_path, "r", encoding="utf-8") as file:
|
||||
pixel_map_str = file.read()
|
||||
pixel_map = PixelMap.model_validate_json(pixel_map_str)
|
||||
except Exception as exc:
|
||||
raise ValueError(f"Failed to load pixel map from {file_path}: {exc}") from exc
|
||||
return pixel_map
|
||||
|
||||
|
||||
class ImagePlugin_Timepix(ImagePlugin_V35):
|
||||
"""Custom Image Plugin for TimePix detector."""
|
||||
|
||||
unique_id = Cpt(EpicsSignalRO, "UniqueId_RBV", auto_monitor=True)
|
||||
|
||||
|
||||
class HDF5Plugin_Timepix(HDF5Plugin_V35):
|
||||
"""Custom HDF5 Plugin for TimePix detector."""
|
||||
|
||||
capture = Cpt(EpicsSignalWithRBV, "Capture", auto_monitor=True)
|
||||
write_file = Cpt(EpicsSignalWithRBV, "WriteFile", auto_monitor=True)
|
||||
|
||||
|
||||
# pylint: disable=too-many-instance-attributes, too-many-arguments, too-many-locals
|
||||
class TimePixControl(ADBase):
|
||||
"""Interface for the TimePix EPICS control of the TimePix detector."""
|
||||
|
||||
cam = Cpt(ASItpxCam, "cam1:")
|
||||
image = Cpt(ImagePlugin_Timepix, "image1:")
|
||||
hdf = Cpt(HDF5Plugin_Timepix, "HDF1:")
|
||||
|
||||
|
||||
DEFAULT_PIXEL_MAP = os.path.join(
|
||||
os.path.dirname(_default_pixel_maps.__file__), "timepix_8_chips_single_energy_per_chip.json"
|
||||
)
|
||||
|
||||
DETECTOR_SHAPE = (512, 1024) # Shape of the TimePix detector
|
||||
|
||||
|
||||
class Timepix(PSIDeviceBase, TimePixControl):
|
||||
"""
|
||||
TimePix class. The IOC is running with the prefix 'X10DA-ES-TPX1:'.
|
||||
The TimePixFly backend is running on p4-0017.psi.ch. Please check the port from the app
|
||||
running in headless server mode. The backend_rest url can for instance be 'p4-0017.psi.ch:8452'.
|
||||
The hostname needs to be set to the name of this machine, e.h. x10da-bec-001.psi.ch.
|
||||
"""
|
||||
|
||||
MIN_DETECTOR_READOUT_TIME = 2.1e-3 # Minimum readout time in seconds for ASI TimePix detector
|
||||
|
||||
_DETECTOR_SHAPE = DETECTOR_SHAPE
|
||||
USER_ACCESS = [
|
||||
"troin",
|
||||
"troistep",
|
||||
"get_pixel_map",
|
||||
"set_pixel_map",
|
||||
"set_pixel_map_from_json_file",
|
||||
"set_enable_xes",
|
||||
]
|
||||
|
||||
xes_data = Cpt(
|
||||
AsyncSignal,
|
||||
name="xes_data",
|
||||
ndim=2,
|
||||
max_size=1000,
|
||||
doc="Full XES data, 2D image with energypoints vs time bins.",
|
||||
)
|
||||
xes_spectra = Cpt(
|
||||
AsyncSignal,
|
||||
name="xes_spectra",
|
||||
ndim=1,
|
||||
max_size=1000,
|
||||
doc="1D spectra, integrated over energy bins.",
|
||||
)
|
||||
xes_energy_1 = Cpt(
|
||||
AsyncSignal,
|
||||
name="xes_energy_1",
|
||||
ndim=1,
|
||||
max_size=1000,
|
||||
doc="1D time spectra for energy bin 1.",
|
||||
)
|
||||
xes_energy_2 = Cpt(
|
||||
AsyncSignal,
|
||||
name="xes_energy_2",
|
||||
ndim=1,
|
||||
max_size=1000,
|
||||
doc="1D time spectra for energy bin 2.",
|
||||
)
|
||||
tds_period = Cpt(
|
||||
AsyncSignal,
|
||||
name="tds_period",
|
||||
ndim=0,
|
||||
async_update={"type": "add", "max_shape": [None]},
|
||||
max_size=1000,
|
||||
doc="TDS period recorded by the TimePixFly backend detector.",
|
||||
)
|
||||
total_periods = Cpt(
|
||||
AsyncSignal,
|
||||
name="total_periods",
|
||||
ndim=0,
|
||||
async_update={"type": "add", "max_shape": [None]},
|
||||
max_size=1000,
|
||||
doc="Total TDS periods recorded by the TimePixFly backend detector.",
|
||||
)
|
||||
total_events = Cpt(
|
||||
AsyncSignal,
|
||||
name="total_events",
|
||||
ndim=0,
|
||||
async_update={"type": "add", "max_shape": [None]},
|
||||
max_size=1000,
|
||||
doc="Total events recorded by the TimePixFly backend detector.",
|
||||
)
|
||||
|
||||
preview = Cpt(
|
||||
PreviewSignal,
|
||||
name="preview",
|
||||
ndim=2,
|
||||
num_rotation_90=1,
|
||||
doc="Preview signal of the TimePix detector.",
|
||||
)
|
||||
|
||||
static_spectra = Cpt(
|
||||
AsyncSignal,
|
||||
name="static_spectra",
|
||||
ndim=1,
|
||||
max_size=1000,
|
||||
acquisition_group="monitored",
|
||||
async_update={"type": "add", "max_shape": [None, DETECTOR_SHAPE[0]]},
|
||||
doc="Spectra signal of the TimePix detector.",
|
||||
)
|
||||
|
||||
xes_data_accumulated_1 = Cpt(
|
||||
AsyncSignal,
|
||||
name="xes_accumulated_energy_1",
|
||||
ndim=1,
|
||||
max_size=1000,
|
||||
doc="1D time spectra for energy bin 2.",
|
||||
)
|
||||
xes_data_accumulated_2 = Cpt(
|
||||
AsyncSignal,
|
||||
name="xes_accumulated_energy_2",
|
||||
ndim=1,
|
||||
max_size=1000,
|
||||
doc="1D time spectra for energy bin 2.",
|
||||
)
|
||||
file_event = Cpt(
|
||||
FileEventSignal, name="file_event", doc="File event signal for TimePix detector."
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
name,
|
||||
prefix: str,
|
||||
backend_rest_url: str,
|
||||
hostname: str | None = None,
|
||||
socket_port: int = 0,
|
||||
enable_xes: bool = True,
|
||||
scan_info=None,
|
||||
device_manager=None,
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
Initialize the Timepix detector.
|
||||
|
||||
Args:
|
||||
name (str): Name of the device.
|
||||
prefix (str): EPICS prefix for the device.
|
||||
backend_rest_url (str): URL of the TimePixFly backend REST API.
|
||||
hostname (str | None): Hostname of the machine running the backend. Defaults to None
|
||||
which will use the current machine's hostname.
|
||||
socket_port (int): Port for the socket connection to the backend. Defaults to 0
|
||||
which will use the default port from the backend.
|
||||
enable_xes (bool): Whether to enable XES data acquisition with TimePixFly backend is active. Defaults to True.
|
||||
scan_info: Scan information object, if available.
|
||||
device_manager: Device manager instance, if available.
|
||||
**kwargs: Additional keyword arguments for the base class.
|
||||
"""
|
||||
|
||||
self.backend = TimepixFlyBackend(
|
||||
backend_rest_url=backend_rest_url, hostname=hostname, socket_port=socket_port
|
||||
)
|
||||
self._pixel_map = None
|
||||
self._troistep = 1
|
||||
self._troin = 5000
|
||||
super().__init__(
|
||||
name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs
|
||||
)
|
||||
self._poll_thread = threading.Thread(
|
||||
target=self._poll_array_data, daemon=True, name=f"{self.name}_poll_thread"
|
||||
)
|
||||
self._poll_thread_kill_event = threading.Event()
|
||||
# Image poll rate for preview updates in Hz (max 5 Hz to limit throughput)
|
||||
self._poll_rate = 10
|
||||
self._enable_xes = enable_xes
|
||||
self._full_path = ""
|
||||
self._n_images = 0
|
||||
self._unique_array_id = 0
|
||||
self._pv_timeout = 5
|
||||
self._readout_time = self.MIN_DETECTOR_READOUT_TIME
|
||||
self.r_lock = threading.RLock() # Lock to access the message buffer safely
|
||||
|
||||
self.accumulated_data_e1 = None
|
||||
self.accumulated_data_e2 = None
|
||||
|
||||
def stage(self) -> list[object] | StatusBase: # type: ignore
|
||||
"""Stage the device.
|
||||
|
||||
Super stage not safe to call.."""
|
||||
self.stopped = False
|
||||
status = self.on_stage() # pylint: disable=assignment-from-no-return
|
||||
if isinstance(status, StatusBase):
|
||||
return status
|
||||
return []
|
||||
|
||||
def _poll_array_data(self):
|
||||
"""Poll the array data for preview updates."""
|
||||
while not self._poll_thread_kill_event.wait(1 / self._poll_rate):
|
||||
try:
|
||||
# First check if there is a new image
|
||||
if self.image.unique_id.get() != self._unique_array_id:
|
||||
self._unique_array_id = self.image.unique_id.get()
|
||||
else:
|
||||
continue # No new image, skip update
|
||||
# Get new image data
|
||||
value = self.image.array_data.get()
|
||||
if value is None:
|
||||
logger.info(f"No image data available for preview of {self.name}")
|
||||
continue
|
||||
|
||||
width = self.image.array_size.width.get()
|
||||
height = self.image.array_size.height.get()
|
||||
# Geometry correction for the image
|
||||
data = np.reshape(value, (height, width))
|
||||
logger.info(f"Setting preview data for {self.name} with shape {data.shape}")
|
||||
self.preview.put(data)
|
||||
except Exception: # pylint: disable=broad-except
|
||||
content = traceback.format_exc()
|
||||
logger.error(
|
||||
f"Error while polling array data for preview of {self.name}: {content}"
|
||||
)
|
||||
|
||||
###
|
||||
def msg_buffer_callback(
|
||||
self,
|
||||
start_frame: dict[
|
||||
Literal[
|
||||
"type", "Mode", "TRoiStart", "TRoiStep", "TRoiN", "NumEnergyPoints", "save_interval"
|
||||
],
|
||||
Any,
|
||||
],
|
||||
data_frames: list[
|
||||
dict[
|
||||
Literal["type", "period", "totalEvents", "TDSpectra", "beforeROI", "afterROI"], Any
|
||||
]
|
||||
],
|
||||
end_frame: dict[Literal["type", "error"], Any],
|
||||
):
|
||||
"""
|
||||
Callback method to be attached to the backend to process the message buffer. The callback expects
|
||||
start_frame, data_frames, and end_frame as arguments. Additionally, one may pass extra kwargs that
|
||||
will be passed to the callback function.
|
||||
|
||||
Args:
|
||||
start_frame (dict): The StartFrame. Dictionary representation of detailed structure
|
||||
described in model .timepix_fly_client.timepix_fly_interface.TimepixStartFrame
|
||||
data_frames (list): List of XesData frames. Dictionary of structures described in
|
||||
model .timepix_fly_client.timepix_fly_interface.TimepixDataFrame
|
||||
end_frame (dict): The EndFrame. Dictionary representation of detailed structure
|
||||
described in model .timepix_fly_client.timepix_fly_interface.TimepixEndFrame
|
||||
"""
|
||||
n_energy_points = start_frame.get("NumEnergyPoints", None)
|
||||
troin = start_frame["TRoiN"]
|
||||
if troin != self._troin:
|
||||
logger.error(f"Number of pixels {troin} does not match expected {self._troin}.")
|
||||
|
||||
# Create return data
|
||||
xes_data = np.zeros((n_energy_points, troin), dtype=np.float32) # dtype from backend code
|
||||
tds_period = []
|
||||
tds_total_events = 0
|
||||
total_periods = 0
|
||||
data_frame_freq = 131000 / start_frame.get("save_interval", 1) # in Hz
|
||||
logger.info(
|
||||
f"Processing TimepixFly data: start_frame: {start_frame}, end_frame: {end_frame}"
|
||||
)
|
||||
if len(data_frames) == 0:
|
||||
logger.error(
|
||||
f"No data frames received in msg_buffer; for start_frame: {start_frame}, end_frame: {end_frame}"
|
||||
)
|
||||
# TODO this should no longer happen now as this was fixed in the backend..
|
||||
else:
|
||||
for msg in data_frames:
|
||||
tds_period.append(msg["period"])
|
||||
tds_total_events += msg["totalEvents"]
|
||||
for ii in range(n_energy_points):
|
||||
xes_data[ii, :] += msg["TDSpectra"][ii::n_energy_points]
|
||||
|
||||
tds_period = (
|
||||
np.array(tds_period) / start_frame.get("save_interval", 1) / data_frame_freq
|
||||
)
|
||||
|
||||
total_periods = end_frame.get("periods", None)
|
||||
if total_periods is not None:
|
||||
self.total_periods.put(
|
||||
total_periods / start_frame.get("save_interval", 1) / data_frame_freq
|
||||
)
|
||||
else:
|
||||
logger.error(f"Received total_periods: {total_periods} from end_frame {end_frame}.")
|
||||
|
||||
# Transpose to get shape (troin, n_energy_points)
|
||||
xes_data = xes_data.T
|
||||
|
||||
# Put XES data
|
||||
self.tds_period.put(tds_period)
|
||||
self.total_events.put(tds_total_events)
|
||||
self.xes_data.put(
|
||||
xes_data, async_update={"type": "add", "max_shape": [None, troin, n_energy_points]}
|
||||
)
|
||||
if n_energy_points == 8:
|
||||
data_1 = np.sum(xes_data[:, 0:4], axis=1)
|
||||
data_2 = np.sum(xes_data[:, 4:8], axis=1)
|
||||
self.xes_energy_1.put(data_1, async_update={"type": "add", "max_shape": [None, troin]})
|
||||
self.xes_energy_2.put(data_2, async_update={"type": "add", "max_shape": [None, troin]})
|
||||
if self.accumulated_data_e1 is None:
|
||||
self.accumulated_data_e1 = data_1
|
||||
else:
|
||||
self.accumulated_data_e1 += data_1
|
||||
if self.accumulated_data_e2 is None:
|
||||
self.accumulated_data_e2 = data_2
|
||||
else:
|
||||
self.accumulated_data_e2 += data_2
|
||||
self.xes_data_accumulated_1.put(
|
||||
self.accumulated_data_e1,
|
||||
async_update={"type": "replace", "max_shape": [None, troin]},
|
||||
)
|
||||
self.xes_data_accumulated_2.put(
|
||||
self.accumulated_data_e2,
|
||||
async_update={"type": "replace", "max_shape": [None, troin]},
|
||||
)
|
||||
self.xes_spectra.put(
|
||||
xes_data.sum(axis=1), async_update={"type": "add", "max_shape": [None, troin]}
|
||||
)
|
||||
logger.debug(f"Device data set for Timepix with {tds_period}, {tds_total_events}")
|
||||
|
||||
### User ACCESS methods
|
||||
|
||||
def get_pixel_map(self) -> dict:
|
||||
"""Get the current pixel map as a dictionary."""
|
||||
return self._pixel_map.model_dump()
|
||||
|
||||
def set_pixel_map(self, pixel_map: dict) -> None:
|
||||
"""Set the pixel map from a dictionary."""
|
||||
self._pixel_map = PixelMap.model_validate(pixel_map)
|
||||
|
||||
def set_pixel_map_from_json_file(self, file_path: str) -> None:
|
||||
"""Set the pixel map from a JSON file.
|
||||
|
||||
Args:
|
||||
file_path (str): Path to the JSON file containing the pixel map.
|
||||
"""
|
||||
pixel_map = load_pixel_map_from_json(file_path)
|
||||
self._pixel_map = pixel_map
|
||||
|
||||
def set_enable_xes(self, enable: bool) -> None:
|
||||
"""Enable or disable XES data acquisition.
|
||||
|
||||
Args:
|
||||
enable (bool): Whether to enable XES data acquisition.
|
||||
"""
|
||||
self.enable_xes = enable
|
||||
|
||||
@property
|
||||
def enable_xes(self) -> bool:
|
||||
"""Get whether XES data acquisition is enabled."""
|
||||
return self._enable_xes
|
||||
|
||||
@enable_xes.setter
|
||||
@typechecked
|
||||
def enable_xes(self, value: bool):
|
||||
"""Set whether XES data acquisition is enabled."""
|
||||
self._enable_xes = value
|
||||
self._enable_xes_settings(value)
|
||||
# #TODO Update device manager config if available
|
||||
# if self.device_manager is not None:
|
||||
# dev_obj = self.device_manager.devices.get(self.name, None)
|
||||
# if dev_obj is not None:
|
||||
# cfg = dev_obj.get_device_config()
|
||||
# if "enable_xes" in cfg and cfg["enable_xes"] != value:
|
||||
# cfg["enable_xes"] = value
|
||||
# dev_obj.set_device_config({"enable_xes": value})
|
||||
# logger.info(
|
||||
# f"Updated 'enable_xes' to {value} in device manager for {self.name}"
|
||||
# )
|
||||
|
||||
@property
|
||||
def pixel_map(self) -> PixelMap:
|
||||
"""Get the current pixel map of the TimePix detector."""
|
||||
if self._pixel_map is None:
|
||||
try:
|
||||
pixel_map = load_pixel_map_from_json(DEFAULT_PIXEL_MAP)
|
||||
self._pixel_map = pixel_map
|
||||
# pylint: disable=broad-except
|
||||
# pylint: disable=raise-missing-from
|
||||
except Exception:
|
||||
content = traceback.format_exc()
|
||||
logger.error(f"Failed to load default pixel map: {content}")
|
||||
raise ValueError(
|
||||
f"Failed to load default pixel map from {DEFAULT_PIXEL_MAP}: {content}"
|
||||
)
|
||||
return self._pixel_map
|
||||
|
||||
@pixel_map.setter
|
||||
@typechecked
|
||||
def pixel_map(self, value: PixelMap):
|
||||
self._pixel_map = value
|
||||
|
||||
@property
|
||||
def troistep(self) -> int:
|
||||
"""Get the current ROI step size."""
|
||||
return self._troistep
|
||||
|
||||
@troistep.setter
|
||||
@typechecked
|
||||
def troistep(self, value: int):
|
||||
"""Set the ROI step size."""
|
||||
if value <= 0:
|
||||
raise ValueError("ROI step size must be a positive integer.")
|
||||
self._troistep = value
|
||||
|
||||
@property
|
||||
def troin(self) -> int:
|
||||
"""Get the current ROI number of pixels."""
|
||||
return self._troin
|
||||
|
||||
@troin.setter
|
||||
@typechecked
|
||||
def troin(self, value: int):
|
||||
"""Set the ROI number of pixels."""
|
||||
if value <= 0:
|
||||
raise ValueError("ROI number of pixels must be a positive integer.")
|
||||
self._troin = value
|
||||
|
||||
######################################################################
|
||||
### Beamline specific methods for the TimePix Detector integration ###
|
||||
######################################################################
|
||||
|
||||
def on_init(self) -> None:
|
||||
"""
|
||||
Called when the device is initialized.
|
||||
|
||||
No signals are connected at this point. If you like to
|
||||
set default values on signals, please use on_connected instead.
|
||||
"""
|
||||
start_time = time.time()
|
||||
logger.info(f"Loading default pixel map for TimePix detector {self.name}...")
|
||||
self.set_pixel_map_from_json_file(DEFAULT_PIXEL_MAP)
|
||||
logger.info(
|
||||
f"Default pixel map for TimePix detector {self.name} loaded after {time.time() - start_time:.3f} seconds."
|
||||
)
|
||||
|
||||
def _enable_xes_settings(self, enabled: bool) -> None:
|
||||
"""Enable XES specific settings for the TimePix detector."""
|
||||
enabled_value = 1 if enabled else 0
|
||||
self.cam.tdc1_enable.set(enabled_value).wait(timeout=self._pv_timeout)
|
||||
self.cam.tdc2_enable.set(enabled_value).wait(timeout=self._pv_timeout)
|
||||
self.cam.raw_enable.set(enabled_value).wait(timeout=self._pv_timeout)
|
||||
if enabled:
|
||||
self.cam.tdc1_edge.set(TDCEdge.RISING).wait(timeout=self._pv_timeout)
|
||||
self.cam.tdc1_output.set(TDCOuput.ALL_CHANNELS).wait(timeout=self._pv_timeout)
|
||||
self.cam.tdc2_edge.set(TDCEdge.RISING).wait(timeout=self._pv_timeout)
|
||||
self.cam.tdc2_output.set(TDCOuput.ALL_CHANNELS).wait(timeout=self._pv_timeout)
|
||||
|
||||
def on_connected(self) -> None:
|
||||
"""
|
||||
Called after the device is connected and its signals are connected.
|
||||
Default values for signals should be set here.
|
||||
"""
|
||||
start_time = time.time()
|
||||
logger.info(f"On Connected of TimePix detector {self.name}...")
|
||||
# Prepare TimePix Detector
|
||||
self._enable_xes_settings(self.enable_xes)
|
||||
self.cam.trigger_mode.set(TRIGGERMODE.INTERNAL).wait(timeout=self._pv_timeout)
|
||||
self.cam.trigger_source.set(TRIGGERSOURCE.HDMI1_1).wait(timeout=self._pv_timeout)
|
||||
self.cam.exposure_mode.set(EXPOSUREMODE.TIMED).wait(timeout=self._pv_timeout)
|
||||
# Reset array counter on connect
|
||||
self.cam.array_counter.set(0).wait(timeout=self._pv_timeout)
|
||||
|
||||
# ------------------
|
||||
# Prepare file writing through AD HDF5 plugin
|
||||
# -----------------
|
||||
self.hdf.enable.set(1).wait(timeout=self._pv_timeout)
|
||||
self.hdf.file_write_mode.set(FILEWRITEMODE.STREAM.value).wait(timeout=self._pv_timeout)
|
||||
self.hdf.auto_save.set(1).wait(timeout=self._pv_timeout)
|
||||
self.hdf.file_template.set("%s%s").wait(timeout=self._pv_timeout)
|
||||
self.hdf.lazy_open.set(1).wait(timeout=self._pv_timeout)
|
||||
self.cam.array_callbacks.set(1).wait(timeout=self._pv_timeout)
|
||||
|
||||
# ------------------
|
||||
# Prepare TimePixFly backend
|
||||
# -----------------
|
||||
|
||||
# Prepare backend for TimePixFly
|
||||
self.backend.on_connected()
|
||||
# Register the callback for processing data received by the backend
|
||||
self.backend.add_callback(self.msg_buffer_callback)
|
||||
self._poll_thread.start()
|
||||
logger.info(
|
||||
f"TimePix detector {self.name} connected and initialized after {time.time() - start_time:.3f} seconds."
|
||||
)
|
||||
# Subscribe to new image updates
|
||||
self.image.unique_id.subscribe(self._on_new_image_received)
|
||||
|
||||
def _on_new_image_received(self, value: int, old_value: int, **kwargs):
|
||||
"""Callback for image unique ID updates to trigger preview update."""
|
||||
if value == old_value:
|
||||
return # No new image, or counter reset
|
||||
try:
|
||||
# Get new image data
|
||||
width = self.image.array_size.width.get()
|
||||
height = self.image.array_size.height.get()
|
||||
array_data = self.image.array_data.get()
|
||||
if array_data is None:
|
||||
logger.info(f"No image data available for preview of {self.name}")
|
||||
return
|
||||
|
||||
# Geometry correction for the image
|
||||
data = np.sum(np.reshape(array_data, (height, width)), axis=1)
|
||||
self.static_spectra.put(data)
|
||||
except Exception: # pylint: disable=broad-except
|
||||
content = traceback.format_exc()
|
||||
logger.error(f"Error while updating preview for {self.name} on image update: {content}")
|
||||
|
||||
def on_stage(self) -> StatusBase | None:
|
||||
"""Called while staging the device."""
|
||||
self.accumulated_data_e1 = None
|
||||
self.accumulated_data_e2 = None
|
||||
scan_msg: ScanStatusMessage = self.scan_info.msg # type: ignore
|
||||
exp_time = scan_msg.scan_parameters.get("exp_time", 0)
|
||||
if exp_time - self._readout_time <= 0:
|
||||
raise ValueError(
|
||||
f"Exposure time {exp_time} must be greater than readout time {self._readout_time}."
|
||||
)
|
||||
burst_images = scan_msg.scan_parameters.get("frames_per_trigger", 1)
|
||||
self._n_images = scan_msg.num_points * burst_images
|
||||
# Camera has to be set to burst_images, each step will get an individual trigger
|
||||
self.cam.acquire_time.set(exp_time - self._readout_time).wait(timeout=self._pv_timeout)
|
||||
self.cam.acquire_period.set(exp_time).wait(timeout=self._pv_timeout)
|
||||
self.cam.num_images.set(burst_images).wait(timeout=self._pv_timeout)
|
||||
self.cam.data_source.set(DATASOURCE.IMAGE).wait(timeout=self._pv_timeout)
|
||||
|
||||
# Setup file writing
|
||||
self._full_path = get_full_path(scan_msg, name="timepix")
|
||||
file_path = "/".join(self._full_path.split("/")[:-1])
|
||||
file_name = self._full_path.split("/")[-1]
|
||||
self.cam.array_callbacks.set(1).wait(5) # Enable array callbacks
|
||||
# self.hdf.enable.set(1).wait(5) # Enable HDF5 plugin
|
||||
self.hdf.file_path.set(file_path).wait(5)
|
||||
self.hdf.file_name.set(file_name).wait(5)
|
||||
# Setup file writing for the total expected number of images
|
||||
self.hdf.num_capture.set(self._n_images).wait(5)
|
||||
self.hdf.capture.put(1)
|
||||
self.file_event.put(
|
||||
file_path=self._full_path,
|
||||
done=False,
|
||||
successful=False,
|
||||
hinted_h5_entries={"data": "/entry/data/data"},
|
||||
)
|
||||
|
||||
# -------------------------
|
||||
# XES specific staging
|
||||
if self.enable_xes:
|
||||
# Prepare TimePixFly
|
||||
other_config = OtherConfigModel(
|
||||
TRoiStep=self.troistep,
|
||||
TRoiN=self.troin,
|
||||
output_uri=f"tcp:{self.backend.hostname}:{self.backend.socket_port}",
|
||||
save_interval=int(131000 / 5) - 5, # Save interval in 131kHz units,
|
||||
)
|
||||
logger.debug(f"Current TimePixFly configuration: {other_config}")
|
||||
pixel_map = self.pixel_map
|
||||
self.backend.on_stage(other_config=other_config, pixel_map=pixel_map)
|
||||
|
||||
# Fetch the backend socket info
|
||||
net_add = self.backend.timepix_fly_client.get_net_addresses()
|
||||
logger.debug(f"Using net_add for timepix_fly backend {net_add}")
|
||||
self.cam.raw_file_template.set("").wait(timeout=self._pv_timeout)
|
||||
self.cam.raw_file_path.set(f"tcp://connect@{net_add.address}").wait(
|
||||
timeout=self._pv_timeout
|
||||
)
|
||||
|
||||
def on_unstage(self) -> None:
|
||||
"""Called while unstaging the device."""
|
||||
# TODO what should happen for unstage? Make sure that acquisition is not running?
|
||||
# self.backend.on_unstage()
|
||||
# self.cam.acquire.put(0)
|
||||
# status_camera = CompareStatus(self.cam.acquire_busy, ACQUIRESTATUS.DONE)
|
||||
|
||||
def on_pre_scan(self) -> StatusBase:
|
||||
"""Called right before the scan starts on all devices automatically."""
|
||||
status_camera = CompareStatus(
|
||||
self.cam.acquire_busy, ACQUIRESTATUS.DONE, timeout=self._pv_timeout
|
||||
)
|
||||
status_writer = CompareStatus(
|
||||
self.hdf.capture, ACQUIRESTATUS.ACQUIRING, timeout=self._pv_timeout
|
||||
)
|
||||
status = status_camera & status_writer
|
||||
|
||||
self.cancel_on_stop(status)
|
||||
return status
|
||||
|
||||
def on_trigger(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called when the device is triggered."""
|
||||
|
||||
status_camera = TransitionStatus(
|
||||
self.cam.acquire_busy, [ACQUIRESTATUS.DONE, ACQUIRESTATUS.ACQUIRING, ACQUIRESTATUS.DONE]
|
||||
)
|
||||
img_counter = self.hdf.num_captured.get()
|
||||
status_backend = None
|
||||
# First we make sure that the backend reach 'config' state. This needs to happend before each trigger.
|
||||
if self.enable_xes is True:
|
||||
status_backend_config = DeviceStatus(self)
|
||||
self.cancel_on_stop(status_backend_config)
|
||||
self.backend.timepix_fly_client.add_status_callback(
|
||||
status=status_backend_config,
|
||||
success=[TimePixFlyStatus.CONFIG],
|
||||
error=[TimePixFlyStatus.EXCEPT, TimePixFlyStatus.SHUTDOWN],
|
||||
)
|
||||
try:
|
||||
status_backend_config.wait(timeout=5)
|
||||
except TimeoutError:
|
||||
# pylint:disable=raise-missing-from
|
||||
raise TimeoutError(
|
||||
f"TimePixFly backend of device {self.name} failed to reach 'config' state in trigger"
|
||||
)
|
||||
|
||||
# Prepare backend to be ready to receive trigger
|
||||
status = self.backend.on_trigger()
|
||||
try:
|
||||
status.wait(timeout=5) # Wait until backend trigger is done
|
||||
# pylint:disable=raise-missing-from
|
||||
except Exception:
|
||||
logger.error(
|
||||
f"TimePixFly backend of device {self.name} failed to prepare for trigger"
|
||||
)
|
||||
raise TimeoutError(
|
||||
f"TimePixFly backend of device {self.name} failed to reach 'trigger' state in trigger"
|
||||
)
|
||||
# Status that resolves once the trigger is done
|
||||
status_backend = self.backend.on_trigger_finished()
|
||||
|
||||
if status_backend is not None:
|
||||
return_status = status_camera & status_backend
|
||||
else:
|
||||
return_status = status_camera
|
||||
self.cancel_on_stop(return_status)
|
||||
self.cam.acquire.put(1)
|
||||
return return_status
|
||||
|
||||
def on_complete(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called to inquire if a device has completed a scans."""
|
||||
# Status Camera
|
||||
status_camera = CompareStatus(self.cam.acquire_busy, ACQUIRESTATUS.DONE)
|
||||
# Status Writer
|
||||
st1 = CompareStatus(self.hdf.capture, ACQUIRESTATUS.DONE)
|
||||
st2 = CompareStatus(self.hdf.write_file, ACQUIRESTATUS.DONE)
|
||||
status_written_images = CompareStatus(self.hdf.num_captured, self._n_images)
|
||||
status_writer = st1 & st2 & status_written_images
|
||||
|
||||
# Status Backend
|
||||
status_backend = None
|
||||
if self.enable_xes is True:
|
||||
# Add callback to the backend complete handling
|
||||
status_backend = self.backend.on_complete(status=status_backend)
|
||||
# Combine the statuses
|
||||
if status_backend is not None:
|
||||
return_status = status_backend & status_camera & status_writer
|
||||
else:
|
||||
return_status = status_camera & status_writer
|
||||
|
||||
return_status.add_callback(self._complete_callback)
|
||||
self.cancel_on_stop(return_status)
|
||||
return return_status
|
||||
|
||||
def _complete_callback(self, status: CompareStatus) -> None:
|
||||
"""Callback for when the device completes a scan."""
|
||||
if status.success:
|
||||
self.file_event.put(
|
||||
file_path=self._full_path, # pylint: disable:protected-access
|
||||
done=True,
|
||||
successful=True,
|
||||
hinted_h5_entries={"data": "/entry/data/data"},
|
||||
)
|
||||
else:
|
||||
self.file_event.put(
|
||||
file_path=self._full_path, # pylint: disable:protected-access
|
||||
done=True,
|
||||
successful=False,
|
||||
hinted_h5_entries={"data": "/entry/data/data"},
|
||||
)
|
||||
|
||||
def on_stop(self) -> None:
|
||||
"""Called when the device is stopped."""
|
||||
# Camera
|
||||
self.cam.acquire.put(0)
|
||||
self.hdf.capture.put(0)
|
||||
# Backend
|
||||
if self.enable_xes is True:
|
||||
self.backend.on_stop()
|
||||
|
||||
def on_destroy(self):
|
||||
"""Cleanup method to stop the device and clean up resources."""
|
||||
self.cam.acquire.put(0)
|
||||
self.hdf.capture.put(0)
|
||||
self._poll_thread_kill_event.set()
|
||||
self.backend.on_stop()
|
||||
self.backend.on_destroy()
|
||||
|
||||
|
||||
# pylint: disable=protected-access
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
|
||||
timepix = Timepix(
|
||||
name="timepix",
|
||||
prefix="X10DA-ES-TPX1:",
|
||||
backend_rest_url="P6-0008.psi.ch:8452", # "P4-0017.psi.ch:8452",
|
||||
hostname="x10da-bec-001.psi.ch",
|
||||
)
|
||||
try:
|
||||
# timepix.wait_for_connection(all_signals=True, timeout=10)
|
||||
timepix.on_connected()
|
||||
print("Timepix connected and initialized.")
|
||||
for exp_time, frames_per_trigger, runs in zip([0.1, 1, 0.2], [20, 5, 1], [10, 5, 30]):
|
||||
|
||||
print(
|
||||
f"Sleeping for 0.5 seconds before starting the scan with exp_time={exp_time} "
|
||||
f"and frames_per_trigger={frames_per_trigger}. and runs {runs}"
|
||||
)
|
||||
time.sleep(0.5)
|
||||
timepix.scan_info.msg.scan_parameters.update(
|
||||
{
|
||||
"exp_time": exp_time, # Set exposure time to 1 second for testing
|
||||
"frames_per_trigger": frames_per_trigger, # Set frames per trigger to 5 for testing
|
||||
}
|
||||
)
|
||||
timepix.stage()
|
||||
logger.warning(f"Timepix on stage done")
|
||||
timepix.pre_scan()
|
||||
logger.warning(f"Timepix on pre_scan done")
|
||||
|
||||
msgs = []
|
||||
# for ii in range(runs):
|
||||
for run in range(runs):
|
||||
logger.warning(f"Starting trigger run {run + 1}/{runs}")
|
||||
status = timepix.trigger()
|
||||
logger.warning(f"Timepix triggered")
|
||||
start_time = time.time()
|
||||
while not status.done:
|
||||
try:
|
||||
status.wait(timeout=1)
|
||||
except Exception as exc:
|
||||
logger.warning(f"Trigger not done after ({time.time() - start_time:.2f}s)")
|
||||
if time.time() - start_time > 20:
|
||||
logger.warning("Breaking loop manually after 20 seconds of waiting.")
|
||||
status.set_exception(f"Failed to complete trigger after 20 seconds")
|
||||
break
|
||||
if hasattr(timepix, "_msg_dump"):
|
||||
n_messages = len(timepix._msg_dump)
|
||||
logger.warning(f"Messages in Buffer is {n_messages}")
|
||||
if n_messages > 0:
|
||||
msg = timepix._msg_dump[-1]
|
||||
logger.warning(
|
||||
f"Last message had N start_frame : {msg.get('start_frame')}, N data_frames: {len(msg.get('data_frame'))}, N end_frame : {msg.get('end_frame')}"
|
||||
)
|
||||
status = timepix.complete()
|
||||
print("Waiting for timepix to complete.")
|
||||
status.wait(timeout=10)
|
||||
print("Timepix scan completed.")
|
||||
timepix.unstage()
|
||||
# timepix._msg_dump.clear()
|
||||
print("Timepix unstaged.")
|
||||
except Exception as e:
|
||||
content = traceback.format_exc()
|
||||
logger.error(f"An error occurred: {content}")
|
||||
finally:
|
||||
timepix.destroy()
|
||||
print("Timepix destroyed.")
|
||||
@@ -0,0 +1 @@
|
||||
from .timepix_fly_backend import TimepixFlyBackend
|
||||
@@ -0,0 +1 @@
|
||||
from .timepix_fly_mock_server import TimePixFlyMockServer
|
||||
@@ -0,0 +1,49 @@
|
||||
"""Module to control the Timepix Fly mock server."""
|
||||
|
||||
import requests
|
||||
|
||||
|
||||
class TimePixFlyMockServer:
|
||||
"""
|
||||
A mock server for the Timepix Fly detector that simulates the behavior of the actual server.
|
||||
This is used for testing purposes and does not require a real Timepix Fly detector.
|
||||
"""
|
||||
|
||||
def __init__(self, host: str = "localhost", port: int = 8080, logger=None):
|
||||
"""
|
||||
Initialize the TimePixFlyMockServer with a host and port.
|
||||
|
||||
Args:
|
||||
host (str): The host address for the mock server. Default is "localhost".
|
||||
port (int): The port number for the mock server. Default is 8080.
|
||||
logger: An optional logger to log messages. If not provided, messages will be printed to the console.
|
||||
"""
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.logger = logger
|
||||
|
||||
def add_log(self, message: str) -> None:
|
||||
"""
|
||||
Add a log message to the logger if available.
|
||||
If no logger is provided, it will print the message to the console.
|
||||
|
||||
Args:
|
||||
message (str): The message to log.
|
||||
"""
|
||||
if self.logger is not None:
|
||||
self.logger.info(message)
|
||||
else:
|
||||
print(message)
|
||||
|
||||
def start_acquisition(self):
|
||||
"""
|
||||
Simulate starting an acquisition on the Timepix Fly detector.
|
||||
This method does not perform any real acquisition but simulates the behavior.
|
||||
"""
|
||||
try:
|
||||
requests.get(f"http://{self.host}:{self.port}/measurement/start", timeout=0.2)
|
||||
except requests.exceptions.RequestException:
|
||||
self.add_log("Failed to start acquisition on Timepix Fly mock server.")
|
||||
# Ignore all exceptions as there is currently no return value for the request
|
||||
else:
|
||||
self.add_log("Acquisition started on Timepix Fly mock server.")
|
||||
@@ -0,0 +1,559 @@
|
||||
"""
|
||||
Implementation of the Timepix Fly Backend. It handles the communication
|
||||
with the TimepixFly backend (https://github.com/paulscherrerinstitute/TimePixFly).
|
||||
Please be aware that this was developed agains the 'dev' branch (2025/08/15).
|
||||
|
||||
It communicates with the backend through a simple Client (TimepixFlyClient)
|
||||
that handles the REST and WebSocket communication + callbacks, and provides
|
||||
hooks for all the relevant ophyd interface, 'on_stage',
|
||||
'on_trigger', 'on_complete', 'on_stop', ...
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import signal
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
import uuid
|
||||
from typing import TYPE_CHECKING, Callable, Tuple
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd_devices import StatusBase
|
||||
|
||||
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client import (
|
||||
TimepixFlyClient,
|
||||
TimePixFlyStatus,
|
||||
)
|
||||
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface import (
|
||||
OtherConfigModel,
|
||||
PixelMap,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ophyd import DeviceStatus
|
||||
|
||||
from superxas_bec.devices.timepix.timepix import Timepix
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
# pylint: disable=line-too-long
|
||||
# pylint: disable=redefined-outer-name
|
||||
|
||||
|
||||
class TimepixFlyBackendException(Exception):
|
||||
"""Custom exception for Timepix Fly Backend errors."""
|
||||
|
||||
|
||||
class TimepixFlyBackend:
|
||||
"""Timepix Fly Backend Device."""
|
||||
|
||||
def __init__(self, backend_rest_url: str, hostname: str | None = None, socket_port: int = 0):
|
||||
"""
|
||||
Initialize the Timepix Fly Backend device.
|
||||
|
||||
Parameters:
|
||||
backend_rest_url: The REST URL of the backend.
|
||||
hostname: The hostname of the device, defaults to None, which means
|
||||
socket.getfqdn() will be used to fetch hostname. It is recommended to specify
|
||||
the hostname explicitly with domain name, e.g. 'x10da-bec-001.psi.ch' for use
|
||||
at the beamline computers of SLS, or localhost for local testing of the backend.
|
||||
socket_port: The socket port to use. Defaults to 0,
|
||||
which lets the OS choose an available port.
|
||||
"""
|
||||
ws_url = f"{backend_rest_url}/ws"
|
||||
self.timepix_fly_client = TimepixFlyClient(rest_url=backend_rest_url, ws_url=ws_url)
|
||||
if hostname is None:
|
||||
hostname = socket.getfqdn()
|
||||
self.hostname = hostname
|
||||
self.socket_port = socket_port # Use 0 as default to let the OS choose an available port
|
||||
self.__msg_buffer = []
|
||||
self.callbacks: dict[str, Tuple[Callable[[dict, list[dict], dict, dict], None], dict]] = {}
|
||||
self._status_objects: list[StatusBase] = []
|
||||
self._decoder = json.JSONDecoder()
|
||||
self._socket_server: socket.socket | None = None
|
||||
self._data_thread: threading.Thread | None = None
|
||||
self._data_thread_shutdown_event = threading.Event()
|
||||
|
||||
###################################################
|
||||
###### Hooks for the PSIDeviceBase interface ######
|
||||
###################################################
|
||||
|
||||
def on_connected(self):
|
||||
"""Called if it is ensured that the device is connected."""
|
||||
time_started = time.time()
|
||||
logger.info("Connecting to Timepix Fly backend...")
|
||||
try:
|
||||
self.timepix_fly_client.on_connected()
|
||||
status = self.start_data_server()
|
||||
status.wait(timeout=5)
|
||||
except Exception: # pylint: disable=broad-except
|
||||
content = traceback.format_exc()
|
||||
logger.error(f"Error starting data server: {content}")
|
||||
# pylint: disable=raise-missing-from
|
||||
raise TimepixFlyBackendException(
|
||||
f"Could not start data server on {self.hostname}:{self.socket_port}. Please check logs for more details."
|
||||
)
|
||||
logger.info(
|
||||
f"Timepix Fly backend connected and data server started on {self.hostname}:{self.socket_port} after {time.time() - time_started:.3f} seconds."
|
||||
)
|
||||
|
||||
def on_stage(self, other_config: OtherConfigModel, pixel_map: PixelMap):
|
||||
"""
|
||||
Hook for on stage logic.
|
||||
|
||||
Args:
|
||||
other_config (OtherConfigModel): The configuration for the Timepix Fly detector.
|
||||
pixel_map (PixelMap): The pixel map for the Timepix Fly detector.
|
||||
"""
|
||||
time_started = time.time()
|
||||
status = StatusBase()
|
||||
self.cancel_on_stop(status)
|
||||
self.timepix_fly_client.add_status_callback(
|
||||
status,
|
||||
success=[TimePixFlyStatus.CONFIG],
|
||||
error=[TimePixFlyStatus.EXCEPT, TimePixFlyStatus.SHUTDOWN],
|
||||
)
|
||||
try:
|
||||
status.wait(timeout=5.0)
|
||||
except Exception: # pylint: disable=broad-except
|
||||
content = traceback.format_exc()
|
||||
logger.error(
|
||||
f"Error while waiting for Timepix Fly backend to be in config state: {content}"
|
||||
)
|
||||
# pylint: disable=raise-missing-from
|
||||
|
||||
raise TimeoutError(
|
||||
f"Timepix Fly backend state did not reach config state, running into timeout. Error traceback {content}."
|
||||
)
|
||||
status = StatusBase()
|
||||
|
||||
self.timepix_fly_client.add_status_callback(
|
||||
status,
|
||||
success=[TimePixFlyStatus.CONFIG],
|
||||
error=[TimePixFlyStatus.EXCEPT, TimePixFlyStatus.SHUTDOWN],
|
||||
)
|
||||
logger.debug(f"Setting other config, backend {other_config}")
|
||||
self.timepix_fly_client.set_other_config(other_config)
|
||||
self.timepix_fly_client.set_pixel_map(pixel_map)
|
||||
try: # TODO make asynchronous
|
||||
status.wait(timeout=5.0)
|
||||
except Exception: # pylint: disable=broad-except
|
||||
content = traceback.format_exc()
|
||||
logger.error(
|
||||
f"Error while waiting for Timepix Fly backend to be in config state after setting config: {content}"
|
||||
)
|
||||
# pylint: disable=raise-missing-from
|
||||
|
||||
raise TimeoutError(
|
||||
f"Timepix Fly backend state did not reach config state after setting config, running into timeout. Error traceback {content}."
|
||||
)
|
||||
logger.info(f"TimePixFly backend staged after {time.time() - time_started:.3f} seconds.")
|
||||
|
||||
def on_trigger(
|
||||
self, status: StatusBase | DeviceStatus | None = None
|
||||
) -> StatusBase | DeviceStatus:
|
||||
"""
|
||||
Hook for on_trigger logic. It adds a status callback based on the TimePixFlyStatus.
|
||||
The backend needs to get into the AWAIT_CONNECTION state before starting the acquisition.
|
||||
|
||||
Args:
|
||||
status (StatusBase | DeviceStatus | None): The status object to track the operation.
|
||||
If None, a new StatusBase object will be created.
|
||||
Returns:
|
||||
StatusBase | DeviceStatus: The status object that will be updated with the operation's result
|
||||
"""
|
||||
# TODO, could be removed as it's checkd from the top level!
|
||||
if status is None:
|
||||
status = StatusBase()
|
||||
self.cancel_on_stop(status)
|
||||
self.timepix_fly_client.add_status_callback(
|
||||
status,
|
||||
success=[TimePixFlyStatus.AWAIT_CONNECTION],
|
||||
error=[TimePixFlyStatus.EXCEPT, TimePixFlyStatus.SHUTDOWN],
|
||||
)
|
||||
self.timepix_fly_client.start()
|
||||
return status
|
||||
|
||||
def on_trigger_finished(
|
||||
self, status: StatusBase | DeviceStatus | None = None
|
||||
) -> StatusBase | DeviceStatus:
|
||||
"""
|
||||
Hook for on_trigger_finished logic. It adds a status callback based on the TimePixFlyStatus.
|
||||
The backend needs to get into the CONFIG state again after a trigger is finished.
|
||||
In practice, a full scan logic is happening during on trigger.
|
||||
The status will be marked as finished/successful when the backend state
|
||||
reaches CONFIG. If an exception state is reached, the status will be marked as failed.
|
||||
|
||||
Args:
|
||||
status (StatusBase | DeviceStatus | None): The status object to track the operation.
|
||||
If None, a new StatusBase object will be created.
|
||||
Returns:
|
||||
StatusBase | DeviceStatus: The status object that will be updated with the operation's result
|
||||
"""
|
||||
if status is None:
|
||||
status = StatusBase()
|
||||
self.cancel_on_stop(status)
|
||||
self.timepix_fly_client.add_status_callback(
|
||||
status,
|
||||
success=[TimePixFlyStatus.CONFIG],
|
||||
error=[TimePixFlyStatus.EXCEPT, TimePixFlyStatus.SHUTDOWN],
|
||||
)
|
||||
return status
|
||||
|
||||
def on_complete(
|
||||
self, status: StatusBase | DeviceStatus | None = None
|
||||
) -> StatusBase | DeviceStatus:
|
||||
"""
|
||||
Hook for on_complete logic. It adds a status callback based on the TimePixFlyStatus.
|
||||
The backend needs to get into the CONFIG state after a single acquisition.
|
||||
|
||||
Args:
|
||||
status (StatusBase | DeviceStatus | None): The status object to track the operation.
|
||||
If None, a new StatusBase object will be created.
|
||||
Returns:
|
||||
StatusBase | DeviceStatus: The status object that will be updated with the operation's result
|
||||
"""
|
||||
if status is None:
|
||||
status = StatusBase()
|
||||
self.cancel_on_stop(status)
|
||||
self.timepix_fly_client.add_status_callback(
|
||||
status,
|
||||
success=[TimePixFlyStatus.CONFIG],
|
||||
error=[TimePixFlyStatus.EXCEPT, TimePixFlyStatus.SHUTDOWN],
|
||||
)
|
||||
return status
|
||||
|
||||
def on_unstage(self) -> StatusBase:
|
||||
"""Hook for on_unstage logic."""
|
||||
# status = StatusBase()
|
||||
# self.cancel_on_stop(status)
|
||||
# self.timepix_fly_client.add_status_callback(
|
||||
# status,
|
||||
# success=[TimePixFlyStatus.CONFIG],
|
||||
# error=[TimePixFlyStatus.EXCEPT, TimePixFlyStatus.SHUTDOWN],
|
||||
# )
|
||||
# return status
|
||||
|
||||
def on_destroy(self):
|
||||
"""Hook for on_destroy logic."""
|
||||
self.timepix_fly_client.shutdown()
|
||||
self._data_thread_shutdown_event.set()
|
||||
if self._data_thread is not None and self._data_thread.is_alive():
|
||||
self._data_thread.join(timeout=1) # Allow the data thread to finish
|
||||
if self._data_thread.is_alive():
|
||||
logger.error(
|
||||
"Data thread poll loop of timepix_fly_backend did not stop within 1 second."
|
||||
)
|
||||
if self._socket_server is not None:
|
||||
try:
|
||||
logger.info(f"Closing socket server on {self.hostname}:{self.socket_port}.")
|
||||
self._socket_server.close()
|
||||
# pylint: disable=broad-except
|
||||
except Exception:
|
||||
content = traceback.format_exc()
|
||||
logger.error(f"Error closing socket server: {content}")
|
||||
|
||||
def on_stop(self):
|
||||
"""Hook for on_stop logic."""
|
||||
self.stop_all_status_objects()
|
||||
self.timepix_fly_client.stop_running_collection()
|
||||
|
||||
####################################################
|
||||
########## Custom Methods for the Backend ##########
|
||||
####################################################
|
||||
|
||||
def cancel_on_stop(self, status: StatusBase):
|
||||
"""Cancel ongoing operations of a status object when the sto method is called."""
|
||||
self._status_objects.append(status)
|
||||
|
||||
def stop_all_status_objects(self):
|
||||
"""Stop all status objects that are currently running."""
|
||||
for status in self._status_objects:
|
||||
with status._lock:
|
||||
if not status.done:
|
||||
status.set_exception(
|
||||
RuntimeError("Stop called on device, all status objects cancelled.")
|
||||
)
|
||||
logger.info(f"Cancelled status object: {status}")
|
||||
self._status_objects.clear()
|
||||
|
||||
def add_callback(self, callback: callable, kwd: dict | None = None) -> str:
|
||||
"""
|
||||
Add a callback that will be executed whenever an acquisition is completed. This is
|
||||
determind by receiving an EndFrame message from the backend. There will always be
|
||||
a StartFrame message, follow by optional DataFrame messages, and finally
|
||||
an EndFrame message. The callback will be called with the StartFrame, all DataFrames
|
||||
and the EndFrame message as arguments, along with any additional keyword arguments
|
||||
provided when registering the callback.
|
||||
The callback signature needs to be:
|
||||
def callback(start_frame: dict, data_frames: list[dict], end_frame: dict, kwd) -> None:
|
||||
Args:
|
||||
- start_frame (dict): The first message received, typically containing metadata.
|
||||
- data_frames (list[dict]): A list of all data frames received during the acquisition.
|
||||
- end_frame (dict): The last message received, typically containing the EndFrame type.
|
||||
- any additional keyword arguments provided when registering the callback.
|
||||
|
||||
Args:
|
||||
callback (callable): The callback function to be called. The callback signature should be:
|
||||
kwd (dict | None): Additional keyword arguments to pass to the callback, they will be unpacked
|
||||
when calling the callback. If None, an empty dictionary will be used.
|
||||
|
||||
Returns:
|
||||
str: A unique identifier for the callback.
|
||||
"""
|
||||
if kwd is None:
|
||||
kwd = {}
|
||||
cb_id = uuid.uuid4()
|
||||
self.callbacks[cb_id] = (callback, kwd)
|
||||
logger.info(f"Callback {callback.__name__} added with UUID {cb_id}.")
|
||||
return str(cb_id)
|
||||
|
||||
def remove_callback(self, cb_id: str):
|
||||
"""
|
||||
Remove a callback by its unique identifier.
|
||||
|
||||
Args:
|
||||
cb_id (str): The unique identifier of the callback to remove.
|
||||
"""
|
||||
if cb_id in self.callbacks:
|
||||
self.callbacks.pop(cb_id)
|
||||
logger.info(f"Callback with UUID {cb_id} removed.")
|
||||
else:
|
||||
logger.warning(f"Callback with UUID {cb_id} not found.")
|
||||
|
||||
def start_data_server(self) -> StatusBase:
|
||||
"""
|
||||
Start the data server to receive data from the Timepix Fly backend over a socket connection.
|
||||
It will try to decypher the hostname through socket.getaddrinfo, and if multiple addresses
|
||||
are found, it will use the first one. Please note that depending on the network configuration,
|
||||
the hostname might not have the correct domain name attached, so it is recommended to specify
|
||||
the hostname explicitly with domain name, e.g. 'x10da-bec-001.psi.ch'.
|
||||
|
||||
The method creates a socket server that listens for incoming connections on the specified
|
||||
hostname and port. It starts a thread that continuously receives data from the socket,
|
||||
decodes the received JSON data, and processes it. The data is expected to be in JSON format,
|
||||
with each message ending with a trailing byte "}\n".
|
||||
|
||||
Returns:
|
||||
StatusBase: A status object that indicates if the data server thread is ready to accept
|
||||
connections. High level implementation should ensure that the data server is
|
||||
started (status.wait(timeout=4)) before any data is sent from the backend.
|
||||
"""
|
||||
info = socket.getaddrinfo(
|
||||
self.hostname, port=self.socket_port, family=socket.AF_INET, type=socket.SOCK_STREAM
|
||||
)
|
||||
if len(info) == 0:
|
||||
raise RuntimeError(f"Could not resolve hostname {self.hostname} for socket server.")
|
||||
if len(info) > 1:
|
||||
logger.info(
|
||||
f"Multiple addresses found for {self.hostname}. Using the first one: {info[0]}"
|
||||
)
|
||||
family, socktype, proto, _, sockaddr = info[0]
|
||||
|
||||
self._socket_server = socket.create_server(sockaddr, family=family, backlog=1)
|
||||
self._socket_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
|
||||
# Set the hostname and socket_port to the ones that was picked by the socket.getaddrinfo
|
||||
self.hostname, self.socket_port = self._socket_server.getsockname()
|
||||
logger.info(
|
||||
f"Socket server started on {self.hostname}:{self.socket_port}. Waiting for connections."
|
||||
)
|
||||
|
||||
# Create status object to return for the high level implementations
|
||||
status = StatusBase()
|
||||
|
||||
if self._data_thread is None or not self._data_thread.is_alive():
|
||||
self._data_thread_shutdown_event.clear()
|
||||
self._data_thread = threading.Thread(
|
||||
target=self._receive_data_on_socket, kwargs={"status": status}
|
||||
)
|
||||
self._data_thread.start()
|
||||
else:
|
||||
raise TimepixFlyBackendException(
|
||||
"Data server thread is already running on timepix_fly_backend."
|
||||
)
|
||||
|
||||
return status
|
||||
|
||||
def _receive_data_on_socket(self, status: StatusBase):
|
||||
"""
|
||||
Background loop running in a thread, that receives data from the
|
||||
timepix fly backend over socket_server. The backend reconnects for every acquisition (trigger),
|
||||
to this socket. Therefore, it is important to handle all connections and disconnections properly.
|
||||
|
||||
The buffer variable stores a string stream of received data. Whenever a trailing byte "}\n" is found,
|
||||
in the buffer, the buffer is split into chunks of received data and each chunk is decoded
|
||||
as a JSON object. The decoded objects are then processed, and if an EndFrame message is received,
|
||||
the registered callbacks are executed with the StartFrame, all DataFrames, and the EndFrame message.
|
||||
"""
|
||||
buffer = ""
|
||||
self._socket_server.settimeout(
|
||||
0.1
|
||||
) # Set short socket timeout to avoid blocking the thread loop
|
||||
status.set_finished() # Indicate that the socket server is ready to accept connections
|
||||
while not self._data_thread_shutdown_event.is_set(): # Shutdown event
|
||||
try:
|
||||
# blocks until connected or timeout reached
|
||||
conn, addr = self._socket_server.accept()
|
||||
except socket.timeout:
|
||||
continue # Timeout is okay, continue
|
||||
except Exception: # pylint: disable=broad-except
|
||||
# Log error, check if shutdown event is set.
|
||||
# Shutdown event should be set before socket_server.close() is called.
|
||||
content = traceback.format_exc()
|
||||
logger.error(f"Error accepting connection: {content}")
|
||||
continue
|
||||
logger.debug(f"Connection accepted from {addr} for timepix_fly backend.")
|
||||
# Clear the message buffer before entering the loop.
|
||||
if self.__msg_buffer:
|
||||
logger.warning(f"Found messages in msg_buffer: {self.__msg_buffer}")
|
||||
self.__msg_buffer.clear()
|
||||
conn.settimeout(0.1) # Set timeout for connection to avoid blocking in recv
|
||||
with conn:
|
||||
while not self._data_thread_shutdown_event.is_set():
|
||||
try:
|
||||
# What if we split the chunk
|
||||
chunk = conn.recv(4096) # Adjust buffer size as needed
|
||||
except socket.timeout:
|
||||
# Timeout is okay, continue in loop
|
||||
continue
|
||||
except Exception as e: # pylint: disable=broad-except
|
||||
logger.error(f"Connection error: {e}. Closing connection.")
|
||||
# conn = None #TODO should we reset conn?
|
||||
break
|
||||
if not chunk:
|
||||
# Receiving an empty chunk means the connection was closed
|
||||
# conn = None #TODO should we reset conn?
|
||||
break
|
||||
buffer += chunk.decode("utf-8")
|
||||
|
||||
# Check if trailing byte "}\n" present in buffer
|
||||
buffer_chunks = buffer.split("}\n")
|
||||
for entry in buffer_chunks[:-1]:
|
||||
# Process all complete JSON objects in the buffer
|
||||
self._decode_received_data(entry + "}")
|
||||
# Keep the last incomplete chunk.
|
||||
# If the buffer ended with "}\n", this will be an empty string.
|
||||
buffer = buffer_chunks[-1]
|
||||
|
||||
def _decode_received_data(self, buffer: str) -> None:
|
||||
"""
|
||||
Decode the received data from the socket.
|
||||
|
||||
Args:
|
||||
buffer (str): The JSON string received from the socket.
|
||||
"""
|
||||
try:
|
||||
obj, _ = self._decoder.raw_decode(buffer)
|
||||
except json.JSONDecodeError:
|
||||
logger.error(f"TimePixFlyBackend: Failed to decode JSON from buffer: {buffer}")
|
||||
return # TODO should this raise, or only log error as of now?
|
||||
|
||||
self.__msg_buffer.append(obj)
|
||||
if obj.get("type", "") == "EndFrame":
|
||||
try:
|
||||
# If the EndFrame message is received, run the callbacks
|
||||
logger.debug(f"Running callbacks")
|
||||
self.run_msg_callbacks()
|
||||
except Exception: # pylint: disable=broad-except
|
||||
content = traceback.format_exc()
|
||||
logger.error(f"Error in msg callbacks with error msg: {content}")
|
||||
msgs_in_buffer = "".join(
|
||||
[f"{msg['type']} with keys {msg.keys()} \n" for msg in self.__msg_buffer]
|
||||
)
|
||||
logger.debug(f"TimePixFlyBackend: Messages in buffer: {msgs_in_buffer}")
|
||||
finally: # Make sure to always reset the message buffer after processing
|
||||
logger.debug(
|
||||
"TimePixFlyBackend: Resetting message buffer after processing EndFrame message."
|
||||
)
|
||||
logger.debug(f"Messages in buffer: {len(self.__msg_buffer)}")
|
||||
self.__msg_buffer.clear()
|
||||
|
||||
def run_msg_callbacks(self):
|
||||
"""Run callbacks if EndFrame message is received."""
|
||||
start_frame = self.__msg_buffer[0]
|
||||
end_frame = self.__msg_buffer[-1]
|
||||
data_frames = self.__msg_buffer[1:-1]
|
||||
for cb, kwd in self.callbacks.values():
|
||||
try:
|
||||
cb(start_frame, data_frames, end_frame, **kwd)
|
||||
except Exception: # pylint: disable=broad-except
|
||||
content = traceback.format_exc()
|
||||
logger.error(f"Error in callback with error msg: {content}")
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
import time
|
||||
|
||||
from superxas_bec.devices.timepix.timepix_fly_client.test_utils.timepix_fly_mock_server import (
|
||||
TimePixFlyMockServer,
|
||||
)
|
||||
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface import (
|
||||
TimepixEndFrame,
|
||||
TimepixStartFrame,
|
||||
TimepixXESFrame,
|
||||
)
|
||||
|
||||
mock_server = TimePixFlyMockServer()
|
||||
timepix = TimepixFlyBackend(
|
||||
backend_rest_url="localhost:8452", hostname="localhost", socket_port=3031
|
||||
)
|
||||
|
||||
start_frames = {}
|
||||
xes_frames = {}
|
||||
end_frames = {}
|
||||
|
||||
def add_msg_callback(start_frame, data_frames, end_frame, **kwargs):
|
||||
"""Callback to print received messages."""
|
||||
counter = len(start_frames)
|
||||
start_frames[counter] = TimepixStartFrame(**start_frame)
|
||||
xes_frames[counter] = [TimepixXESFrame(**data_frame) for data_frame in data_frames]
|
||||
end_frames[counter] = TimepixEndFrame(**end_frame)
|
||||
|
||||
try:
|
||||
|
||||
print("TimepixFlyBackend initialized.")
|
||||
timepix.on_connected()
|
||||
print("TimepixFlyBackend connected.")
|
||||
# Parse scan info for OtherConfig
|
||||
config = OtherConfigModel(
|
||||
output_uri=f"tcp:{timepix.hostname}:{timepix.socket_port}", TRoiStep=1, TRoiN=5000
|
||||
)
|
||||
# Parse pixel map from scan info if needed, otherwise use some default pixel map.
|
||||
pixel_map = PixelMap(
|
||||
chips=[
|
||||
[{"i": 256 ^ 2 - 1, "p": [0, 1], "f": [0.5, 0.5]}],
|
||||
[{"i": 255 * 256, "p": [0, 1], "f": [0.5, 0.5]}],
|
||||
[{"i": 255, "p": [1, 2], "f": [0.5, 0.5]}],
|
||||
[{"i": 0, "p": [1, 2], "f": [0.5, 0.5]}],
|
||||
]
|
||||
)
|
||||
timepix.add_callback(add_msg_callback)
|
||||
timepix.on_stage(other_config=config, pixel_map=pixel_map)
|
||||
print("TimepixFlyBackend staged with configuration and pixel map.")
|
||||
for ii in range(5):
|
||||
print(f"Starting scan {ii + 1}...;")
|
||||
time.sleep(1)
|
||||
status_1 = timepix.on_trigger()
|
||||
# print("TimepixFlyBackend pre-scan started.")
|
||||
status_1.wait(timeout=10)
|
||||
mock_server.start_acquisition()
|
||||
status_2 = timepix.on_trigger_finished()
|
||||
status_2.wait(timeout=10)
|
||||
# print("Acquisition started on mock server.")
|
||||
|
||||
print("TimepixFlyBackend scan completed.")
|
||||
status = timepix.on_complete()
|
||||
status.wait(timeout=10)
|
||||
print(
|
||||
f"Received {len(start_frames)} start frames, {len(xes_frames)} data frames, and {len(end_frames)} end frames."
|
||||
)
|
||||
# pylint: disable=broad-except
|
||||
except Exception as e:
|
||||
logger.error(f"Error during TimepixFlyBackend operation: {e}")
|
||||
finally:
|
||||
timepix.on_destroy()
|
||||
print("TimepixFlyBackend destroyed.")
|
||||
@@ -0,0 +1,497 @@
|
||||
"""
|
||||
Module that implements a python client interface to the TimePix Fly tpx3app REST API,
|
||||
and connects to the TimePix Fly WebSocket server to receive status updates.
|
||||
It provides methods to start, stop, and configure the TimePix detector,
|
||||
as well as to retrieve pixel maps, other configuration parameters, and
|
||||
the current state of the detector.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import enum
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
from typing import Any, Type
|
||||
|
||||
import requests
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import StatusBase
|
||||
from websockets import State
|
||||
from websockets.exceptions import WebSocketException
|
||||
from websockets.sync.client import ClientConnection, connect
|
||||
|
||||
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface import (
|
||||
LastError,
|
||||
NetAddresses,
|
||||
OtherConfigModel,
|
||||
PixelMap,
|
||||
PixelMapFromFile,
|
||||
ProgramState,
|
||||
TimePixResponse,
|
||||
Version,
|
||||
)
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
# pylint: disable=line-too-long
|
||||
class TimePixStatusError(Exception):
|
||||
"""Exception raised when the TimePix detector status was in an unexpected state."""
|
||||
|
||||
|
||||
class TimePixFlyStatus(str, enum.Enum):
|
||||
"""
|
||||
Enum representing the status of the TimePix detector.
|
||||
"""
|
||||
|
||||
INIT = "init"
|
||||
CONFIG = "config"
|
||||
SETUP = "setup"
|
||||
COLLECT = "collect"
|
||||
SHUTDOWN = "shutdown"
|
||||
UNDEFINED = "undefined"
|
||||
AWAIT_CONNECTION = "await_connection"
|
||||
EXCEPT = "except"
|
||||
|
||||
|
||||
class TimepixFlyClient:
|
||||
"""
|
||||
A client for the TimePix fly backend (tpx3app).
|
||||
It exposes methods to interact with REST endpoints
|
||||
and allows to connect callbacks to status objects from ophyd
|
||||
that allow to dynamically update based on the state of the backend.
|
||||
"""
|
||||
|
||||
def __init__(self, rest_url: str, ws_url: str):
|
||||
"""
|
||||
Initialize the TimePixFlyClient with a server address.
|
||||
|
||||
Args:
|
||||
rest_url (str): The REST API URL for the TimePix Fly backend, e.g., "localhost:8452".
|
||||
ws_url (str): The WebSocket URL for the TimePix Fly backend, e.g., "localhost:8452/ws".
|
||||
"""
|
||||
self.rest_url = rest_url
|
||||
self.ws_url = ws_url
|
||||
self.ws_client: ClientConnection | None = None
|
||||
self._rlock = threading.RLock()
|
||||
self._timeout = 5 # Default timeout for requests
|
||||
self._status: TimePixFlyStatus = TimePixFlyStatus.UNDEFINED
|
||||
self._ws_update_thread: threading.Thread | None = None
|
||||
self._shutdown_event = threading.Event()
|
||||
self._status_callbacks: dict[
|
||||
str, tuple[StatusBase, list[TimePixFlyStatus], list[TimePixFlyStatus]]
|
||||
] = {}
|
||||
self._started: bool = False # Flag to indicate if the client has started sending data
|
||||
|
||||
#############################
|
||||
### Utility Methods ###
|
||||
#############################
|
||||
|
||||
def on_connected(self) -> None:
|
||||
"""
|
||||
Called when the client is connected to the TimePix server.
|
||||
This method can be overridden to perform actions when the client connects.
|
||||
"""
|
||||
try:
|
||||
self.stop_running_collection()
|
||||
self.connect()
|
||||
self.wait_for_connection(timeout=5)
|
||||
|
||||
except Exception:
|
||||
content = traceback.format_exc()
|
||||
logger.error(
|
||||
f"Error while checking the state of the TimePix server: {content}. "
|
||||
f"Please check the server address and ensure the server is running."
|
||||
)
|
||||
# pylint: disable=raise-missing-from
|
||||
raise ConnectionError(
|
||||
f"TimePix Fly client failed to connect to {self.rest_url}. Please check logs for detailed error."
|
||||
)
|
||||
|
||||
def stop_running_collection(self):
|
||||
"""
|
||||
Resets the TimePix backend to the configuration state. We check if the backend
|
||||
is in the CONFIG state, and if it is not, we stop the current collection.
|
||||
We check in addition that the client has not been started before via start()
|
||||
REST API call. stop_collect() will reset the flag _started to False.
|
||||
"""
|
||||
state = self.state()
|
||||
if state.state != TimePixFlyStatus.CONFIG or self._started is True:
|
||||
logger.info(
|
||||
f"Stopping running collection on TimePix backend, current state: {state.state}, was started: {self._started}"
|
||||
)
|
||||
self.stop_collect()
|
||||
|
||||
##############################
|
||||
### WebSocket Methods ###
|
||||
### Status Update Handling ###
|
||||
##############################
|
||||
|
||||
@property
|
||||
def status(self) -> TimePixFlyStatus:
|
||||
"""
|
||||
Get the current status of the TimePix detector.
|
||||
|
||||
Returns:
|
||||
TimePixFlyStatus: The current status of the TimePix detector.
|
||||
"""
|
||||
return self._status
|
||||
|
||||
def add_status_callback(
|
||||
self,
|
||||
status: StatusBase,
|
||||
success: list[TimePixFlyStatus],
|
||||
error: list[TimePixFlyStatus],
|
||||
run: bool = True,
|
||||
):
|
||||
"""
|
||||
Add a StatusBase callback for the TimePix detector. The status will be updated when the detector status
|
||||
changes and set to finished when the status matches one of the specified success statuses and to exception
|
||||
when the status matches one of the specified error statuses.
|
||||
|
||||
Per default, the callback will immediately check and run if the status is already in success.
|
||||
|
||||
Args:
|
||||
status (StatusBase): StatusBase object
|
||||
success (list[StdDaqStatus]): list of statuses that indicate success
|
||||
error (list[StdDaqStatus]): list of statuses that indicate error
|
||||
run (bool): If True, the callback will be run immediately if the status is already in success.
|
||||
If False, the callback will not be run immediately.
|
||||
"""
|
||||
if run is True:
|
||||
try:
|
||||
if self.status in success:
|
||||
status.set_finished()
|
||||
return
|
||||
if self.status in error:
|
||||
last_error = self.last_error()
|
||||
raise TimePixStatusError(
|
||||
f"Current state {self.status} of TimePixFly Backend is in list of error states: {error}. Last error: {last_error.message}"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error while adding status callback: {e}")
|
||||
if status.done is False:
|
||||
status.set_exception(e)
|
||||
self._status_callbacks[id(status)] = (status, success, error)
|
||||
|
||||
def connect(self):
|
||||
"""Connect to the TimePix WebSocket server."""
|
||||
if self._ws_update_thread is not None and self._ws_update_thread.is_alive():
|
||||
return
|
||||
self._ws_update_thread = threading.Thread(target=self._ws_update_loop, daemon=True)
|
||||
self._ws_update_thread.start()
|
||||
|
||||
# pylint: disable=raise-missing-from
|
||||
def wait_for_connection(self, timeout: float = 6) -> None:
|
||||
"""
|
||||
Wait for the connection to the TimepixFly client connection to be established.
|
||||
|
||||
Args:
|
||||
timeout (float): timeout for the request
|
||||
"""
|
||||
logger.info(
|
||||
f"Attempting to connect to TimePixFly WebSocket at {self.ws_url}, with timeout {timeout} seconds."
|
||||
)
|
||||
with self._rlock:
|
||||
start_time = time.time()
|
||||
while True:
|
||||
if self.ws_client is not None and self.ws_client.state == State.OPEN:
|
||||
return
|
||||
try:
|
||||
self.ws_client = connect(f"ws://{self.ws_url}")
|
||||
break
|
||||
except ConnectionRefusedError:
|
||||
if time.time() - start_time > timeout:
|
||||
content = traceback.format_exc()
|
||||
logger.error(f"Connection timed out: {content}")
|
||||
raise TimeoutError(
|
||||
f"Timeout while waiting for connection to TimePixFly WebSocket server on {self.ws_url}"
|
||||
)
|
||||
except Exception:
|
||||
content = traceback.format_exc()
|
||||
logger.error(
|
||||
f"Failed to connect to TimePixFly WebSocket server on {self.ws_url}: {content}"
|
||||
)
|
||||
raise ConnectionError(
|
||||
f"Failed to connect to TimePixFly WebSocket server on {self.ws_url} with error: {content}"
|
||||
)
|
||||
time.sleep(0.5) # Try to reconnect every 0.5 seconds
|
||||
|
||||
def _ws_update_loop(self):
|
||||
"""Websocket update loop, runs in background thread."""
|
||||
while not self._shutdown_event.is_set():
|
||||
self._ws_send_and_receive()
|
||||
|
||||
def _ws_send_and_receive(self):
|
||||
"""Receive messages from the TimePixFly WebSocket server."""
|
||||
if not self.ws_client:
|
||||
self.wait_for_connection()
|
||||
try:
|
||||
try:
|
||||
recv_msgs = self.ws_client.recv(timeout=0.1)
|
||||
except TimeoutError:
|
||||
return
|
||||
logger.trace(f"Received from timepixfly ws: {recv_msgs}")
|
||||
if recv_msgs is not None:
|
||||
self._on_received_ws_message(recv_msgs)
|
||||
except WebSocketException:
|
||||
content = traceback.format_exc()
|
||||
logger.warning(f"Websocket connection closed unexpectedly: {content}")
|
||||
self.wait_for_connection()
|
||||
|
||||
def _on_received_ws_message(self, msg: str):
|
||||
"""
|
||||
Handle a message received from the StdDAQ.
|
||||
"""
|
||||
try:
|
||||
self._status = TimePixFlyStatus(msg)
|
||||
logger.info(f"Received TimepixFly status: {self._status.value}")
|
||||
except Exception: # pylint: disable=broad-except
|
||||
content = traceback.format_exc()
|
||||
logger.error(f"Failed to decode websocket message: {content}")
|
||||
return
|
||||
self._run_status_callbacks()
|
||||
|
||||
def _run_status_callbacks(self):
|
||||
"""
|
||||
Update the StatusBase objects based on the current status of the StdDAQ.
|
||||
If the status matches one of the success or error statuses, the StatusBase object will be set to finished
|
||||
or exception, respectively and removed from the list of callbacks.
|
||||
"""
|
||||
status = self._status
|
||||
logger.warning(f"Running status callbacks for status: {status.value}")
|
||||
callback_ids = list(self._status_callbacks.keys())
|
||||
for cb_id in callback_ids:
|
||||
dev_status, success, error = self._status_callbacks[cb_id]
|
||||
with dev_status._lock:
|
||||
if dev_status.done:
|
||||
self._status_callbacks.pop(cb_id)
|
||||
continue
|
||||
if status in success:
|
||||
dev_status.set_finished()
|
||||
logger.debug(f"Status callback finished in succes: {status.value}")
|
||||
self._status_callbacks.pop(cb_id)
|
||||
elif status in error:
|
||||
try:
|
||||
last_error = self.last_error()
|
||||
raise TimePixStatusError(
|
||||
f"TimePixFly Backend state '{status.value}' is in list of specified errors {error}. Last error message: {last_error.message}"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error in status callback from TimepixFly Backend: {e}")
|
||||
dev_status.set_exception(e)
|
||||
self._status_callbacks.pop(cb_id)
|
||||
# Reset the _started flag if the status is in CONFIG.
|
||||
if status == TimePixFlyStatus.CONFIG:
|
||||
self._started = False # Should this be made thread-safe?
|
||||
|
||||
def shutdown(self):
|
||||
"""Shutdown the TimepixFlyClient client."""
|
||||
self._shutdown_event.set()
|
||||
if self.ws_client is not None:
|
||||
self.ws_client.close()
|
||||
self.ws_client = None
|
||||
|
||||
############################
|
||||
##### REST API Methods #####
|
||||
############################
|
||||
|
||||
def _get(
|
||||
self, get_cmd: str, get_response_model: Type[TimePixResponse] | None = None
|
||||
) -> Type[TimePixResponse] | None:
|
||||
"""
|
||||
Send a GET request to the TimePix server.
|
||||
|
||||
Args:
|
||||
get_cmd (str): The command to send in the GET request.
|
||||
get_response_model (Type[TimePixResponse]): The Pydantic model to parse the response.
|
||||
|
||||
Returns:
|
||||
Any: The parsed response if a model is provided, else the raw response.
|
||||
"""
|
||||
logger.debug(f"Sending GET request to TimePix server: {get_cmd}")
|
||||
response = requests.get(f"http://{self.rest_url}/{get_cmd}", timeout=self._timeout)
|
||||
response.raise_for_status() # Raise an error for bad responses
|
||||
if get_response_model is not None:
|
||||
try:
|
||||
return get_response_model(**response.json())
|
||||
except Exception as e:
|
||||
logger.info(f"Error parsing response for {get_cmd}: Response: {response.text}")
|
||||
raise e
|
||||
else:
|
||||
return response.text
|
||||
|
||||
def _put(
|
||||
self, put_cmd: str, value: dict[str, Any], put_response_model: Type[TimePixResponse]
|
||||
) -> Type[TimePixResponse] | None:
|
||||
"""
|
||||
Send a PUT request to the TimePix server.
|
||||
|
||||
Args:
|
||||
put_cmd (str): The command to send in the PUT request.
|
||||
value (dict[str, Any]): The value to send in the PUT request.
|
||||
put_response_model (Type[TimePixResponse]): The Pydantic model to parse the response.
|
||||
|
||||
Returns:
|
||||
Any: The parsed response if a model is provided, else None.
|
||||
"""
|
||||
logger.debug(f"Sending PUT request to TimePix server: {put_cmd} with value: {value}")
|
||||
response = requests.put(
|
||||
f"http://{self.rest_url}/{put_cmd}", json=value, timeout=self._timeout
|
||||
)
|
||||
response.raise_for_status()
|
||||
if put_response_model is not None:
|
||||
return put_response_model(**response.json())
|
||||
|
||||
def start(self) -> None:
|
||||
"""
|
||||
Start the TimePix detector by sending a GET request to the start endpoint.
|
||||
This method is a wrapper around the REST API call to start the detector.
|
||||
"""
|
||||
logger.debug(f"Start called from client")
|
||||
self._get(get_cmd="?start=true")
|
||||
self._started = True
|
||||
|
||||
def stop(self) -> None:
|
||||
"""
|
||||
Stop the TimePix detector by sending a GET request to the stop endpoint.
|
||||
This method is a wrapper around the REST API call to stop the detector.
|
||||
"""
|
||||
self._get(get_cmd="?stop=true")
|
||||
self._started = False
|
||||
|
||||
def stop_collect(self) -> None:
|
||||
"""
|
||||
Stop the data collection of the TimePix detector by sending a GET request to the stop-collect endpoint.
|
||||
This method is a wrapper around the REST API call to stop data collection.
|
||||
"""
|
||||
self._get(get_cmd="?stop_collect=true")
|
||||
self._started = False
|
||||
|
||||
def kill(self) -> None:
|
||||
"""
|
||||
Kill the TimePix detector by sending a GET request to the kill endpoint.
|
||||
This method is a wrapper around the REST API call to kill the detector.
|
||||
"""
|
||||
self._get(get_cmd="?kill=true")
|
||||
self._started = False
|
||||
|
||||
def last_error(self) -> LastError:
|
||||
"""
|
||||
Get the last error message from the TimePix detector by sending a GET request
|
||||
to the last-error endpoint.
|
||||
|
||||
Returns:
|
||||
LastError: The last error message from the detector.
|
||||
"""
|
||||
return self._get(get_cmd="last-error", get_response_model=LastError)
|
||||
|
||||
def state(self) -> ProgramState:
|
||||
"""
|
||||
Get the program state of the TimePix detector by sending a GET request
|
||||
to the state endpoint.
|
||||
|
||||
Returns:
|
||||
ProgramState: The current state of the TimePix detector.
|
||||
"""
|
||||
return self._get(get_cmd="state", get_response_model=ProgramState)
|
||||
|
||||
def version(self) -> Version:
|
||||
"""
|
||||
Get the version of the TimePix detector by sending a GET request
|
||||
to the version endpoint.
|
||||
|
||||
Returns:
|
||||
Version: The version information of the TimePix detector.
|
||||
"""
|
||||
return self._get(get_cmd="version", get_response_model=Version)
|
||||
|
||||
def get_pixel_map(self) -> PixelMap:
|
||||
"""
|
||||
Get the pixel map of the TimePix detector by sending a GET request
|
||||
to the pixel-map endpoint.
|
||||
|
||||
Returns:
|
||||
PixelMap: The pixel map of the TimePix detector.
|
||||
"""
|
||||
return self._get(get_cmd="pixel-map", get_response_model=PixelMap)
|
||||
|
||||
def set_pixel_map(self, pixel_map: PixelMap | dict) -> None:
|
||||
"""
|
||||
Set the pixel map of the TimePix detector by sending a PUT request
|
||||
to the pixel-map endpoint.
|
||||
|
||||
Args:
|
||||
pixel_map (PixelMap | dict): The pixel map to set. Can be a PixelMap instance or a dictionary.
|
||||
"""
|
||||
if not isinstance(pixel_map, PixelMap):
|
||||
if isinstance(pixel_map, dict):
|
||||
pixel_map = PixelMap(**pixel_map)
|
||||
else:
|
||||
raise ValueError(
|
||||
f"Value must be an instance of PixelMap. Received {type(pixel_map)}, {pixel_map}."
|
||||
)
|
||||
self._put(put_cmd="pixel-map", value=pixel_map.model_dump(), put_response_model=None)
|
||||
|
||||
def set_pixel_map_from_file(self, pixel_map_file: PixelMapFromFile | dict | str) -> None:
|
||||
"""
|
||||
Set the pixel map of the TimePix detector from a file by sending a PUT request
|
||||
to the pixel-map-from-file endpoint.
|
||||
|
||||
Args:
|
||||
pixel_map_file (PixelMapFromFile | dict): The pixel map from a file to set.
|
||||
Can be a PixelMapFromFile instance or a dictionary.
|
||||
"""
|
||||
if not isinstance(pixel_map_file, PixelMapFromFile):
|
||||
if isinstance(pixel_map_file, dict):
|
||||
pixel_map_file = PixelMapFromFile(**pixel_map_file)
|
||||
elif isinstance(pixel_map_file, str):
|
||||
pixel_map_file = PixelMapFromFile(filename=pixel_map_file)
|
||||
else:
|
||||
raise ValueError(
|
||||
f"Value must be an instance of PixelMapFromFile. Received {type(pixel_map_file)}, {pixel_map_file}."
|
||||
)
|
||||
self._put(
|
||||
put_cmd="pixel-map-from-file",
|
||||
value=pixel_map_file.model_dump(),
|
||||
put_response_model=PixelMapFromFile,
|
||||
)
|
||||
|
||||
def get_other_config(self) -> OtherConfigModel:
|
||||
"""
|
||||
Get the other configuration parameters of the TimePix detector by sending a GET request
|
||||
to the other-config endpoint.
|
||||
|
||||
Returns:
|
||||
OtherConfigModel: The other configuration parameters of the TimePix detector.
|
||||
"""
|
||||
return self._get(get_cmd="other-config", get_response_model=OtherConfigModel)
|
||||
|
||||
def set_other_config(self, other_config: OtherConfigModel | dict) -> None:
|
||||
"""
|
||||
Set the other configuration parameters of the TimePix detector by sending a PUT request
|
||||
to the other-config endpoint.
|
||||
|
||||
Args:
|
||||
other_config (OtherConfigModel | dict): The other configuration parameters to set.
|
||||
Can be an OtherConfigModel instance or a dictionary.
|
||||
"""
|
||||
if not isinstance(other_config, OtherConfigModel):
|
||||
if isinstance(other_config, dict):
|
||||
other_config = OtherConfigModel(**other_config)
|
||||
else:
|
||||
raise ValueError(
|
||||
f"Value must be an instance of OtherConfigModel. Received {type(other_config)}, {other_config}."
|
||||
)
|
||||
self._put(put_cmd="other-config", value=other_config.model_dump(), put_response_model=None)
|
||||
|
||||
def get_net_addresses(self) -> NetAddresses:
|
||||
"""
|
||||
Get the network addresses of the TimePix detector by sending a GET request
|
||||
to the net-addresses endpoint.
|
||||
|
||||
Returns:
|
||||
dict[str, str]: A dictionary containing the network addresses of the TimePix detector.
|
||||
"""
|
||||
return self._get(get_cmd="net-addresses", get_response_model=NetAddresses)
|
||||
@@ -0,0 +1,182 @@
|
||||
"""
|
||||
This module defines Pydantic models for the TimePix detector API responses. These
|
||||
models are used to validate and structure the data returned by the TimePix REST API.
|
||||
Any change will be reflected immediately, which will simplify debugging if the API changes.
|
||||
"""
|
||||
|
||||
from typing import Literal
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
# pylint: disable=line-too-long
|
||||
|
||||
|
||||
class TimePixResponse(BaseModel):
|
||||
"""Base model for TimePix responses."""
|
||||
|
||||
model_config = {"validate_assignment": True}
|
||||
|
||||
|
||||
class OtherConfigModel(TimePixResponse):
|
||||
"""
|
||||
OtherConfigModel is a Pydantic model that represents the configuration
|
||||
for the TimePix detector.
|
||||
|
||||
Attributes:
|
||||
- type: str - The type of the configuration, default is "OtherConfig".
|
||||
- output_uri: str - The URI for the data stream. The backend will send data to this address.
|
||||
It is the responsibility of the device must start a TCP server and listen on this socket to
|
||||
receive the data.
|
||||
- save_interval: int - The interval at which histograms are written.
|
||||
- TRoiStart: int - The start time for the Time ROI (Region of Interest).
|
||||
- TRoiStep: int - The step size for the Time ROI.
|
||||
- TRoiN: int - The number of points in the Time ROI.
|
||||
"""
|
||||
|
||||
type: str = "OtherConfig"
|
||||
output_uri: str
|
||||
save_interval: int = Field(
|
||||
default=131000, description="Interval in seconds to write histograms"
|
||||
)
|
||||
TRoiStart: int = Field(
|
||||
default=0, description="Start time for the Time ROI (Region of Interest)"
|
||||
)
|
||||
TRoiStep: int = Field(default=1, description="Step size for the Time ROI")
|
||||
TRoiN: int = Field(default=5000, description="Number of points in the Time ROI")
|
||||
|
||||
|
||||
class LastError(TimePixResponse):
|
||||
"""
|
||||
TimePixLastError is a Pydantic model that represents the last error message
|
||||
from the TimePix detector api for the REST API call '/last-error'.
|
||||
|
||||
Attributes:
|
||||
- type: str - The type of the response, default is "LastError".
|
||||
- message: str - The last error message from the detector.
|
||||
"""
|
||||
|
||||
type: str = "LastError"
|
||||
message: str
|
||||
|
||||
|
||||
class ProgramState(TimePixResponse):
|
||||
"""
|
||||
ProgramState is a Pydantic model that represents the state of the TimePix program.
|
||||
|
||||
Attributes:
|
||||
- type: str - The type of the response, default is "ProgramState".
|
||||
- state: str - The current state of the progra, can be "init", "config", "setup", "collect", "shutdown"
|
||||
"""
|
||||
|
||||
type: str = "ProgramState"
|
||||
state: Literal["init", "config", "setup", "await_connection", "collect", "shutdown"]
|
||||
|
||||
|
||||
class Version(TimePixResponse):
|
||||
"""
|
||||
Version is a Pydantic model that represents the version information of the TimePix detector.
|
||||
|
||||
Attributes:
|
||||
- type: str - The type of the response, default is "Version".
|
||||
- version: str - The version string of the TimePix detector.
|
||||
"""
|
||||
|
||||
type: str = "Version"
|
||||
version: str
|
||||
|
||||
|
||||
class PixelMapFromFile(TimePixResponse):
|
||||
"""
|
||||
PixelMapFromFile is a Pydantic model that represents a pixel map loaded from a file.
|
||||
|
||||
Attributes:
|
||||
- type: str - The type of the response, default is "PixelMapFromFile".
|
||||
- file: str - The path to the file containing the pixel map.
|
||||
"""
|
||||
|
||||
type: str = "PixelMapFromFile"
|
||||
file: str
|
||||
|
||||
|
||||
class PixelMap(TimePixResponse):
|
||||
"""
|
||||
PixelMap is a Pydantic model that represents the pixel mapping for the TimePix detector.
|
||||
|
||||
Attributes:
|
||||
- type: str - The type of the response, default is "PixelMap".
|
||||
- chips: list - A list of chips, each containing a list of pixel mappings.
|
||||
"""
|
||||
|
||||
type: str = "PixelMap"
|
||||
chips: list[list[dict[Literal["i", "p", "f"], int | float | list[int | float]]]]
|
||||
|
||||
|
||||
# For efficiency, we do not arse the responses into Pydantic models, but use the dict from
|
||||
# json directly. Nevertheless, we define the models here to have a common interface
|
||||
# and to be able to use them in the future if needed.
|
||||
class TimepixStartFrame(TimePixResponse):
|
||||
"""TimepixStartFrame is a Pydantic model that represents the start frame of a TimePix acquisition."""
|
||||
|
||||
type: str = "StartFrame"
|
||||
Mode: Literal["TOA"]
|
||||
TRoiStart: int
|
||||
TRoiStep: int
|
||||
TRoiN: int
|
||||
NumEnergyPoints: int
|
||||
save_interval: int
|
||||
|
||||
|
||||
class TimepixXESFrame(TimePixResponse):
|
||||
"""TimepixXESFrame is a Pydantic model that represents a data frame from the TimePix detector."""
|
||||
|
||||
type: str = "XesData"
|
||||
period: int
|
||||
TDSpectra: list[float]
|
||||
totalEvents: int
|
||||
beforeROI: int
|
||||
afterROI: int
|
||||
|
||||
|
||||
class TimepixEndFrame(TimePixResponse):
|
||||
"""
|
||||
TimepixEndFrame is a Pydantic model that represents the end frame of a TimePix acquisition.
|
||||
|
||||
Attributes:
|
||||
- type: str - The type of the response, default is "EndFrame".
|
||||
- error: str - If an error occured during acuisition, this field contains the error message. Empty string otherwise.
|
||||
- periods: int - The last period count minus 3 (3 being the period predictor delay).
|
||||
"""
|
||||
|
||||
type: str = "EndFrame"
|
||||
error: str
|
||||
periods: int # Last period count minus 3 (3 being the period predictor delay)
|
||||
|
||||
|
||||
# Habe ein GET /net-addresses call implementiert
|
||||
# // /net-addresses GET applicable net addresses
|
||||
# // GET return:
|
||||
# // - status 200
|
||||
# // - data
|
||||
# // {
|
||||
# // "type":"NetAddresses",
|
||||
# // "control":"127.0.0.1:8452", // own rest interface
|
||||
# // "address":"127.0.0.1:8451", // own address, the destination of ASI server raw data
|
||||
# // "server":"127.0.0.1:8080" // ASI server rest interface address
|
||||
# // }
|
||||
|
||||
|
||||
class NetAddresses(TimePixResponse):
|
||||
"""
|
||||
NetAddresses is a Pydantic model that represents the network addresses used by the TimePix detector.
|
||||
|
||||
Attributes:
|
||||
- type: str - The type of the response, default is "NetAddresses".
|
||||
- control: str - The address of the REST interface for control commands.
|
||||
- address: str - The address where the ASI server sends raw data.
|
||||
- server: str - The address of the ASI server's REST interface.
|
||||
"""
|
||||
|
||||
type: str = "NetAddresses"
|
||||
control: str # timepix_rest_host
|
||||
address: str # data_socket_for_asi
|
||||
server: str # asi_rest_host
|
||||
382
superxas_bec/devices/timepix/utils.py
Normal file
382
superxas_bec/devices/timepix/utils.py
Normal file
@@ -0,0 +1,382 @@
|
||||
"""Temporary utility module for Status Object implementations."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from ophyd import Device, DeviceStatus, StatusBase
|
||||
|
||||
|
||||
class AndStatusWithList(DeviceStatus):
|
||||
"""
|
||||
Custom implementation of the AndStatus that combines the
|
||||
option to add multiple statuses as a list, and in addition
|
||||
allows for adding the Device as an object to access its
|
||||
methods.
|
||||
|
||||
Args"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
device: Device,
|
||||
status_list: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus],
|
||||
**kwargs,
|
||||
):
|
||||
self.all_statuses = status_list if isinstance(status_list, list) else [status_list]
|
||||
super().__init__(device=device, **kwargs)
|
||||
self._trace_attributes["all"] = [st._trace_attributes for st in self.all_statuses]
|
||||
|
||||
def inner(status):
|
||||
with self._lock:
|
||||
if self._externally_initiated_completion:
|
||||
return
|
||||
if self.done: # Return if status is already done.. It must be resolved already
|
||||
return
|
||||
|
||||
for st in self.all_statuses:
|
||||
with st._lock:
|
||||
if st.done and not st.success:
|
||||
self.set_exception(st.exception()) # st._exception
|
||||
return
|
||||
|
||||
if all(st.done for st in self.all_statuses) and all(
|
||||
st.success for st in self.all_statuses
|
||||
):
|
||||
self.set_finished()
|
||||
|
||||
for st in self.all_statuses:
|
||||
with st._lock:
|
||||
st.add_callback(inner)
|
||||
|
||||
# TODO improve __repr__ and __str__
|
||||
def __repr__(self):
|
||||
return "<AndStatusWithList({self.all_statuses!r})>".format(self=self)
|
||||
|
||||
def __str__(self):
|
||||
return "<AndStatusWithList(done={self.done}, success={self.success})>".format(self=self)
|
||||
|
||||
def __contains__(self, status: StatusBase | DeviceStatus) -> bool:
|
||||
for child in self.all_statuses:
|
||||
if child == status:
|
||||
return True
|
||||
if isinstance(child, AndStatusWithList):
|
||||
if status in child:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
# # TODO Check if this actually works....
|
||||
# def set_exception(self, exc):
|
||||
# # Propagate the exception to all sub-statuses that are not done yet.
|
||||
#
|
||||
# with self._lock:
|
||||
# if self._externally_initiated_completion:
|
||||
# return
|
||||
# if self.done: # Return if status is already done.. It must be resolved already
|
||||
# return
|
||||
# super().set_exception(exc)
|
||||
# for st in self.all_statuses:
|
||||
# with st._lock:
|
||||
# if not st.done:
|
||||
# st.set_exception(exc)
|
||||
|
||||
def _run_callbacks(self):
|
||||
"""
|
||||
Set the Event and run the callbacks.
|
||||
"""
|
||||
if self.timeout is None:
|
||||
timeout = None
|
||||
else:
|
||||
timeout = self.timeout + self.settle_time
|
||||
if not self._settled_event.wait(timeout):
|
||||
self.log.warning("%r has timed out", self)
|
||||
with self._externally_initiated_completion_lock:
|
||||
if self._exception is None:
|
||||
exc = TimeoutError(
|
||||
f"AndStatus from device {self.device.name} failed to complete in specified timeout of {self.timeout + self.settle_time}."
|
||||
)
|
||||
self._exception = exc
|
||||
# Mark this as "settled".
|
||||
try:
|
||||
self._settled()
|
||||
except Exception:
|
||||
self.log.exception("%r encountered error during _settled()", self)
|
||||
with self._lock:
|
||||
self._event.set()
|
||||
if self._exception is not None:
|
||||
try:
|
||||
self._handle_failure()
|
||||
except Exception:
|
||||
self.log.exception("%r encountered an error during _handle_failure()", self)
|
||||
for cb in self._callbacks:
|
||||
try:
|
||||
cb(self)
|
||||
except Exception:
|
||||
self.log.exception(
|
||||
"An error was raised on a background thread while "
|
||||
"running the callback %r(%r).",
|
||||
cb,
|
||||
self,
|
||||
)
|
||||
self._callbacks.clear()
|
||||
|
||||
|
||||
class AndStatus(StatusBase):
|
||||
"""Custom AndStatus for TimePix detector."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
left: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus] | None,
|
||||
name: str | Device | None = None,
|
||||
right: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus] | None = None,
|
||||
**kwargs,
|
||||
):
|
||||
self.left = left if isinstance(left, list) else [left]
|
||||
if right is not None:
|
||||
self.right = right if isinstance(right, list) else [right]
|
||||
else:
|
||||
self.right = []
|
||||
self.all_statuses = self.left + self.right
|
||||
if name is None:
|
||||
name = "unname_status"
|
||||
elif isinstance(name, Device):
|
||||
name = name.name
|
||||
else:
|
||||
name = name
|
||||
self.name = name
|
||||
super().__init__(**kwargs)
|
||||
self._trace_attributes["left"] = [st._trace_attributes for st in self.left]
|
||||
self._trace_attributes["right"] = [st._trace_attributes for st in self.right]
|
||||
|
||||
def inner(status):
|
||||
with self._lock:
|
||||
if self._externally_initiated_completion:
|
||||
return
|
||||
if self.done: # Return if status is already done.. It must be resolved already
|
||||
return
|
||||
|
||||
for st in self.all_statuses:
|
||||
with st._lock:
|
||||
if st.done and not st.success:
|
||||
self.set_exception(st.exception()) # st._exception
|
||||
return
|
||||
|
||||
if all(st.done for st in self.all_statuses) and all(
|
||||
st.success for st in self.all_statuses
|
||||
):
|
||||
self.set_finished()
|
||||
|
||||
for st in self.all_statuses:
|
||||
with st._lock:
|
||||
st.add_callback(inner)
|
||||
|
||||
def __repr__(self):
|
||||
return "({self.left!r} & {self.right!r})".format(self=self)
|
||||
|
||||
def __str__(self):
|
||||
return "{0}(done={1.done}, " "success={1.success})" "".format(self.__class__.__name__, self)
|
||||
|
||||
def __contains__(self, status: StatusBase) -> bool:
|
||||
for child in [self.left, self.right]:
|
||||
if child == status:
|
||||
return True
|
||||
if isinstance(child, AndStatus):
|
||||
if status in child:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def _run_callbacks(self):
|
||||
"""
|
||||
Set the Event and run the callbacks.
|
||||
"""
|
||||
if self.timeout is None:
|
||||
timeout = None
|
||||
else:
|
||||
timeout = self.timeout + self.settle_time
|
||||
if not self._settled_event.wait(timeout):
|
||||
# We have timed out. It's possible that set_finished() has already
|
||||
# been called but we got here before the settle_time timer expired.
|
||||
# And it's possible that in this space be between the above
|
||||
# statement timing out grabbing the lock just below,
|
||||
# set_exception(exc) has been called. Both of these possibilties
|
||||
# are accounted for.
|
||||
self.log.warning("%r has timed out", self)
|
||||
with self._externally_initiated_completion_lock:
|
||||
# Set the exception and mark the Status as done, unless
|
||||
# set_exception(exc) was called externally before we grabbed
|
||||
# the lock.
|
||||
if self._exception is None:
|
||||
exc = TimeoutError(
|
||||
f"Status with name {self.name} failed to complete in specified timeout of {self.timeout + self.settle_time}."
|
||||
)
|
||||
self._exception = exc
|
||||
# Mark this as "settled".
|
||||
try:
|
||||
self._settled()
|
||||
except Exception:
|
||||
# No alternative but to log this. We can't supersede set_exception,
|
||||
# and we have to continue and run the callbacks.
|
||||
self.log.exception("%r encountered error during _settled()", self)
|
||||
# Now we know whether or not we have succeed or failed, either by
|
||||
# timeout above or by set_exception(exc), so we can set the Event that
|
||||
# will mark this Status as done.
|
||||
with self._lock:
|
||||
self._event.set()
|
||||
if self._exception is not None:
|
||||
try:
|
||||
self._handle_failure()
|
||||
except Exception:
|
||||
self.log.exception("%r encountered an error during _handle_failure()", self)
|
||||
# The callbacks have access to self, from which they can distinguish
|
||||
# success or failure.
|
||||
for cb in self._callbacks:
|
||||
try:
|
||||
cb(self)
|
||||
except Exception:
|
||||
self.log.exception(
|
||||
"An error was raised on a background thread while "
|
||||
"running the callback %r(%r).",
|
||||
cb,
|
||||
self,
|
||||
)
|
||||
self._callbacks.clear()
|
||||
|
||||
|
||||
# from __future__ import annotations
|
||||
|
||||
# from collections import defaultdict
|
||||
# from typing import Dict, List, Tuple
|
||||
|
||||
# import numpy as np
|
||||
|
||||
# ROI = List[Tuple[float, float]]
|
||||
|
||||
|
||||
# def order_roi_corners_simple(roi: ROI) -> np.ndarray:
|
||||
# """Order ROI corners as [top-left, top-right, bottom-right, bottom-left]."""
|
||||
# pts = np.array(roi, dtype=float)
|
||||
# cx, cy = pts.mean(axis=0)
|
||||
# angles = np.arctan2(pts[:, 1] - cy, pts[:, 0] - cx)
|
||||
# idx = np.argsort(angles)
|
||||
# ordered = pts[idx]
|
||||
# # Ensure clockwise order
|
||||
# if np.cross(ordered[1] - ordered[0], ordered[2] - ordered[0]) < 0:
|
||||
# ordered = ordered[::-1]
|
||||
# return ordered[:4]
|
||||
|
||||
|
||||
# def compute_affine_transform(
|
||||
# src: np.ndarray, dst: np.ndarray, preserve_scale: bool = True
|
||||
# ) -> np.ndarray:
|
||||
# """Compute affine transform mapping src -> dst. Optionally preserve pixel scale."""
|
||||
# if preserve_scale:
|
||||
# # Solve for rotation+translation only
|
||||
# A = np.array(
|
||||
# [
|
||||
# [src[0, 0], -src[0, 1], 1, 0],
|
||||
# [src[0, 1], src[0, 0], 0, 1],
|
||||
# [src[1, 0], -src[1, 1], 1, 0],
|
||||
# [src[1, 1], src[1, 0], 0, 1],
|
||||
# ]
|
||||
# )
|
||||
# b = dst[:2].ravel()
|
||||
# x, residuals, _, _ = np.linalg.lstsq(A, b, rcond=None)
|
||||
# a, b_, tx, ty = x
|
||||
# return np.array([[a, -b_, tx], [b_, a, ty]])
|
||||
# else:
|
||||
# # Full affine transform
|
||||
# src_h = np.hstack([src, np.ones((4, 1))])
|
||||
# dst_h = dst
|
||||
# M, _, _, _ = np.linalg.lstsq(src_h, dst_h, rcond=None)
|
||||
# return M.T
|
||||
|
||||
|
||||
# def apply_affine_transform(coords: np.ndarray, affine: np.ndarray) -> np.ndarray:
|
||||
# """Apply affine transform to coordinates."""
|
||||
# coords_h = np.hstack([coords, np.ones((coords.shape[0], 1))])
|
||||
# transformed = coords_h @ affine.T
|
||||
# return transformed[:, :2]
|
||||
|
||||
|
||||
# def roi_pixel_hits(
|
||||
# image_shape: Tuple[int, int], roi: ROI, start_idx: int = 0, min_fraction_diff: float = 0.0
|
||||
# ) -> List[Dict]:
|
||||
# """
|
||||
# For each ROI, return list of hits as dicts {'i': (x,y), 'p': row_idx, 'f': fraction}.
|
||||
# Supports rotated rectangles using bilinear fraction splitting.
|
||||
# """
|
||||
# hits_dict: Dict[Tuple[int, int], Dict[str, list]] = defaultdict(lambda: {"p": [], "f": []})
|
||||
|
||||
# corners = order_roi_corners_simple(roi)
|
||||
# height = int(np.linalg.norm(corners[0] - corners[3])) + 1
|
||||
# width = int(np.linalg.norm(corners[0] - corners[1])) + 1
|
||||
|
||||
# dst = np.array([[0, 0], [width - 1, 0], [width - 1, height - 1], [0, height - 1]], dtype=float)
|
||||
# affine_mat = compute_affine_transform(corners, dst) # 2x3
|
||||
|
||||
# # Bounding box in image
|
||||
# min_x = max(int(np.floor(corners[:, 0].min())), 0)
|
||||
# max_x = min(int(np.ceil(corners[:, 0].max())), image_shape[1] - 1)
|
||||
# min_y = max(int(np.floor(corners[:, 1].min())), 0)
|
||||
# max_y = min(int(np.ceil(corners[:, 1].max())), image_shape[0] - 1)
|
||||
|
||||
# yy, xx = np.meshgrid(np.arange(min_y, max_y + 1), np.arange(min_x, max_x + 1), indexing="ij")
|
||||
# coords = np.stack([xx.ravel(), yy.ravel()], axis=1) # N x 2
|
||||
|
||||
# local_coords = apply_affine_transform(coords, affine_mat)
|
||||
# x_local = local_coords[:, 0]
|
||||
# y_local = local_coords[:, 1]
|
||||
|
||||
# # Keep pixels inside ROI rectangle
|
||||
# mask = (x_local >= 0) & (x_local <= width - 1) & (y_local >= 0) & (y_local <= height - 1)
|
||||
# coords_in = coords[mask]
|
||||
# x_in = x_local[mask]
|
||||
# y_in = y_local[mask]
|
||||
|
||||
# # Bilinear fractions
|
||||
# x0 = np.floor(x_in).astype(int)
|
||||
# y0 = np.floor(y_in).astype(int)
|
||||
# dx = x_in - x0
|
||||
# dy = y_in - y0
|
||||
|
||||
# for coord, x0i, y0i, dxv, dyv in zip(coords_in, x0, y0, dx, dy):
|
||||
# row_base = start_idx
|
||||
|
||||
# # Contributions to 4 neighboring "rows"
|
||||
# contributions = [
|
||||
# (row_base + y0i, (1 - dxv) * (1 - dyv)),
|
||||
# (row_base + y0i, dxv * (1 - dyv)),
|
||||
# (row_base + y0i + 1, (1 - dxv) * dyv),
|
||||
# (row_base + y0i + 1, dxv * dyv),
|
||||
# ]
|
||||
|
||||
# # Filter negligible contributions
|
||||
# contributions = [(p, f) for p, f in contributions if f >= min_fraction_diff]
|
||||
|
||||
# # Normalize fractions to sum 1
|
||||
# if contributions:
|
||||
# total_f = sum(f for _, f in contributions)
|
||||
# contributions = [(p, f / total_f) for p, f in contributions]
|
||||
|
||||
# for p, f in contributions:
|
||||
# key = (int(coord[0]), int(coord[1]))
|
||||
# hits_dict[key]["p"].append(int(p))
|
||||
# hits_dict[key]["f"].append(float(f))
|
||||
|
||||
# hits_roi = [{"i": key, "p": value["p"], "f": value["f"]} for key, value in hits_dict.items()]
|
||||
# return hits_roi
|
||||
|
||||
|
||||
# if __name__ == "__main__":
|
||||
# image_shape = (512, 512)
|
||||
# rois = [
|
||||
# [(25, 25), (50, 50), (50, 25), (75, 50)], # rotated 45 degrees rectangle
|
||||
# [(300, 300), (400, 300), (400, 400), (300, 400)], # upright rectangle
|
||||
# ]
|
||||
|
||||
# hits_0 = roi_pixel_hits(image_shape, rois[0], min_fraction_diff=0.1)
|
||||
# hits_1 = roi_pixel_hits(image_shape, rois[1], start_idx=10, min_fraction_diff=0.2)
|
||||
|
||||
# print(hits_0[:5])
|
||||
# print(hits_1[:5])
|
||||
15
tests/tests_devices/test_timepix_fly_backend.py
Normal file
15
tests/tests_devices/test_timepix_fly_backend.py
Normal file
@@ -0,0 +1,15 @@
|
||||
"""This module tests the Timepix Fly backend functionality."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_backend import TimepixFlyBackend
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def timepix_fly_backend():
|
||||
"""Fixture for creating a Timepix Fly backend instance."""
|
||||
backend = TimepixFlyBackend(backend_rest_url="http://localhost:8000")
|
||||
yield backend
|
||||
backend.on_destroy()
|
||||
276
tests/tests_devices/test_timepix_fly_client.py
Normal file
276
tests/tests_devices/test_timepix_fly_client.py
Normal file
@@ -0,0 +1,276 @@
|
||||
"""Module to test the Timepix Fly client functionality."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
from ophyd import StatusBase
|
||||
from websockets import State
|
||||
|
||||
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client import (
|
||||
TimepixFlyClient,
|
||||
TimePixFlyStatus,
|
||||
TimePixStatusError,
|
||||
)
|
||||
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface import ProgramState
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def timepix_fly_client():
|
||||
"""Fixture for creating a Timepix Fly client instance."""
|
||||
client = TimepixFlyClient(rest_url="http://localhost:8000", ws_url="ws://localhost:8000/ws")
|
||||
try:
|
||||
yield client
|
||||
finally:
|
||||
client.shutdown()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"return_state",
|
||||
[
|
||||
ProgramState(state=TimePixFlyStatus.CONFIG),
|
||||
ProgramState(state=TimePixFlyStatus.COLLECT),
|
||||
ProgramState(state=TimePixFlyStatus.SETUP),
|
||||
],
|
||||
)
|
||||
def test_timepix_fly_client_stop_running_collection(timepix_fly_client, return_state):
|
||||
"""Test the on_connected method of the Timepix Fly client."""
|
||||
with (
|
||||
mock.patch.object(timepix_fly_client, "stop_collect") as mock_stop_collect,
|
||||
mock.patch.object(timepix_fly_client, "state", return_value=return_state),
|
||||
):
|
||||
timepix_fly_client.stop_running_collection()
|
||||
|
||||
if return_state.state == TimePixFlyStatus.CONFIG:
|
||||
assert mock_stop_collect.call_count == 0, "Stop collect should be called once."
|
||||
timepix_fly_client._started = True
|
||||
timepix_fly_client.stop_running_collection()
|
||||
assert mock_stop_collect.call_count == 1, "Stop collect should not be called again."
|
||||
else:
|
||||
assert mock_stop_collect.call_count == 1, "Stop collect should be called once."
|
||||
timepix_fly_client._started = True
|
||||
timepix_fly_client.stop_running_collection()
|
||||
assert mock_stop_collect.call_count == 2, "Stop collect should be called in CONFIG."
|
||||
|
||||
|
||||
def test_timepix_fly_client_on_connected(timepix_fly_client):
|
||||
"""
|
||||
Test timepix fly client connect method.
|
||||
|
||||
This simply ensures that all methods are called. They are tested separately.
|
||||
"""
|
||||
with (
|
||||
mock.patch.object(timepix_fly_client, "stop_running_collection") as mock_stop_collection,
|
||||
mock.patch.object(timepix_fly_client, "connect") as mock_connect,
|
||||
mock.patch.object(timepix_fly_client, "wait_for_connection") as mock_wait_for_connection,
|
||||
):
|
||||
timepix_fly_client.on_connected()
|
||||
mock_stop_collection.assert_called_once()
|
||||
mock_connect.assert_called_once()
|
||||
mock_wait_for_connection.assert_called_once()
|
||||
|
||||
|
||||
def test_timepix_fly_client_connect(timepix_fly_client):
|
||||
"""This tests the connect method of timepix fly client."""
|
||||
module_path = "superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client"
|
||||
# Patch Thread so we don't actually start the background loop
|
||||
with mock.patch(f"{module_path}.threading.Thread") as mock_thread_cls:
|
||||
mock_thread = mock.Mock()
|
||||
mock_thread_cls.return_value = mock_thread
|
||||
|
||||
timepix_fly_client.connect()
|
||||
|
||||
# Thread should be created with the update loop as target and daemon True
|
||||
mock_thread_cls.assert_called_once()
|
||||
# start() must be called on the created thread
|
||||
mock_thread.start.assert_called_once()
|
||||
|
||||
|
||||
def test_timepix_fly_client_wait_for_connection(timepix_fly_client):
|
||||
"""This tests the wait_for_connection method of timepix fly client."""
|
||||
module_path = "superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client"
|
||||
# Case 1: ws_client already present and OPEN
|
||||
mock_client = mock.Mock()
|
||||
type(mock_client).state = mock.PropertyMock(return_value=State.OPEN)
|
||||
timepix_fly_client.ws_client = mock_client
|
||||
|
||||
# Should return immediately
|
||||
timepix_fly_client.wait_for_connection(timeout=0.1)
|
||||
|
||||
# Case 2: connect() establishes the connection
|
||||
timepix_fly_client.ws_client = None
|
||||
mock_client2 = mock.Mock()
|
||||
type(mock_client2).state = mock.PropertyMock(return_value=State.OPEN)
|
||||
with mock.patch(f"{module_path}.connect", return_value=mock_client2) as mock_connect:
|
||||
timepix_fly_client.wait_for_connection(timeout=0.1)
|
||||
mock_connect.assert_called_once()
|
||||
|
||||
|
||||
def test_timepix_fly_client_ws_send_and_received(timepix_fly_client):
|
||||
"""This tests the _ws_send_and_receive method of timepix fly client."""
|
||||
# Prepare a mock ws_client where recv returns a message
|
||||
mock_ws = mock.Mock()
|
||||
mock_ws.recv.return_value = "init"
|
||||
timepix_fly_client.ws_client = mock_ws
|
||||
|
||||
# Patch _on_received_ws_message to ensure it's invoked
|
||||
with mock.patch.object(timepix_fly_client, "_on_received_ws_message") as mock_on_msg:
|
||||
timepix_fly_client._ws_send_and_receive()
|
||||
mock_on_msg.assert_called_once_with("init")
|
||||
|
||||
# Now simulate a TimeoutError from recv: should simply return and not call _on_received_ws_message
|
||||
mock_ws2 = mock.Mock()
|
||||
mock_ws2.recv.side_effect = TimeoutError
|
||||
timepix_fly_client.ws_client = mock_ws2
|
||||
with mock.patch.object(timepix_fly_client, "_on_received_ws_message") as mock_on_msg2:
|
||||
timepix_fly_client._ws_send_and_receive()
|
||||
mock_on_msg2.assert_not_called()
|
||||
|
||||
|
||||
def test_timepix_fly_client_on_message_received(timepix_fly_client):
|
||||
"""This tests the _on_received_ws_message method of timepix fly client."""
|
||||
with mock.patch.object(timepix_fly_client, "_run_status_callbacks") as mock_run_callbacks:
|
||||
timepix_fly_client._on_received_ws_message("init")
|
||||
assert timepix_fly_client._status == TimePixFlyStatus.INIT
|
||||
mock_run_callbacks.assert_called_once()
|
||||
|
||||
# invalid message should not change status or call callbacks
|
||||
prev_status = timepix_fly_client._status
|
||||
with mock.patch.object(timepix_fly_client, "_run_status_callbacks") as mock_run_callbacks2:
|
||||
timepix_fly_client._on_received_ws_message("invalid_status_string")
|
||||
# status stays as previous value and callbacks not called
|
||||
assert timepix_fly_client._status == prev_status
|
||||
mock_run_callbacks2.assert_not_called()
|
||||
|
||||
|
||||
def test_timepix_fly_client_on_status_callbacks(timepix_fly_client):
|
||||
"""This tests the _run_status_callbacks method of timepix fly client."""
|
||||
# Immediate run when current status already in success
|
||||
timepix_fly_client._status = TimePixFlyStatus.INIT
|
||||
status = StatusBase()
|
||||
timepix_fly_client.add_status_callback(
|
||||
status=status, success=[TimePixFlyStatus.INIT], error=[TimePixFlyStatus.EXCEPT], run=True
|
||||
)
|
||||
assert status.done is True and status.success is True
|
||||
|
||||
# Add callback (do not run immediately) and then trigger via _run_status_callbacks
|
||||
status2 = StatusBase()
|
||||
timepix_fly_client.add_status_callback(
|
||||
status=status2,
|
||||
success=[TimePixFlyStatus.CONFIG],
|
||||
error=[TimePixFlyStatus.EXCEPT],
|
||||
run=False,
|
||||
)
|
||||
# Set status to CONFIG and mark started True to check reset
|
||||
timepix_fly_client._status = TimePixFlyStatus.CONFIG
|
||||
timepix_fly_client._started = True
|
||||
timepix_fly_client._run_status_callbacks()
|
||||
assert status2.done is True and status2.success is True
|
||||
assert timepix_fly_client._started is False
|
||||
|
||||
# Error path: add with run True when status is EXCEPT
|
||||
timepix_fly_client._status = TimePixFlyStatus.EXCEPT
|
||||
status3 = StatusBase()
|
||||
with mock.patch.object(timepix_fly_client, "last_error") as mock_last_error:
|
||||
mock_err = mock.Mock()
|
||||
mock_err.message = "boom"
|
||||
mock_last_error.return_value = mock_err
|
||||
|
||||
timepix_fly_client.add_status_callback(
|
||||
status=status3,
|
||||
success=[TimePixFlyStatus.INIT],
|
||||
error=[TimePixFlyStatus.EXCEPT],
|
||||
run=True,
|
||||
)
|
||||
assert status3.done is True and status3.success is False
|
||||
|
||||
|
||||
def test_timepix_fly_client_shutdown(timepix_fly_client):
|
||||
"""This tests the shutdown method of timepix fly client."""
|
||||
mock_ws = mock.Mock()
|
||||
timepix_fly_client.ws_client = mock_ws
|
||||
|
||||
timepix_fly_client.shutdown()
|
||||
|
||||
mock_ws.close.assert_called_once()
|
||||
assert timepix_fly_client.ws_client is None
|
||||
assert timepix_fly_client._shutdown_event.is_set()
|
||||
|
||||
|
||||
def test_timepix_fly_client_start(timepix_fly_client):
|
||||
"""This tests the start method of timepix fly client."""
|
||||
# The client._get uses requests.get with f"http://{self.rest_url}/{get_cmd}".
|
||||
# We mock requests.get and verify it is called with the expected URL and timeout,
|
||||
# and that the _started flag is set.
|
||||
module_path = "superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client"
|
||||
with mock.patch(f"{module_path}.requests.get") as mock_get:
|
||||
mock_resp = mock.Mock()
|
||||
mock_resp.raise_for_status = mock.Mock()
|
||||
mock_resp.text = ""
|
||||
mock_get.return_value = mock_resp
|
||||
|
||||
timepix_fly_client.start()
|
||||
|
||||
expected_url = f"http://{timepix_fly_client.rest_url}/?start=true"
|
||||
mock_get.assert_called_once_with(expected_url, timeout=timepix_fly_client._timeout)
|
||||
assert timepix_fly_client._started is True
|
||||
|
||||
|
||||
def test_timepix_fly_client_stop_collect(timepix_fly_client):
|
||||
"""This tests the stop_collect method of timepix fly client."""
|
||||
module_path = "superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client"
|
||||
with mock.patch(f"{module_path}.requests.get") as mock_get:
|
||||
mock_resp = mock.Mock()
|
||||
mock_resp.raise_for_status = mock.Mock()
|
||||
mock_resp.text = ""
|
||||
mock_get.return_value = mock_resp
|
||||
|
||||
# ensure started is True and then call stop_collect
|
||||
timepix_fly_client._started = True
|
||||
timepix_fly_client.stop_collect()
|
||||
|
||||
expected_url = f"http://{timepix_fly_client.rest_url}/?stop_collect=true"
|
||||
mock_get.assert_called_once_with(expected_url, timeout=timepix_fly_client._timeout)
|
||||
assert timepix_fly_client._started is False
|
||||
|
||||
|
||||
def test_timepix_fly_client_state(timepix_fly_client):
|
||||
"""This tests the state method of timepix fly client."""
|
||||
module_path = "superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client"
|
||||
with mock.patch(f"{module_path}.requests.get") as mock_get:
|
||||
mock_resp = mock.Mock()
|
||||
mock_resp.raise_for_status = mock.Mock()
|
||||
# Return a JSON payload compatible with ProgramState
|
||||
mock_resp.json.return_value = {"type": "ProgramState", "state": "init"}
|
||||
mock_get.return_value = mock_resp
|
||||
|
||||
program_state = timepix_fly_client.state()
|
||||
|
||||
expected_url = f"http://{timepix_fly_client.rest_url}/state"
|
||||
mock_get.assert_called_once_with(expected_url, timeout=timepix_fly_client._timeout)
|
||||
# ProgramState.model defines 'state' as a string literal; ensure we parsed it
|
||||
assert hasattr(program_state, "state")
|
||||
assert program_state.state == "init"
|
||||
|
||||
|
||||
def test_timepix_fly_client_set_pixel_map(timepix_fly_client):
|
||||
"""This tests the set_pixel_map/_put path by mocking requests.put and checking payload."""
|
||||
module_path = "superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client"
|
||||
with mock.patch(f"{module_path}.requests.put") as mock_put:
|
||||
mock_resp = mock.Mock()
|
||||
mock_resp.raise_for_status = mock.Mock()
|
||||
mock_put.return_value = mock_resp
|
||||
|
||||
# Minimal valid PixelMap dict (type is optional; model supplies default)
|
||||
pixel_map = {"chips": [[{"i": 0, "p": 1, "f": 2}]]}
|
||||
|
||||
timepix_fly_client.set_pixel_map(pixel_map)
|
||||
|
||||
expected_url = f"http://{timepix_fly_client.rest_url}/pixel-map"
|
||||
# Verify requests.put called with expected url, json and timeout
|
||||
mock_put.assert_called_once()
|
||||
_, kwargs = mock_put.call_args
|
||||
assert kwargs.get("timeout") == timepix_fly_client._timeout
|
||||
assert kwargs.get("json") is not None
|
||||
assert kwargs.get("json").get("chips") == pixel_map["chips"]
|
||||
18
time_pix_fly/websocket-client.py
Normal file
18
time_pix_fly/websocket-client.py
Normal file
@@ -0,0 +1,18 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
"""Client using the asyncio API."""
|
||||
|
||||
import asyncio
|
||||
from websockets.asyncio.client import connect
|
||||
|
||||
|
||||
async def hello():
|
||||
async with connect("ws://localhost:8452/ws") as websocket:
|
||||
await websocket.send("Hello world!")
|
||||
while True:
|
||||
message = await websocket.recv()
|
||||
print(message)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(hello())
|
||||
Reference in New Issue
Block a user