feat(timepix): Timepix integration with TimepixFly backend #10

Open
appel_c wants to merge 131 commits from feat/add_timepix_integration into main
18 changed files with 527336 additions and 1 deletions

View File

@@ -12,7 +12,9 @@ classifiers = [
"Programming Language :: Python :: 3",
"Topic :: Scientific/Engineering",
]
dependencies = []
dependencies = [
"websockets",
]
[project.optional-dependencies]
dev = [

View File

@@ -0,0 +1,27 @@
manip_new_trx:
description: Sample Manipulator X-Translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-ES1-MAN:TRX
enabled: true
onFailure: retry
readoutPriority: baseline
softwareTrigger: false
manip_new_try:
description: Sample Manipulator Y-Translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-ES1-MAN:TRY
enabled: true
onFailure: retry
readoutPriority: baseline
softwareTrigger: false
manip_new_trz:
description: Sample Manipulator Z - Along beam
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X10DA-ES1-MAN:TRZ
enabled: true
onFailure: retry
readoutPriority: baseline
softwareTrigger: false

View File

@@ -0,0 +1,71 @@
sample_manipulator:
- !include ./sample_manipulator.yaml
### Timepix Detector
timepix:
readoutPriority: async
description: ASI Serval Timepix Detector
deviceClass: superxas_bec.devices.timepix.timepix.Timepix
deviceConfig:
prefix: "X10DA-ES-TPX1:"
backend_rest_url: "P6-0008.psi.ch:8452"
hostname: "x10da-bec-001.psi.ch"
enable_xes: false
onFailure: retry
enabled: true
readOnly: false
softwareTrigger: true
### Ionization Chambers
ic1:
readoutPriority: monitored
description: Ionization Chamber 1
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
read_pv: X10DA-ES1-SAI_01:MEAN
auto_monitor: True
onFailure: raise
enabled: True
softwareTrigger: False
ic2:
readoutPriority: monitored
description: Ionization Chamber 2
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
read_pv: X10DA-ES1-SAI_02:MEAN
auto_monitor: True
onFailure: raise
enabled: True
softwareTrigger: False
ic3:
readoutPriority: monitored
description: Ionization Chamber 3
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
read_pv: X10DA-ES1-SAI_03:MEAN
auto_monitor: True
onFailure: raise
enabled: True
softwareTrigger: False
### Monochromator Axis
mono_energy:
description: Axis for the QEXAFS monochromator
deviceClass: ophyd_devices.devices.psi_motor.EpicsMotor
deviceConfig:
prefix: "X10DA-MO12-QEXAFS:E_TEST"
enabled: true
onFailure: retry
readoutPriority: baseline
softwareTrigger: false
### Trigger Card #####
trigger:
readoutPriority: baseline
description: Trigger Card
deviceClass: superxas_bec.devices.trigger.Trigger
deviceConfig:
prefix: 'X10DA-ES1:'
onFailure: raise
enabled: True
softwareTrigger: True

View File

View File

@@ -0,0 +1,13 @@
{"type": "PixelMap",
"chips": [
{"i": 0, "p": [0, 1], "f": [0.5, 0.5]},
{"i": 0, "p": [1, 2], "f": [0.5, 0.5]},
{"i": 0, "p": [1, 2], "f": [0.5, 0.5]},
{"i": 0, "p": [0, 1], "f": [0.5, 0.5]},
{"i": 0, "p": [1, 2], "f": [0.5, 0.5]},
{"i": 0, "p": [1, 2], "f": [0.5, 0.5]},
{"i": 0, "p": [0, 1], "f": [0.5, 0.5]},
{"i": 0, "p": [1, 2], "f": [0.5, 0.5]}
]
}

View File

@@ -0,0 +1,933 @@
"""
TimePix Detector class for interfacing with the TimePix detector. The timepix_signals module
implements the HTTP communication to the REST API for the tpx3app app. The implementation
of the backend is stored in the timepix_fly_client module. This is combined with the control
interface in EPICS, which is implemented via the 'ASItpxCam' class.
"""
from __future__ import annotations
import enum
import os
import threading
import time
import traceback
from typing import TYPE_CHECKING, Any, Literal
import numpy as np
from bec_lib.file_utils import get_full_path
from bec_lib.logger import bec_logger
from ophyd import ADBase
from ophyd import Component as Cpt
from ophyd import EpicsSignalRO, EpicsSignalWithRBV
from ophyd_devices import (
AsyncSignal,
CompareStatus,
DeviceStatus,
FileEventSignal,
PreviewSignal,
StatusBase,
TransitionStatus,
)
from ophyd_devices.devices.areadetector.cam import ASItpxCam
from ophyd_devices.devices.areadetector.plugins import HDF5Plugin_V35, ImagePlugin_V35
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from typeguard import typechecked
import superxas_bec.devices.timepix.default_pixel_maps as _default_pixel_maps
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_backend import TimepixFlyBackend
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client import TimePixFlyStatus
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface import (
OtherConfigModel,
PixelMap,
)
from superxas_bec.devices.timepix.utils import AndStatusWithList
if TYPE_CHECKING:
from bec_lib.messages import DevicePreviewMessage, ScanStatusMessage
logger = bec_logger.logger
# pylint: disable=redefined-outer-name
class TDCEdge(int, enum.Enum):
"""TDC Edge enum options for TimePix detector."""
RISING = 0
FALLING = 1
BOTH = 2
class TDCOuput(int, enum.Enum):
"""TDC Output enum options for TimePix detector."""
ALL_CHANNELS = 0
CHANNEL_0 = 1
CHANNEL_1 = 2
CHANNEL_2 = 3
CHANNEL_3 = 4
class ACQUIRESTATUS(int, enum.Enum):
"""Acquire status enum options for TimePix detector."""
DONE = 0
ACQUIRING = 1 # or CAPTURING
class DETECTORSTATE(int, enum.Enum):
"""Detector state enum options for TimePix detector."""
IDLE = 0
ACQUIRE = 1
READOUT = 2
CORRECT = 3
SAVING = 4
ABORTING = 5
ERROR = 6
WAITING = 7
INITIALIZING = 8
DISCONNECTED = 9
ABORTED = 10
class TRIGGERMODE(int, enum.Enum):
"""Trigger mode enum options for TimePix detector."""
INTERNAL = 0
EXTERNAL = 1
SOFTWARE = 2
class TRIGGERSOURCE(int, enum.Enum):
"""Trigger source enum options for TimePix detector."""
HDMI1_1 = 0
HDMI1_2 = 1
HDMI1_3 = 2
HDMI2_1 = 3
HDMI2_2 = 4
HDMI2_3 = 5
class EXPOSUREMODE(int, enum.Enum):
"""Exposure mode enum options for TimePix detector."""
TIMED = 0
TRIGGER_WIDTH = 1
class DATASOURCE(int, enum.Enum):
"""Data source for AD Epics backend for Timepix."""
NONE = 0
PREVIEW = 1
IMAGE = 2
class FILEWRITEMODE(int, enum.Enum):
"""HDF5 Plugin FileWrite Mode"""
SINGLE = 0
CAPTURE = 1
STREAM = 2
def load_pixel_map_from_json(file_path: str) -> PixelMap:
"""Load a pixel map from a JSON file.
Args:
file_path (str): Path to the JSON file containing the pixel map.
Returns:
PixelMap: The loaded pixel map.
"""
# Check if path exists
if not os.path.exists(file_path):
raise FileNotFoundError(f"Pixel map file not found: {file_path}")
try:
with open(file_path, "r", encoding="utf-8") as file:
pixel_map_str = file.read()
pixel_map = PixelMap.model_validate_json(pixel_map_str)
except Exception as exc:
raise ValueError(f"Failed to load pixel map from {file_path}: {exc}") from exc
return pixel_map
class ImagePlugin_Timepix(ImagePlugin_V35):
"""Custom Image Plugin for TimePix detector."""
unique_id = Cpt(EpicsSignalRO, "UniqueId_RBV", auto_monitor=True)
class HDF5Plugin_Timepix(HDF5Plugin_V35):
"""Custom HDF5 Plugin for TimePix detector."""
capture = Cpt(EpicsSignalWithRBV, "Capture", auto_monitor=True)
write_file = Cpt(EpicsSignalWithRBV, "WriteFile", auto_monitor=True)
# pylint: disable=too-many-instance-attributes, too-many-arguments, too-many-locals
class TimePixControl(ADBase):
"""Interface for the TimePix EPICS control of the TimePix detector."""
cam = Cpt(ASItpxCam, "cam1:")
image = Cpt(ImagePlugin_Timepix, "image1:")
hdf = Cpt(HDF5Plugin_Timepix, "HDF1:")
DEFAULT_PIXEL_MAP = os.path.join(
os.path.dirname(_default_pixel_maps.__file__), "timepix_8_chips_single_energy_per_chip.json"
)
DETECTOR_SHAPE = (512, 1024) # Shape of the TimePix detector
class Timepix(PSIDeviceBase, TimePixControl):
"""
TimePix class. The IOC is running with the prefix 'X10DA-ES-TPX1:'.
The TimePixFly backend is running on p4-0017.psi.ch. Please check the port from the app
running in headless server mode. The backend_rest url can for instance be 'p4-0017.psi.ch:8452'.
The hostname needs to be set to the name of this machine, e.h. x10da-bec-001.psi.ch.
"""
MIN_DETECTOR_READOUT_TIME = 2.1e-3 # Minimum readout time in seconds for ASI TimePix detector
_DETECTOR_SHAPE = DETECTOR_SHAPE
USER_ACCESS = [
"troin",
"troistep",
"get_pixel_map",
"set_pixel_map",
"set_pixel_map_from_json_file",
"set_enable_xes",
]
xes_data = Cpt(
AsyncSignal,
name="xes_data",
ndim=2,
max_size=1000,
doc="Full XES data, 2D image with energypoints vs time bins.",
)
xes_spectra = Cpt(
AsyncSignal,
name="xes_spectra",
ndim=1,
max_size=1000,
doc="1D spectra, integrated over energy bins.",
)
xes_energy_1 = Cpt(
AsyncSignal,
name="xes_energy_1",
ndim=1,
max_size=1000,
doc="1D time spectra for energy bin 1.",
)
xes_energy_2 = Cpt(
AsyncSignal,
name="xes_energy_2",
ndim=1,
max_size=1000,
doc="1D time spectra for energy bin 2.",
)
tds_period = Cpt(
AsyncSignal,
name="tds_period",
ndim=0,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="TDS period recorded by the TimePixFly backend detector.",
)
total_periods = Cpt(
AsyncSignal,
name="total_periods",
ndim=0,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="Total TDS periods recorded by the TimePixFly backend detector.",
)
total_events = Cpt(
AsyncSignal,
name="total_events",
ndim=0,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="Total events recorded by the TimePixFly backend detector.",
)
preview = Cpt(
PreviewSignal,
name="preview",
ndim=2,
num_rotation_90=1,
doc="Preview signal of the TimePix detector.",
)
static_spectra = Cpt(
AsyncSignal,
name="static_spectra",
ndim=1,
max_size=1000,
acquisition_group="monitored",
async_update={"type": "add", "max_shape": [None, DETECTOR_SHAPE[0]]},
doc="Spectra signal of the TimePix detector.",
)
xes_data_accumulated_1 = Cpt(
AsyncSignal,
name="xes_accumulated_energy_1",
ndim=1,
max_size=1000,
doc="1D time spectra for energy bin 2.",
)
xes_data_accumulated_2 = Cpt(
AsyncSignal,
name="xes_accumulated_energy_2",
ndim=1,
max_size=1000,
doc="1D time spectra for energy bin 2.",
)
file_event = Cpt(
FileEventSignal, name="file_event", doc="File event signal for TimePix detector."
)
def __init__(
self,
*,
name,
prefix: str,
backend_rest_url: str,
hostname: str | None = None,
socket_port: int = 0,
enable_xes: bool = True,
scan_info=None,
device_manager=None,
**kwargs,
):
"""
Initialize the Timepix detector.
Args:
name (str): Name of the device.
prefix (str): EPICS prefix for the device.
backend_rest_url (str): URL of the TimePixFly backend REST API.
hostname (str | None): Hostname of the machine running the backend. Defaults to None
which will use the current machine's hostname.
socket_port (int): Port for the socket connection to the backend. Defaults to 0
which will use the default port from the backend.
enable_xes (bool): Whether to enable XES data acquisition with TimePixFly backend is active. Defaults to True.
scan_info: Scan information object, if available.
device_manager: Device manager instance, if available.
**kwargs: Additional keyword arguments for the base class.
"""
self.backend = TimepixFlyBackend(
backend_rest_url=backend_rest_url, hostname=hostname, socket_port=socket_port
)
self._pixel_map = None
self._troistep = 1
self._troin = 5000
super().__init__(
name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs
)
self._poll_thread = threading.Thread(
target=self._poll_array_data, daemon=True, name=f"{self.name}_poll_thread"
)
self._poll_thread_kill_event = threading.Event()
# Image poll rate for preview updates in Hz (max 5 Hz to limit throughput)
self._poll_rate = 10
self._enable_xes = enable_xes
self._full_path = ""
self._n_images = 0
self._unique_array_id = 0
self._pv_timeout = 5
self._readout_time = self.MIN_DETECTOR_READOUT_TIME
self.r_lock = threading.RLock() # Lock to access the message buffer safely
self.accumulated_data_e1 = None
self.accumulated_data_e2 = None
def stage(self) -> list[object] | StatusBase: # type: ignore
"""Stage the device.
Super stage not safe to call.."""
self.stopped = False
status = self.on_stage() # pylint: disable=assignment-from-no-return
if isinstance(status, StatusBase):
return status
return []
def _poll_array_data(self):
"""Poll the array data for preview updates."""
while not self._poll_thread_kill_event.wait(1 / self._poll_rate):
try:
# First check if there is a new image
if self.image.unique_id.get() != self._unique_array_id:
self._unique_array_id = self.image.unique_id.get()
else:
continue # No new image, skip update
# Get new image data
value = self.image.array_data.get()
if value is None:
logger.info(f"No image data available for preview of {self.name}")
continue
width = self.image.array_size.width.get()
height = self.image.array_size.height.get()
# Geometry correction for the image
data = np.reshape(value, (height, width))
logger.info(f"Setting preview data for {self.name} with shape {data.shape}")
self.preview.put(data)
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
logger.error(
f"Error while polling array data for preview of {self.name}: {content}"
)
###
def msg_buffer_callback(
self,
start_frame: dict[
Literal[
"type", "Mode", "TRoiStart", "TRoiStep", "TRoiN", "NumEnergyPoints", "save_interval"
],
Any,
],
data_frames: list[
dict[
Literal["type", "period", "totalEvents", "TDSpectra", "beforeROI", "afterROI"], Any
]
],
end_frame: dict[Literal["type", "error"], Any],
):
"""
Callback method to be attached to the backend to process the message buffer. The callback expects
start_frame, data_frames, and end_frame as arguments. Additionally, one may pass extra kwargs that
will be passed to the callback function.
Args:
start_frame (dict): The StartFrame. Dictionary representation of detailed structure
described in model .timepix_fly_client.timepix_fly_interface.TimepixStartFrame
data_frames (list): List of XesData frames. Dictionary of structures described in
model .timepix_fly_client.timepix_fly_interface.TimepixDataFrame
end_frame (dict): The EndFrame. Dictionary representation of detailed structure
described in model .timepix_fly_client.timepix_fly_interface.TimepixEndFrame
"""
n_energy_points = start_frame.get("NumEnergyPoints", None)
troin = start_frame["TRoiN"]
if troin != self._troin:
logger.error(f"Number of pixels {troin} does not match expected {self._troin}.")
# Create return data
xes_data = np.zeros((n_energy_points, troin), dtype=np.float32) # dtype from backend code
tds_period = []
tds_total_events = 0
total_periods = 0
data_frame_freq = 131000 / start_frame.get("save_interval", 1) # in Hz
logger.info(
f"Processing TimepixFly data: start_frame: {start_frame}, end_frame: {end_frame}"
)
if len(data_frames) == 0:
logger.error(
f"No data frames received in msg_buffer; for start_frame: {start_frame}, end_frame: {end_frame}"
)
# TODO this should no longer happen now as this was fixed in the backend..
else:
for msg in data_frames:
tds_period.append(msg["period"])
tds_total_events += msg["totalEvents"]
for ii in range(n_energy_points):
xes_data[ii, :] += msg["TDSpectra"][ii::n_energy_points]
tds_period = (
np.array(tds_period) / start_frame.get("save_interval", 1) / data_frame_freq
)
total_periods = end_frame.get("periods", None)
if total_periods is not None:
self.total_periods.put(
total_periods / start_frame.get("save_interval", 1) / data_frame_freq
)
else:
logger.error(f"Received total_periods: {total_periods} from end_frame {end_frame}.")
# Transpose to get shape (troin, n_energy_points)
xes_data = xes_data.T
# Put XES data
self.tds_period.put(tds_period)
self.total_events.put(tds_total_events)
self.xes_data.put(
xes_data, async_update={"type": "add", "max_shape": [None, troin, n_energy_points]}
)
if n_energy_points == 8:
data_1 = np.sum(xes_data[:, 0:4], axis=1)
data_2 = np.sum(xes_data[:, 4:8], axis=1)
self.xes_energy_1.put(data_1, async_update={"type": "add", "max_shape": [None, troin]})
self.xes_energy_2.put(data_2, async_update={"type": "add", "max_shape": [None, troin]})
if self.accumulated_data_e1 is None:
self.accumulated_data_e1 = data_1
else:
self.accumulated_data_e1 += data_1
if self.accumulated_data_e2 is None:
self.accumulated_data_e2 = data_2
else:
self.accumulated_data_e2 += data_2
self.xes_data_accumulated_1.put(
self.accumulated_data_e1,
async_update={"type": "replace", "max_shape": [None, troin]},
)
self.xes_data_accumulated_2.put(
self.accumulated_data_e2,
async_update={"type": "replace", "max_shape": [None, troin]},
)
self.xes_spectra.put(
xes_data.sum(axis=1), async_update={"type": "add", "max_shape": [None, troin]}
)
logger.debug(f"Device data set for Timepix with {tds_period}, {tds_total_events}")
### User ACCESS methods
def get_pixel_map(self) -> dict:
"""Get the current pixel map as a dictionary."""
return self._pixel_map.model_dump()
def set_pixel_map(self, pixel_map: dict) -> None:
"""Set the pixel map from a dictionary."""
self._pixel_map = PixelMap.model_validate(pixel_map)
def set_pixel_map_from_json_file(self, file_path: str) -> None:
"""Set the pixel map from a JSON file.
Args:
file_path (str): Path to the JSON file containing the pixel map.
"""
pixel_map = load_pixel_map_from_json(file_path)
self._pixel_map = pixel_map
def set_enable_xes(self, enable: bool) -> None:
"""Enable or disable XES data acquisition.
Args:
enable (bool): Whether to enable XES data acquisition.
"""
self.enable_xes = enable
@property
def enable_xes(self) -> bool:
"""Get whether XES data acquisition is enabled."""
return self._enable_xes
@enable_xes.setter
@typechecked
def enable_xes(self, value: bool):
"""Set whether XES data acquisition is enabled."""
self._enable_xes = value
self._enable_xes_settings(value)
# #TODO Update device manager config if available
# if self.device_manager is not None:
# dev_obj = self.device_manager.devices.get(self.name, None)
# if dev_obj is not None:
# cfg = dev_obj.get_device_config()
# if "enable_xes" in cfg and cfg["enable_xes"] != value:
# cfg["enable_xes"] = value
# dev_obj.set_device_config({"enable_xes": value})
# logger.info(
# f"Updated 'enable_xes' to {value} in device manager for {self.name}"
# )
@property
def pixel_map(self) -> PixelMap:
"""Get the current pixel map of the TimePix detector."""
if self._pixel_map is None:
try:
pixel_map = load_pixel_map_from_json(DEFAULT_PIXEL_MAP)
self._pixel_map = pixel_map
# pylint: disable=broad-except
# pylint: disable=raise-missing-from
except Exception:
content = traceback.format_exc()
logger.error(f"Failed to load default pixel map: {content}")
raise ValueError(
f"Failed to load default pixel map from {DEFAULT_PIXEL_MAP}: {content}"
)
return self._pixel_map
@pixel_map.setter
@typechecked
def pixel_map(self, value: PixelMap):
self._pixel_map = value
@property
def troistep(self) -> int:
"""Get the current ROI step size."""
return self._troistep
@troistep.setter
@typechecked
def troistep(self, value: int):
"""Set the ROI step size."""
if value <= 0:
raise ValueError("ROI step size must be a positive integer.")
self._troistep = value
@property
def troin(self) -> int:
"""Get the current ROI number of pixels."""
return self._troin
@troin.setter
@typechecked
def troin(self, value: int):
"""Set the ROI number of pixels."""
if value <= 0:
raise ValueError("ROI number of pixels must be a positive integer.")
self._troin = value
######################################################################
### Beamline specific methods for the TimePix Detector integration ###
######################################################################
def on_init(self) -> None:
"""
Called when the device is initialized.
No signals are connected at this point. If you like to
set default values on signals, please use on_connected instead.
"""
start_time = time.time()
logger.info(f"Loading default pixel map for TimePix detector {self.name}...")
self.set_pixel_map_from_json_file(DEFAULT_PIXEL_MAP)
logger.info(
f"Default pixel map for TimePix detector {self.name} loaded after {time.time() - start_time:.3f} seconds."
)
def _enable_xes_settings(self, enabled: bool) -> None:
"""Enable XES specific settings for the TimePix detector."""
enabled_value = 1 if enabled else 0
self.cam.tdc1_enable.set(enabled_value).wait(timeout=self._pv_timeout)
self.cam.tdc2_enable.set(enabled_value).wait(timeout=self._pv_timeout)
self.cam.raw_enable.set(enabled_value).wait(timeout=self._pv_timeout)
if enabled:
self.cam.tdc1_edge.set(TDCEdge.RISING).wait(timeout=self._pv_timeout)
self.cam.tdc1_output.set(TDCOuput.ALL_CHANNELS).wait(timeout=self._pv_timeout)
self.cam.tdc2_edge.set(TDCEdge.RISING).wait(timeout=self._pv_timeout)
self.cam.tdc2_output.set(TDCOuput.ALL_CHANNELS).wait(timeout=self._pv_timeout)
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
"""
start_time = time.time()
logger.info(f"On Connected of TimePix detector {self.name}...")
# Prepare TimePix Detector
self._enable_xes_settings(self.enable_xes)
self.cam.trigger_mode.set(TRIGGERMODE.INTERNAL).wait(timeout=self._pv_timeout)
self.cam.trigger_source.set(TRIGGERSOURCE.HDMI1_1).wait(timeout=self._pv_timeout)
self.cam.exposure_mode.set(EXPOSUREMODE.TIMED).wait(timeout=self._pv_timeout)
# Reset array counter on connect
self.cam.array_counter.set(0).wait(timeout=self._pv_timeout)
# ------------------
# Prepare file writing through AD HDF5 plugin
# -----------------
self.hdf.enable.set(1).wait(timeout=self._pv_timeout)
self.hdf.file_write_mode.set(FILEWRITEMODE.STREAM.value).wait(timeout=self._pv_timeout)
self.hdf.auto_save.set(1).wait(timeout=self._pv_timeout)
self.hdf.file_template.set("%s%s").wait(timeout=self._pv_timeout)
self.hdf.lazy_open.set(1).wait(timeout=self._pv_timeout)
self.cam.array_callbacks.set(1).wait(timeout=self._pv_timeout)
# ------------------
# Prepare TimePixFly backend
# -----------------
# Prepare backend for TimePixFly
self.backend.on_connected()
# Register the callback for processing data received by the backend
self.backend.add_callback(self.msg_buffer_callback)
self._poll_thread.start()
logger.info(
f"TimePix detector {self.name} connected and initialized after {time.time() - start_time:.3f} seconds."
)
# Subscribe to new image updates
self.image.unique_id.subscribe(self._on_new_image_received)
def _on_new_image_received(self, value: int, old_value: int, **kwargs):
"""Callback for image unique ID updates to trigger preview update."""
if value == old_value:
return # No new image, or counter reset
try:
# Get new image data
width = self.image.array_size.width.get()
height = self.image.array_size.height.get()
array_data = self.image.array_data.get()
if array_data is None:
logger.info(f"No image data available for preview of {self.name}")
return
# Geometry correction for the image
data = np.sum(np.reshape(array_data, (height, width)), axis=1)
self.static_spectra.put(data)
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
logger.error(f"Error while updating preview for {self.name} on image update: {content}")
def on_stage(self) -> StatusBase | None:
"""Called while staging the device."""
self.accumulated_data_e1 = None
self.accumulated_data_e2 = None
scan_msg: ScanStatusMessage = self.scan_info.msg # type: ignore
exp_time = scan_msg.scan_parameters.get("exp_time", 0)
if exp_time - self._readout_time <= 0:
raise ValueError(
f"Exposure time {exp_time} must be greater than readout time {self._readout_time}."
)
burst_images = scan_msg.scan_parameters.get("frames_per_trigger", 1)
self._n_images = scan_msg.num_points * burst_images
# Camera has to be set to burst_images, each step will get an individual trigger
self.cam.acquire_time.set(exp_time - self._readout_time).wait(timeout=self._pv_timeout)
self.cam.acquire_period.set(exp_time).wait(timeout=self._pv_timeout)
self.cam.num_images.set(burst_images).wait(timeout=self._pv_timeout)
self.cam.data_source.set(DATASOURCE.IMAGE).wait(timeout=self._pv_timeout)
# Setup file writing
self._full_path = get_full_path(scan_msg, name="timepix")
file_path = "/".join(self._full_path.split("/")[:-1])
file_name = self._full_path.split("/")[-1]
self.cam.array_callbacks.set(1).wait(5) # Enable array callbacks
# self.hdf.enable.set(1).wait(5) # Enable HDF5 plugin
self.hdf.file_path.set(file_path).wait(5)
self.hdf.file_name.set(file_name).wait(5)
# Setup file writing for the total expected number of images
self.hdf.num_capture.set(self._n_images).wait(5)
self.hdf.capture.put(1)
self.file_event.put(
file_path=self._full_path,
done=False,
successful=False,
hinted_h5_entries={"data": "/entry/data/data"},
)
# -------------------------
# XES specific staging
if self.enable_xes:
# Prepare TimePixFly
other_config = OtherConfigModel(
TRoiStep=self.troistep,
TRoiN=self.troin,
output_uri=f"tcp:{self.backend.hostname}:{self.backend.socket_port}",
save_interval=int(131000 / 5) - 5, # Save interval in 131kHz units,
)
logger.debug(f"Current TimePixFly configuration: {other_config}")
pixel_map = self.pixel_map
self.backend.on_stage(other_config=other_config, pixel_map=pixel_map)
# Fetch the backend socket info
net_add = self.backend.timepix_fly_client.get_net_addresses()
logger.debug(f"Using net_add for timepix_fly backend {net_add}")
self.cam.raw_file_template.set("").wait(timeout=self._pv_timeout)
self.cam.raw_file_path.set(f"tcp://connect@{net_add.address}").wait(
timeout=self._pv_timeout
)
def on_unstage(self) -> None:
"""Called while unstaging the device."""
# TODO what should happen for unstage? Make sure that acquisition is not running?
# self.backend.on_unstage()
# self.cam.acquire.put(0)
# status_camera = CompareStatus(self.cam.acquire_busy, ACQUIRESTATUS.DONE)
def on_pre_scan(self) -> StatusBase:
"""Called right before the scan starts on all devices automatically."""
status_camera = CompareStatus(
self.cam.acquire_busy, ACQUIRESTATUS.DONE, timeout=self._pv_timeout
)
status_writer = CompareStatus(
self.hdf.capture, ACQUIRESTATUS.ACQUIRING, timeout=self._pv_timeout
)
status = status_camera & status_writer
self.cancel_on_stop(status)
return status
def on_trigger(self) -> DeviceStatus | StatusBase | None:
"""Called when the device is triggered."""
status_camera = TransitionStatus(
self.cam.acquire_busy, [ACQUIRESTATUS.DONE, ACQUIRESTATUS.ACQUIRING, ACQUIRESTATUS.DONE]
)
img_counter = self.hdf.num_captured.get()
status_backend = None
# First we make sure that the backend reach 'config' state. This needs to happend before each trigger.
if self.enable_xes is True:
status_backend_config = DeviceStatus(self)
self.cancel_on_stop(status_backend_config)
self.backend.timepix_fly_client.add_status_callback(
status=status_backend_config,
success=[TimePixFlyStatus.CONFIG],
error=[TimePixFlyStatus.EXCEPT, TimePixFlyStatus.SHUTDOWN],
)
try:
status_backend_config.wait(timeout=5)
except TimeoutError:
# pylint:disable=raise-missing-from
raise TimeoutError(
f"TimePixFly backend of device {self.name} failed to reach 'config' state in trigger"
)
# Prepare backend to be ready to receive trigger
status = self.backend.on_trigger()
try:
status.wait(timeout=5) # Wait until backend trigger is done
# pylint:disable=raise-missing-from
except Exception:
logger.error(
f"TimePixFly backend of device {self.name} failed to prepare for trigger"
)
raise TimeoutError(
f"TimePixFly backend of device {self.name} failed to reach 'trigger' state in trigger"
)
# Status that resolves once the trigger is done
status_backend = self.backend.on_trigger_finished()
if status_backend is not None:
return_status = status_camera & status_backend
else:
return_status = status_camera
self.cancel_on_stop(return_status)
self.cam.acquire.put(1)
return return_status
def on_complete(self) -> DeviceStatus | StatusBase | None:
"""Called to inquire if a device has completed a scans."""
# Status Camera
status_camera = CompareStatus(self.cam.acquire_busy, ACQUIRESTATUS.DONE)
# Status Writer
st1 = CompareStatus(self.hdf.capture, ACQUIRESTATUS.DONE)
st2 = CompareStatus(self.hdf.write_file, ACQUIRESTATUS.DONE)
status_written_images = CompareStatus(self.hdf.num_captured, self._n_images)
status_writer = st1 & st2 & status_written_images
# Status Backend
status_backend = None
if self.enable_xes is True:
# Add callback to the backend complete handling
status_backend = self.backend.on_complete(status=status_backend)
# Combine the statuses
if status_backend is not None:
return_status = status_backend & status_camera & status_writer
else:
return_status = status_camera & status_writer
return_status.add_callback(self._complete_callback)
self.cancel_on_stop(return_status)
return return_status
def _complete_callback(self, status: CompareStatus) -> None:
"""Callback for when the device completes a scan."""
if status.success:
self.file_event.put(
file_path=self._full_path, # pylint: disable:protected-access
done=True,
successful=True,
hinted_h5_entries={"data": "/entry/data/data"},
)
else:
self.file_event.put(
file_path=self._full_path, # pylint: disable:protected-access
done=True,
successful=False,
hinted_h5_entries={"data": "/entry/data/data"},
)
def on_stop(self) -> None:
"""Called when the device is stopped."""
# Camera
self.cam.acquire.put(0)
self.hdf.capture.put(0)
# Backend
if self.enable_xes is True:
self.backend.on_stop()
def on_destroy(self):
"""Cleanup method to stop the device and clean up resources."""
self.cam.acquire.put(0)
self.hdf.capture.put(0)
self._poll_thread_kill_event.set()
self.backend.on_stop()
self.backend.on_destroy()
# pylint: disable=protected-access
if __name__ == "__main__": # pragma: no cover
timepix = Timepix(
name="timepix",
prefix="X10DA-ES-TPX1:",
backend_rest_url="P6-0008.psi.ch:8452", # "P4-0017.psi.ch:8452",
hostname="x10da-bec-001.psi.ch",
)
try:
# timepix.wait_for_connection(all_signals=True, timeout=10)
timepix.on_connected()
print("Timepix connected and initialized.")
for exp_time, frames_per_trigger, runs in zip([0.1, 1, 0.2], [20, 5, 1], [10, 5, 30]):
print(
f"Sleeping for 0.5 seconds before starting the scan with exp_time={exp_time} "
f"and frames_per_trigger={frames_per_trigger}. and runs {runs}"
)
time.sleep(0.5)
timepix.scan_info.msg.scan_parameters.update(
{
"exp_time": exp_time, # Set exposure time to 1 second for testing
"frames_per_trigger": frames_per_trigger, # Set frames per trigger to 5 for testing
}
)
timepix.stage()
logger.warning(f"Timepix on stage done")
timepix.pre_scan()
logger.warning(f"Timepix on pre_scan done")
msgs = []
# for ii in range(runs):
for run in range(runs):
logger.warning(f"Starting trigger run {run + 1}/{runs}")
status = timepix.trigger()
logger.warning(f"Timepix triggered")
start_time = time.time()
while not status.done:
try:
status.wait(timeout=1)
except Exception as exc:
logger.warning(f"Trigger not done after ({time.time() - start_time:.2f}s)")
if time.time() - start_time > 20:
logger.warning("Breaking loop manually after 20 seconds of waiting.")
status.set_exception(f"Failed to complete trigger after 20 seconds")
break
if hasattr(timepix, "_msg_dump"):
n_messages = len(timepix._msg_dump)
logger.warning(f"Messages in Buffer is {n_messages}")
if n_messages > 0:
msg = timepix._msg_dump[-1]
logger.warning(
f"Last message had N start_frame : {msg.get('start_frame')}, N data_frames: {len(msg.get('data_frame'))}, N end_frame : {msg.get('end_frame')}"
)
status = timepix.complete()
print("Waiting for timepix to complete.")
status.wait(timeout=10)
print("Timepix scan completed.")
timepix.unstage()
# timepix._msg_dump.clear()
print("Timepix unstaged.")
except Exception as e:
content = traceback.format_exc()
logger.error(f"An error occurred: {content}")
finally:
timepix.destroy()
print("Timepix destroyed.")

View File

@@ -0,0 +1 @@
from .timepix_fly_backend import TimepixFlyBackend

View File

@@ -0,0 +1 @@
from .timepix_fly_mock_server import TimePixFlyMockServer

View File

@@ -0,0 +1,49 @@
"""Module to control the Timepix Fly mock server."""
import requests
class TimePixFlyMockServer:
"""
A mock server for the Timepix Fly detector that simulates the behavior of the actual server.
This is used for testing purposes and does not require a real Timepix Fly detector.
"""
def __init__(self, host: str = "localhost", port: int = 8080, logger=None):
"""
Initialize the TimePixFlyMockServer with a host and port.
Args:
host (str): The host address for the mock server. Default is "localhost".
port (int): The port number for the mock server. Default is 8080.
logger: An optional logger to log messages. If not provided, messages will be printed to the console.
"""
self.host = host
self.port = port
self.logger = logger
def add_log(self, message: str) -> None:
"""
Add a log message to the logger if available.
If no logger is provided, it will print the message to the console.
Args:
message (str): The message to log.
"""
if self.logger is not None:
self.logger.info(message)
else:
print(message)
def start_acquisition(self):
"""
Simulate starting an acquisition on the Timepix Fly detector.
This method does not perform any real acquisition but simulates the behavior.
"""
try:
requests.get(f"http://{self.host}:{self.port}/measurement/start", timeout=0.2)
except requests.exceptions.RequestException:
self.add_log("Failed to start acquisition on Timepix Fly mock server.")
# Ignore all exceptions as there is currently no return value for the request
else:
self.add_log("Acquisition started on Timepix Fly mock server.")

View File

@@ -0,0 +1,559 @@
"""
Implementation of the Timepix Fly Backend. It handles the communication
with the TimepixFly backend (https://github.com/paulscherrerinstitute/TimePixFly).
Please be aware that this was developed agains the 'dev' branch (2025/08/15).
It communicates with the backend through a simple Client (TimepixFlyClient)
that handles the REST and WebSocket communication + callbacks, and provides
hooks for all the relevant ophyd interface, 'on_stage',
'on_trigger', 'on_complete', 'on_stop', ...
"""
from __future__ import annotations
import json
import signal
import socket
import threading
import time
import traceback
import uuid
from typing import TYPE_CHECKING, Callable, Tuple
from bec_lib.logger import bec_logger
from ophyd_devices import StatusBase
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client import (
TimepixFlyClient,
TimePixFlyStatus,
)
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface import (
OtherConfigModel,
PixelMap,
)
if TYPE_CHECKING:
from ophyd import DeviceStatus
from superxas_bec.devices.timepix.timepix import Timepix
logger = bec_logger.logger
# pylint: disable=line-too-long
# pylint: disable=redefined-outer-name
class TimepixFlyBackendException(Exception):
"""Custom exception for Timepix Fly Backend errors."""
class TimepixFlyBackend:
"""Timepix Fly Backend Device."""
def __init__(self, backend_rest_url: str, hostname: str | None = None, socket_port: int = 0):
"""
Initialize the Timepix Fly Backend device.
Parameters:
backend_rest_url: The REST URL of the backend.
hostname: The hostname of the device, defaults to None, which means
socket.getfqdn() will be used to fetch hostname. It is recommended to specify
the hostname explicitly with domain name, e.g. 'x10da-bec-001.psi.ch' for use
at the beamline computers of SLS, or localhost for local testing of the backend.
socket_port: The socket port to use. Defaults to 0,
which lets the OS choose an available port.
"""
ws_url = f"{backend_rest_url}/ws"
self.timepix_fly_client = TimepixFlyClient(rest_url=backend_rest_url, ws_url=ws_url)
if hostname is None:
hostname = socket.getfqdn()
self.hostname = hostname
self.socket_port = socket_port # Use 0 as default to let the OS choose an available port
self.__msg_buffer = []
self.callbacks: dict[str, Tuple[Callable[[dict, list[dict], dict, dict], None], dict]] = {}
self._status_objects: list[StatusBase] = []
self._decoder = json.JSONDecoder()
self._socket_server: socket.socket | None = None
self._data_thread: threading.Thread | None = None
self._data_thread_shutdown_event = threading.Event()
###################################################
###### Hooks for the PSIDeviceBase interface ######
###################################################
def on_connected(self):
"""Called if it is ensured that the device is connected."""
time_started = time.time()
logger.info("Connecting to Timepix Fly backend...")
try:
self.timepix_fly_client.on_connected()
status = self.start_data_server()
status.wait(timeout=5)
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
logger.error(f"Error starting data server: {content}")
# pylint: disable=raise-missing-from
raise TimepixFlyBackendException(
f"Could not start data server on {self.hostname}:{self.socket_port}. Please check logs for more details."
)
logger.info(
f"Timepix Fly backend connected and data server started on {self.hostname}:{self.socket_port} after {time.time() - time_started:.3f} seconds."
)
def on_stage(self, other_config: OtherConfigModel, pixel_map: PixelMap):
"""
Hook for on stage logic.
Args:
other_config (OtherConfigModel): The configuration for the Timepix Fly detector.
pixel_map (PixelMap): The pixel map for the Timepix Fly detector.
"""
time_started = time.time()
status = StatusBase()
self.cancel_on_stop(status)
self.timepix_fly_client.add_status_callback(
status,
success=[TimePixFlyStatus.CONFIG],
error=[TimePixFlyStatus.EXCEPT, TimePixFlyStatus.SHUTDOWN],
)
try:
status.wait(timeout=5.0)
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
logger.error(
f"Error while waiting for Timepix Fly backend to be in config state: {content}"
)
# pylint: disable=raise-missing-from
raise TimeoutError(
f"Timepix Fly backend state did not reach config state, running into timeout. Error traceback {content}."
)
status = StatusBase()
self.timepix_fly_client.add_status_callback(
status,
success=[TimePixFlyStatus.CONFIG],
error=[TimePixFlyStatus.EXCEPT, TimePixFlyStatus.SHUTDOWN],
)
logger.debug(f"Setting other config, backend {other_config}")
self.timepix_fly_client.set_other_config(other_config)
self.timepix_fly_client.set_pixel_map(pixel_map)
try: # TODO make asynchronous
status.wait(timeout=5.0)
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
logger.error(
f"Error while waiting for Timepix Fly backend to be in config state after setting config: {content}"
)
# pylint: disable=raise-missing-from
raise TimeoutError(
f"Timepix Fly backend state did not reach config state after setting config, running into timeout. Error traceback {content}."
)
logger.info(f"TimePixFly backend staged after {time.time() - time_started:.3f} seconds.")
def on_trigger(
self, status: StatusBase | DeviceStatus | None = None
) -> StatusBase | DeviceStatus:
"""
Hook for on_trigger logic. It adds a status callback based on the TimePixFlyStatus.
The backend needs to get into the AWAIT_CONNECTION state before starting the acquisition.
Args:
status (StatusBase | DeviceStatus | None): The status object to track the operation.
If None, a new StatusBase object will be created.
Returns:
StatusBase | DeviceStatus: The status object that will be updated with the operation's result
"""
# TODO, could be removed as it's checkd from the top level!
if status is None:
status = StatusBase()
self.cancel_on_stop(status)
self.timepix_fly_client.add_status_callback(
status,
success=[TimePixFlyStatus.AWAIT_CONNECTION],
error=[TimePixFlyStatus.EXCEPT, TimePixFlyStatus.SHUTDOWN],
)
self.timepix_fly_client.start()
return status
def on_trigger_finished(
self, status: StatusBase | DeviceStatus | None = None
) -> StatusBase | DeviceStatus:
"""
Hook for on_trigger_finished logic. It adds a status callback based on the TimePixFlyStatus.
The backend needs to get into the CONFIG state again after a trigger is finished.
In practice, a full scan logic is happening during on trigger.
The status will be marked as finished/successful when the backend state
reaches CONFIG. If an exception state is reached, the status will be marked as failed.
Args:
status (StatusBase | DeviceStatus | None): The status object to track the operation.
If None, a new StatusBase object will be created.
Returns:
StatusBase | DeviceStatus: The status object that will be updated with the operation's result
"""
if status is None:
status = StatusBase()
self.cancel_on_stop(status)
self.timepix_fly_client.add_status_callback(
status,
success=[TimePixFlyStatus.CONFIG],
error=[TimePixFlyStatus.EXCEPT, TimePixFlyStatus.SHUTDOWN],
)
return status
def on_complete(
self, status: StatusBase | DeviceStatus | None = None
) -> StatusBase | DeviceStatus:
"""
Hook for on_complete logic. It adds a status callback based on the TimePixFlyStatus.
The backend needs to get into the CONFIG state after a single acquisition.
Args:
status (StatusBase | DeviceStatus | None): The status object to track the operation.
If None, a new StatusBase object will be created.
Returns:
StatusBase | DeviceStatus: The status object that will be updated with the operation's result
"""
if status is None:
status = StatusBase()
self.cancel_on_stop(status)
self.timepix_fly_client.add_status_callback(
status,
success=[TimePixFlyStatus.CONFIG],
error=[TimePixFlyStatus.EXCEPT, TimePixFlyStatus.SHUTDOWN],
)
return status
def on_unstage(self) -> StatusBase:
"""Hook for on_unstage logic."""
# status = StatusBase()
# self.cancel_on_stop(status)
# self.timepix_fly_client.add_status_callback(
# status,
# success=[TimePixFlyStatus.CONFIG],
# error=[TimePixFlyStatus.EXCEPT, TimePixFlyStatus.SHUTDOWN],
# )
# return status
def on_destroy(self):
"""Hook for on_destroy logic."""
self.timepix_fly_client.shutdown()
self._data_thread_shutdown_event.set()
if self._data_thread is not None and self._data_thread.is_alive():
self._data_thread.join(timeout=1) # Allow the data thread to finish
if self._data_thread.is_alive():
logger.error(
"Data thread poll loop of timepix_fly_backend did not stop within 1 second."
)
if self._socket_server is not None:
try:
logger.info(f"Closing socket server on {self.hostname}:{self.socket_port}.")
self._socket_server.close()
# pylint: disable=broad-except
except Exception:
content = traceback.format_exc()
logger.error(f"Error closing socket server: {content}")
def on_stop(self):
"""Hook for on_stop logic."""
self.stop_all_status_objects()
self.timepix_fly_client.stop_running_collection()
####################################################
########## Custom Methods for the Backend ##########
####################################################
def cancel_on_stop(self, status: StatusBase):
"""Cancel ongoing operations of a status object when the sto method is called."""
self._status_objects.append(status)
def stop_all_status_objects(self):
"""Stop all status objects that are currently running."""
for status in self._status_objects:
with status._lock:
if not status.done:
status.set_exception(
RuntimeError("Stop called on device, all status objects cancelled.")
)
logger.info(f"Cancelled status object: {status}")
self._status_objects.clear()
def add_callback(self, callback: callable, kwd: dict | None = None) -> str:
"""
Add a callback that will be executed whenever an acquisition is completed. This is
determind by receiving an EndFrame message from the backend. There will always be
a StartFrame message, follow by optional DataFrame messages, and finally
an EndFrame message. The callback will be called with the StartFrame, all DataFrames
and the EndFrame message as arguments, along with any additional keyword arguments
provided when registering the callback.
The callback signature needs to be:
def callback(start_frame: dict, data_frames: list[dict], end_frame: dict, kwd) -> None:
Args:
- start_frame (dict): The first message received, typically containing metadata.
- data_frames (list[dict]): A list of all data frames received during the acquisition.
- end_frame (dict): The last message received, typically containing the EndFrame type.
- any additional keyword arguments provided when registering the callback.
Args:
callback (callable): The callback function to be called. The callback signature should be:
kwd (dict | None): Additional keyword arguments to pass to the callback, they will be unpacked
when calling the callback. If None, an empty dictionary will be used.
Returns:
str: A unique identifier for the callback.
"""
if kwd is None:
kwd = {}
cb_id = uuid.uuid4()
self.callbacks[cb_id] = (callback, kwd)
logger.info(f"Callback {callback.__name__} added with UUID {cb_id}.")
return str(cb_id)
def remove_callback(self, cb_id: str):
"""
Remove a callback by its unique identifier.
Args:
cb_id (str): The unique identifier of the callback to remove.
"""
if cb_id in self.callbacks:
self.callbacks.pop(cb_id)
logger.info(f"Callback with UUID {cb_id} removed.")
else:
logger.warning(f"Callback with UUID {cb_id} not found.")
def start_data_server(self) -> StatusBase:
"""
Start the data server to receive data from the Timepix Fly backend over a socket connection.
It will try to decypher the hostname through socket.getaddrinfo, and if multiple addresses
are found, it will use the first one. Please note that depending on the network configuration,
the hostname might not have the correct domain name attached, so it is recommended to specify
the hostname explicitly with domain name, e.g. 'x10da-bec-001.psi.ch'.
The method creates a socket server that listens for incoming connections on the specified
hostname and port. It starts a thread that continuously receives data from the socket,
decodes the received JSON data, and processes it. The data is expected to be in JSON format,
with each message ending with a trailing byte "}\n".
Returns:
StatusBase: A status object that indicates if the data server thread is ready to accept
connections. High level implementation should ensure that the data server is
started (status.wait(timeout=4)) before any data is sent from the backend.
"""
info = socket.getaddrinfo(
self.hostname, port=self.socket_port, family=socket.AF_INET, type=socket.SOCK_STREAM
)
if len(info) == 0:
raise RuntimeError(f"Could not resolve hostname {self.hostname} for socket server.")
if len(info) > 1:
logger.info(
f"Multiple addresses found for {self.hostname}. Using the first one: {info[0]}"
)
family, socktype, proto, _, sockaddr = info[0]
self._socket_server = socket.create_server(sockaddr, family=family, backlog=1)
self._socket_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Set the hostname and socket_port to the ones that was picked by the socket.getaddrinfo
self.hostname, self.socket_port = self._socket_server.getsockname()
logger.info(
f"Socket server started on {self.hostname}:{self.socket_port}. Waiting for connections."
)
# Create status object to return for the high level implementations
status = StatusBase()
if self._data_thread is None or not self._data_thread.is_alive():
self._data_thread_shutdown_event.clear()
self._data_thread = threading.Thread(
target=self._receive_data_on_socket, kwargs={"status": status}
)
self._data_thread.start()
else:
raise TimepixFlyBackendException(
"Data server thread is already running on timepix_fly_backend."
)
return status
def _receive_data_on_socket(self, status: StatusBase):
"""
Background loop running in a thread, that receives data from the
timepix fly backend over socket_server. The backend reconnects for every acquisition (trigger),
to this socket. Therefore, it is important to handle all connections and disconnections properly.
The buffer variable stores a string stream of received data. Whenever a trailing byte "}\n" is found,
in the buffer, the buffer is split into chunks of received data and each chunk is decoded
as a JSON object. The decoded objects are then processed, and if an EndFrame message is received,
the registered callbacks are executed with the StartFrame, all DataFrames, and the EndFrame message.
"""
buffer = ""
self._socket_server.settimeout(
0.1
) # Set short socket timeout to avoid blocking the thread loop
status.set_finished() # Indicate that the socket server is ready to accept connections
while not self._data_thread_shutdown_event.is_set(): # Shutdown event
try:
# blocks until connected or timeout reached
conn, addr = self._socket_server.accept()
except socket.timeout:
continue # Timeout is okay, continue
except Exception: # pylint: disable=broad-except
# Log error, check if shutdown event is set.
# Shutdown event should be set before socket_server.close() is called.
content = traceback.format_exc()
logger.error(f"Error accepting connection: {content}")
continue
logger.debug(f"Connection accepted from {addr} for timepix_fly backend.")
# Clear the message buffer before entering the loop.
if self.__msg_buffer:
logger.warning(f"Found messages in msg_buffer: {self.__msg_buffer}")
self.__msg_buffer.clear()
conn.settimeout(0.1) # Set timeout for connection to avoid blocking in recv
with conn:
while not self._data_thread_shutdown_event.is_set():
try:
# What if we split the chunk
chunk = conn.recv(4096) # Adjust buffer size as needed
except socket.timeout:
# Timeout is okay, continue in loop
continue
except Exception as e: # pylint: disable=broad-except
logger.error(f"Connection error: {e}. Closing connection.")
# conn = None #TODO should we reset conn?
break
if not chunk:
# Receiving an empty chunk means the connection was closed
# conn = None #TODO should we reset conn?
break
buffer += chunk.decode("utf-8")
# Check if trailing byte "}\n" present in buffer
buffer_chunks = buffer.split("}\n")
for entry in buffer_chunks[:-1]:
# Process all complete JSON objects in the buffer
self._decode_received_data(entry + "}")
# Keep the last incomplete chunk.
# If the buffer ended with "}\n", this will be an empty string.
buffer = buffer_chunks[-1]
def _decode_received_data(self, buffer: str) -> None:
"""
Decode the received data from the socket.
Args:
buffer (str): The JSON string received from the socket.
"""
try:
obj, _ = self._decoder.raw_decode(buffer)
except json.JSONDecodeError:
logger.error(f"TimePixFlyBackend: Failed to decode JSON from buffer: {buffer}")
return # TODO should this raise, or only log error as of now?
self.__msg_buffer.append(obj)
if obj.get("type", "") == "EndFrame":
try:
# If the EndFrame message is received, run the callbacks
logger.debug(f"Running callbacks")
self.run_msg_callbacks()
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
logger.error(f"Error in msg callbacks with error msg: {content}")
msgs_in_buffer = "".join(
[f"{msg['type']} with keys {msg.keys()} \n" for msg in self.__msg_buffer]
)
logger.debug(f"TimePixFlyBackend: Messages in buffer: {msgs_in_buffer}")
finally: # Make sure to always reset the message buffer after processing
logger.debug(
"TimePixFlyBackend: Resetting message buffer after processing EndFrame message."
)
logger.debug(f"Messages in buffer: {len(self.__msg_buffer)}")
self.__msg_buffer.clear()
def run_msg_callbacks(self):
"""Run callbacks if EndFrame message is received."""
start_frame = self.__msg_buffer[0]
end_frame = self.__msg_buffer[-1]
data_frames = self.__msg_buffer[1:-1]
for cb, kwd in self.callbacks.values():
try:
cb(start_frame, data_frames, end_frame, **kwd)
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
logger.error(f"Error in callback with error msg: {content}")
if __name__ == "__main__": # pragma: no cover
import time
from superxas_bec.devices.timepix.timepix_fly_client.test_utils.timepix_fly_mock_server import (
TimePixFlyMockServer,
)
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface import (
TimepixEndFrame,
TimepixStartFrame,
TimepixXESFrame,
)
mock_server = TimePixFlyMockServer()
timepix = TimepixFlyBackend(
backend_rest_url="localhost:8452", hostname="localhost", socket_port=3031
)
start_frames = {}
xes_frames = {}
end_frames = {}
def add_msg_callback(start_frame, data_frames, end_frame, **kwargs):
"""Callback to print received messages."""
counter = len(start_frames)
start_frames[counter] = TimepixStartFrame(**start_frame)
xes_frames[counter] = [TimepixXESFrame(**data_frame) for data_frame in data_frames]
end_frames[counter] = TimepixEndFrame(**end_frame)
try:
print("TimepixFlyBackend initialized.")
timepix.on_connected()
print("TimepixFlyBackend connected.")
# Parse scan info for OtherConfig
config = OtherConfigModel(
output_uri=f"tcp:{timepix.hostname}:{timepix.socket_port}", TRoiStep=1, TRoiN=5000
)
# Parse pixel map from scan info if needed, otherwise use some default pixel map.
pixel_map = PixelMap(
chips=[
[{"i": 256 ^ 2 - 1, "p": [0, 1], "f": [0.5, 0.5]}],
[{"i": 255 * 256, "p": [0, 1], "f": [0.5, 0.5]}],
[{"i": 255, "p": [1, 2], "f": [0.5, 0.5]}],
[{"i": 0, "p": [1, 2], "f": [0.5, 0.5]}],
]
)
timepix.add_callback(add_msg_callback)
timepix.on_stage(other_config=config, pixel_map=pixel_map)
print("TimepixFlyBackend staged with configuration and pixel map.")
for ii in range(5):
print(f"Starting scan {ii + 1}...;")
time.sleep(1)
status_1 = timepix.on_trigger()
# print("TimepixFlyBackend pre-scan started.")
status_1.wait(timeout=10)
mock_server.start_acquisition()
status_2 = timepix.on_trigger_finished()
status_2.wait(timeout=10)
# print("Acquisition started on mock server.")
print("TimepixFlyBackend scan completed.")
status = timepix.on_complete()
status.wait(timeout=10)
print(
f"Received {len(start_frames)} start frames, {len(xes_frames)} data frames, and {len(end_frames)} end frames."
)
# pylint: disable=broad-except
except Exception as e:
logger.error(f"Error during TimepixFlyBackend operation: {e}")
finally:
timepix.on_destroy()
print("TimepixFlyBackend destroyed.")

View File

@@ -0,0 +1,497 @@
"""
Module that implements a python client interface to the TimePix Fly tpx3app REST API,
and connects to the TimePix Fly WebSocket server to receive status updates.
It provides methods to start, stop, and configure the TimePix detector,
as well as to retrieve pixel maps, other configuration parameters, and
the current state of the detector.
"""
from __future__ import annotations
import enum
import threading
import time
import traceback
from typing import Any, Type
import requests
from bec_lib.logger import bec_logger
from ophyd import StatusBase
from websockets import State
from websockets.exceptions import WebSocketException
from websockets.sync.client import ClientConnection, connect
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface import (
LastError,
NetAddresses,
OtherConfigModel,
PixelMap,
PixelMapFromFile,
ProgramState,
TimePixResponse,
Version,
)
logger = bec_logger.logger
# pylint: disable=line-too-long
class TimePixStatusError(Exception):
"""Exception raised when the TimePix detector status was in an unexpected state."""
class TimePixFlyStatus(str, enum.Enum):
"""
Enum representing the status of the TimePix detector.
"""
INIT = "init"
CONFIG = "config"
SETUP = "setup"
COLLECT = "collect"
SHUTDOWN = "shutdown"
UNDEFINED = "undefined"
AWAIT_CONNECTION = "await_connection"
EXCEPT = "except"
class TimepixFlyClient:
"""
A client for the TimePix fly backend (tpx3app).
It exposes methods to interact with REST endpoints
and allows to connect callbacks to status objects from ophyd
that allow to dynamically update based on the state of the backend.
"""
def __init__(self, rest_url: str, ws_url: str):
"""
Initialize the TimePixFlyClient with a server address.
Args:
rest_url (str): The REST API URL for the TimePix Fly backend, e.g., "localhost:8452".
ws_url (str): The WebSocket URL for the TimePix Fly backend, e.g., "localhost:8452/ws".
"""
self.rest_url = rest_url
self.ws_url = ws_url
self.ws_client: ClientConnection | None = None
self._rlock = threading.RLock()
self._timeout = 5 # Default timeout for requests
self._status: TimePixFlyStatus = TimePixFlyStatus.UNDEFINED
self._ws_update_thread: threading.Thread | None = None
self._shutdown_event = threading.Event()
self._status_callbacks: dict[
str, tuple[StatusBase, list[TimePixFlyStatus], list[TimePixFlyStatus]]
] = {}
self._started: bool = False # Flag to indicate if the client has started sending data
#############################
### Utility Methods ###
#############################
def on_connected(self) -> None:
"""
Called when the client is connected to the TimePix server.
This method can be overridden to perform actions when the client connects.
"""
try:
self.stop_running_collection()
self.connect()
self.wait_for_connection(timeout=5)
except Exception:
content = traceback.format_exc()
logger.error(
f"Error while checking the state of the TimePix server: {content}. "
f"Please check the server address and ensure the server is running."
)
# pylint: disable=raise-missing-from
raise ConnectionError(
f"TimePix Fly client failed to connect to {self.rest_url}. Please check logs for detailed error."
)
def stop_running_collection(self):
"""
Resets the TimePix backend to the configuration state. We check if the backend
is in the CONFIG state, and if it is not, we stop the current collection.
We check in addition that the client has not been started before via start()
REST API call. stop_collect() will reset the flag _started to False.
"""
state = self.state()
if state.state != TimePixFlyStatus.CONFIG or self._started is True:
logger.info(
f"Stopping running collection on TimePix backend, current state: {state.state}, was started: {self._started}"
)
self.stop_collect()
##############################
### WebSocket Methods ###
### Status Update Handling ###
##############################
@property
def status(self) -> TimePixFlyStatus:
"""
Get the current status of the TimePix detector.
Returns:
TimePixFlyStatus: The current status of the TimePix detector.
"""
return self._status
def add_status_callback(
self,
status: StatusBase,
success: list[TimePixFlyStatus],
error: list[TimePixFlyStatus],
run: bool = True,
):
"""
Add a StatusBase callback for the TimePix detector. The status will be updated when the detector status
changes and set to finished when the status matches one of the specified success statuses and to exception
when the status matches one of the specified error statuses.
Per default, the callback will immediately check and run if the status is already in success.
Args:
status (StatusBase): StatusBase object
success (list[StdDaqStatus]): list of statuses that indicate success
error (list[StdDaqStatus]): list of statuses that indicate error
run (bool): If True, the callback will be run immediately if the status is already in success.
If False, the callback will not be run immediately.
"""
if run is True:
try:
if self.status in success:
status.set_finished()
return
if self.status in error:
last_error = self.last_error()
raise TimePixStatusError(
f"Current state {self.status} of TimePixFly Backend is in list of error states: {error}. Last error: {last_error.message}"
)
except Exception as e:
logger.error(f"Error while adding status callback: {e}")
if status.done is False:
status.set_exception(e)
self._status_callbacks[id(status)] = (status, success, error)
def connect(self):
"""Connect to the TimePix WebSocket server."""
if self._ws_update_thread is not None and self._ws_update_thread.is_alive():
return
self._ws_update_thread = threading.Thread(target=self._ws_update_loop, daemon=True)
self._ws_update_thread.start()
# pylint: disable=raise-missing-from
def wait_for_connection(self, timeout: float = 6) -> None:
"""
Wait for the connection to the TimepixFly client connection to be established.
Args:
timeout (float): timeout for the request
"""
logger.info(
f"Attempting to connect to TimePixFly WebSocket at {self.ws_url}, with timeout {timeout} seconds."
)
with self._rlock:
start_time = time.time()
while True:
if self.ws_client is not None and self.ws_client.state == State.OPEN:
return
try:
self.ws_client = connect(f"ws://{self.ws_url}")
break
except ConnectionRefusedError:
if time.time() - start_time > timeout:
content = traceback.format_exc()
logger.error(f"Connection timed out: {content}")
raise TimeoutError(
f"Timeout while waiting for connection to TimePixFly WebSocket server on {self.ws_url}"
)
except Exception:
content = traceback.format_exc()
logger.error(
f"Failed to connect to TimePixFly WebSocket server on {self.ws_url}: {content}"
)
raise ConnectionError(
f"Failed to connect to TimePixFly WebSocket server on {self.ws_url} with error: {content}"
)
time.sleep(0.5) # Try to reconnect every 0.5 seconds
def _ws_update_loop(self):
"""Websocket update loop, runs in background thread."""
while not self._shutdown_event.is_set():
self._ws_send_and_receive()
def _ws_send_and_receive(self):
"""Receive messages from the TimePixFly WebSocket server."""
if not self.ws_client:
self.wait_for_connection()
try:
try:
recv_msgs = self.ws_client.recv(timeout=0.1)
except TimeoutError:
return
logger.trace(f"Received from timepixfly ws: {recv_msgs}")
if recv_msgs is not None:
self._on_received_ws_message(recv_msgs)
except WebSocketException:
content = traceback.format_exc()
logger.warning(f"Websocket connection closed unexpectedly: {content}")
self.wait_for_connection()
def _on_received_ws_message(self, msg: str):
"""
Handle a message received from the StdDAQ.
"""
try:
self._status = TimePixFlyStatus(msg)
logger.info(f"Received TimepixFly status: {self._status.value}")
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
logger.error(f"Failed to decode websocket message: {content}")
return
self._run_status_callbacks()
def _run_status_callbacks(self):
"""
Update the StatusBase objects based on the current status of the StdDAQ.
If the status matches one of the success or error statuses, the StatusBase object will be set to finished
or exception, respectively and removed from the list of callbacks.
"""
status = self._status
logger.warning(f"Running status callbacks for status: {status.value}")
callback_ids = list(self._status_callbacks.keys())
for cb_id in callback_ids:
dev_status, success, error = self._status_callbacks[cb_id]
with dev_status._lock:
if dev_status.done:
self._status_callbacks.pop(cb_id)
continue
if status in success:
dev_status.set_finished()
logger.debug(f"Status callback finished in succes: {status.value}")
self._status_callbacks.pop(cb_id)
elif status in error:
try:
last_error = self.last_error()
raise TimePixStatusError(
f"TimePixFly Backend state '{status.value}' is in list of specified errors {error}. Last error message: {last_error.message}"
)
except Exception as e:
logger.error(f"Error in status callback from TimepixFly Backend: {e}")
dev_status.set_exception(e)
self._status_callbacks.pop(cb_id)
# Reset the _started flag if the status is in CONFIG.
if status == TimePixFlyStatus.CONFIG:
self._started = False # Should this be made thread-safe?
def shutdown(self):
"""Shutdown the TimepixFlyClient client."""
self._shutdown_event.set()
if self.ws_client is not None:
self.ws_client.close()
self.ws_client = None
############################
##### REST API Methods #####
############################
def _get(
self, get_cmd: str, get_response_model: Type[TimePixResponse] | None = None
) -> Type[TimePixResponse] | None:
"""
Send a GET request to the TimePix server.
Args:
get_cmd (str): The command to send in the GET request.
get_response_model (Type[TimePixResponse]): The Pydantic model to parse the response.
Returns:
Any: The parsed response if a model is provided, else the raw response.
"""
logger.debug(f"Sending GET request to TimePix server: {get_cmd}")
response = requests.get(f"http://{self.rest_url}/{get_cmd}", timeout=self._timeout)
response.raise_for_status() # Raise an error for bad responses
if get_response_model is not None:
try:
return get_response_model(**response.json())
except Exception as e:
logger.info(f"Error parsing response for {get_cmd}: Response: {response.text}")
raise e
else:
return response.text
def _put(
self, put_cmd: str, value: dict[str, Any], put_response_model: Type[TimePixResponse]
) -> Type[TimePixResponse] | None:
"""
Send a PUT request to the TimePix server.
Args:
put_cmd (str): The command to send in the PUT request.
value (dict[str, Any]): The value to send in the PUT request.
put_response_model (Type[TimePixResponse]): The Pydantic model to parse the response.
Returns:
Any: The parsed response if a model is provided, else None.
"""
logger.debug(f"Sending PUT request to TimePix server: {put_cmd} with value: {value}")
response = requests.put(
f"http://{self.rest_url}/{put_cmd}", json=value, timeout=self._timeout
)
response.raise_for_status()
if put_response_model is not None:
return put_response_model(**response.json())
def start(self) -> None:
"""
Start the TimePix detector by sending a GET request to the start endpoint.
This method is a wrapper around the REST API call to start the detector.
"""
logger.debug(f"Start called from client")
self._get(get_cmd="?start=true")
self._started = True
def stop(self) -> None:
"""
Stop the TimePix detector by sending a GET request to the stop endpoint.
This method is a wrapper around the REST API call to stop the detector.
"""
self._get(get_cmd="?stop=true")
self._started = False
def stop_collect(self) -> None:
"""
Stop the data collection of the TimePix detector by sending a GET request to the stop-collect endpoint.
This method is a wrapper around the REST API call to stop data collection.
"""
self._get(get_cmd="?stop_collect=true")
self._started = False
def kill(self) -> None:
"""
Kill the TimePix detector by sending a GET request to the kill endpoint.
This method is a wrapper around the REST API call to kill the detector.
"""
self._get(get_cmd="?kill=true")
self._started = False
def last_error(self) -> LastError:
"""
Get the last error message from the TimePix detector by sending a GET request
to the last-error endpoint.
Returns:
LastError: The last error message from the detector.
"""
return self._get(get_cmd="last-error", get_response_model=LastError)
def state(self) -> ProgramState:
"""
Get the program state of the TimePix detector by sending a GET request
to the state endpoint.
Returns:
ProgramState: The current state of the TimePix detector.
"""
return self._get(get_cmd="state", get_response_model=ProgramState)
def version(self) -> Version:
"""
Get the version of the TimePix detector by sending a GET request
to the version endpoint.
Returns:
Version: The version information of the TimePix detector.
"""
return self._get(get_cmd="version", get_response_model=Version)
def get_pixel_map(self) -> PixelMap:
"""
Get the pixel map of the TimePix detector by sending a GET request
to the pixel-map endpoint.
Returns:
PixelMap: The pixel map of the TimePix detector.
"""
return self._get(get_cmd="pixel-map", get_response_model=PixelMap)
def set_pixel_map(self, pixel_map: PixelMap | dict) -> None:
"""
Set the pixel map of the TimePix detector by sending a PUT request
to the pixel-map endpoint.
Args:
pixel_map (PixelMap | dict): The pixel map to set. Can be a PixelMap instance or a dictionary.
"""
if not isinstance(pixel_map, PixelMap):
if isinstance(pixel_map, dict):
pixel_map = PixelMap(**pixel_map)
else:
raise ValueError(
f"Value must be an instance of PixelMap. Received {type(pixel_map)}, {pixel_map}."
)
self._put(put_cmd="pixel-map", value=pixel_map.model_dump(), put_response_model=None)
def set_pixel_map_from_file(self, pixel_map_file: PixelMapFromFile | dict | str) -> None:
"""
Set the pixel map of the TimePix detector from a file by sending a PUT request
to the pixel-map-from-file endpoint.
Args:
pixel_map_file (PixelMapFromFile | dict): The pixel map from a file to set.
Can be a PixelMapFromFile instance or a dictionary.
"""
if not isinstance(pixel_map_file, PixelMapFromFile):
if isinstance(pixel_map_file, dict):
pixel_map_file = PixelMapFromFile(**pixel_map_file)
elif isinstance(pixel_map_file, str):
pixel_map_file = PixelMapFromFile(filename=pixel_map_file)
else:
raise ValueError(
f"Value must be an instance of PixelMapFromFile. Received {type(pixel_map_file)}, {pixel_map_file}."
)
self._put(
put_cmd="pixel-map-from-file",
value=pixel_map_file.model_dump(),
put_response_model=PixelMapFromFile,
)
def get_other_config(self) -> OtherConfigModel:
"""
Get the other configuration parameters of the TimePix detector by sending a GET request
to the other-config endpoint.
Returns:
OtherConfigModel: The other configuration parameters of the TimePix detector.
"""
return self._get(get_cmd="other-config", get_response_model=OtherConfigModel)
def set_other_config(self, other_config: OtherConfigModel | dict) -> None:
"""
Set the other configuration parameters of the TimePix detector by sending a PUT request
to the other-config endpoint.
Args:
other_config (OtherConfigModel | dict): The other configuration parameters to set.
Can be an OtherConfigModel instance or a dictionary.
"""
if not isinstance(other_config, OtherConfigModel):
if isinstance(other_config, dict):
other_config = OtherConfigModel(**other_config)
else:
raise ValueError(
f"Value must be an instance of OtherConfigModel. Received {type(other_config)}, {other_config}."
)
self._put(put_cmd="other-config", value=other_config.model_dump(), put_response_model=None)
def get_net_addresses(self) -> NetAddresses:
"""
Get the network addresses of the TimePix detector by sending a GET request
to the net-addresses endpoint.
Returns:
dict[str, str]: A dictionary containing the network addresses of the TimePix detector.
"""
return self._get(get_cmd="net-addresses", get_response_model=NetAddresses)

View File

@@ -0,0 +1,182 @@
"""
This module defines Pydantic models for the TimePix detector API responses. These
models are used to validate and structure the data returned by the TimePix REST API.
Any change will be reflected immediately, which will simplify debugging if the API changes.
"""
from typing import Literal
from pydantic import BaseModel, Field
# pylint: disable=line-too-long
class TimePixResponse(BaseModel):
"""Base model for TimePix responses."""
model_config = {"validate_assignment": True}
class OtherConfigModel(TimePixResponse):
"""
OtherConfigModel is a Pydantic model that represents the configuration
for the TimePix detector.
Attributes:
- type: str - The type of the configuration, default is "OtherConfig".
- output_uri: str - The URI for the data stream. The backend will send data to this address.
It is the responsibility of the device must start a TCP server and listen on this socket to
receive the data.
- save_interval: int - The interval at which histograms are written.
- TRoiStart: int - The start time for the Time ROI (Region of Interest).
- TRoiStep: int - The step size for the Time ROI.
- TRoiN: int - The number of points in the Time ROI.
"""
type: str = "OtherConfig"
output_uri: str
save_interval: int = Field(
default=131000, description="Interval in seconds to write histograms"
)
TRoiStart: int = Field(
default=0, description="Start time for the Time ROI (Region of Interest)"
)
TRoiStep: int = Field(default=1, description="Step size for the Time ROI")
TRoiN: int = Field(default=5000, description="Number of points in the Time ROI")
class LastError(TimePixResponse):
"""
TimePixLastError is a Pydantic model that represents the last error message
from the TimePix detector api for the REST API call '/last-error'.
Attributes:
- type: str - The type of the response, default is "LastError".
- message: str - The last error message from the detector.
"""
type: str = "LastError"
message: str
class ProgramState(TimePixResponse):
"""
ProgramState is a Pydantic model that represents the state of the TimePix program.
Attributes:
- type: str - The type of the response, default is "ProgramState".
- state: str - The current state of the progra, can be "init", "config", "setup", "collect", "shutdown"
"""
type: str = "ProgramState"
state: Literal["init", "config", "setup", "await_connection", "collect", "shutdown"]
class Version(TimePixResponse):
"""
Version is a Pydantic model that represents the version information of the TimePix detector.
Attributes:
- type: str - The type of the response, default is "Version".
- version: str - The version string of the TimePix detector.
"""
type: str = "Version"
version: str
class PixelMapFromFile(TimePixResponse):
"""
PixelMapFromFile is a Pydantic model that represents a pixel map loaded from a file.
Attributes:
- type: str - The type of the response, default is "PixelMapFromFile".
- file: str - The path to the file containing the pixel map.
"""
type: str = "PixelMapFromFile"
file: str
class PixelMap(TimePixResponse):
"""
PixelMap is a Pydantic model that represents the pixel mapping for the TimePix detector.
Attributes:
- type: str - The type of the response, default is "PixelMap".
- chips: list - A list of chips, each containing a list of pixel mappings.
"""
type: str = "PixelMap"
chips: list[list[dict[Literal["i", "p", "f"], int | float | list[int | float]]]]
# For efficiency, we do not arse the responses into Pydantic models, but use the dict from
# json directly. Nevertheless, we define the models here to have a common interface
# and to be able to use them in the future if needed.
class TimepixStartFrame(TimePixResponse):
"""TimepixStartFrame is a Pydantic model that represents the start frame of a TimePix acquisition."""
type: str = "StartFrame"
Mode: Literal["TOA"]
TRoiStart: int
TRoiStep: int
TRoiN: int
NumEnergyPoints: int
save_interval: int
class TimepixXESFrame(TimePixResponse):
"""TimepixXESFrame is a Pydantic model that represents a data frame from the TimePix detector."""
type: str = "XesData"
period: int
TDSpectra: list[float]
totalEvents: int
beforeROI: int
afterROI: int
class TimepixEndFrame(TimePixResponse):
"""
TimepixEndFrame is a Pydantic model that represents the end frame of a TimePix acquisition.
Attributes:
- type: str - The type of the response, default is "EndFrame".
- error: str - If an error occured during acuisition, this field contains the error message. Empty string otherwise.
- periods: int - The last period count minus 3 (3 being the period predictor delay).
"""
type: str = "EndFrame"
error: str
periods: int # Last period count minus 3 (3 being the period predictor delay)
# Habe ein GET /net-addresses call implementiert
# // /net-addresses GET applicable net addresses
# // GET return:
# // - status 200
# // - data
# // {
# // "type":"NetAddresses",
# // "control":"127.0.0.1:8452", // own rest interface
# // "address":"127.0.0.1:8451", // own address, the destination of ASI server raw data
# // "server":"127.0.0.1:8080" // ASI server rest interface address
# // }
class NetAddresses(TimePixResponse):
"""
NetAddresses is a Pydantic model that represents the network addresses used by the TimePix detector.
Attributes:
- type: str - The type of the response, default is "NetAddresses".
- control: str - The address of the REST interface for control commands.
- address: str - The address where the ASI server sends raw data.
- server: str - The address of the ASI server's REST interface.
"""
type: str = "NetAddresses"
control: str # timepix_rest_host
address: str # data_socket_for_asi
server: str # asi_rest_host

View File

@@ -0,0 +1,382 @@
"""Temporary utility module for Status Object implementations."""
from __future__ import annotations
from typing import TYPE_CHECKING
from ophyd import Device, DeviceStatus, StatusBase
class AndStatusWithList(DeviceStatus):
"""
Custom implementation of the AndStatus that combines the
option to add multiple statuses as a list, and in addition
allows for adding the Device as an object to access its
methods.
Args"""
def __init__(
self,
device: Device,
status_list: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus],
**kwargs,
):
self.all_statuses = status_list if isinstance(status_list, list) else [status_list]
super().__init__(device=device, **kwargs)
self._trace_attributes["all"] = [st._trace_attributes for st in self.all_statuses]
def inner(status):
with self._lock:
if self._externally_initiated_completion:
return
if self.done: # Return if status is already done.. It must be resolved already
return
for st in self.all_statuses:
with st._lock:
if st.done and not st.success:
self.set_exception(st.exception()) # st._exception
return
if all(st.done for st in self.all_statuses) and all(
st.success for st in self.all_statuses
):
self.set_finished()
for st in self.all_statuses:
with st._lock:
st.add_callback(inner)
# TODO improve __repr__ and __str__
def __repr__(self):
return "<AndStatusWithList({self.all_statuses!r})>".format(self=self)
def __str__(self):
return "<AndStatusWithList(done={self.done}, success={self.success})>".format(self=self)
def __contains__(self, status: StatusBase | DeviceStatus) -> bool:
for child in self.all_statuses:
if child == status:
return True
if isinstance(child, AndStatusWithList):
if status in child:
return True
return False
# # TODO Check if this actually works....
# def set_exception(self, exc):
# # Propagate the exception to all sub-statuses that are not done yet.
#
# with self._lock:
# if self._externally_initiated_completion:
# return
# if self.done: # Return if status is already done.. It must be resolved already
# return
# super().set_exception(exc)
# for st in self.all_statuses:
# with st._lock:
# if not st.done:
# st.set_exception(exc)
def _run_callbacks(self):
"""
Set the Event and run the callbacks.
"""
if self.timeout is None:
timeout = None
else:
timeout = self.timeout + self.settle_time
if not self._settled_event.wait(timeout):
self.log.warning("%r has timed out", self)
with self._externally_initiated_completion_lock:
if self._exception is None:
exc = TimeoutError(
f"AndStatus from device {self.device.name} failed to complete in specified timeout of {self.timeout + self.settle_time}."
)
self._exception = exc
# Mark this as "settled".
try:
self._settled()
except Exception:
self.log.exception("%r encountered error during _settled()", self)
with self._lock:
self._event.set()
if self._exception is not None:
try:
self._handle_failure()
except Exception:
self.log.exception("%r encountered an error during _handle_failure()", self)
for cb in self._callbacks:
try:
cb(self)
except Exception:
self.log.exception(
"An error was raised on a background thread while "
"running the callback %r(%r).",
cb,
self,
)
self._callbacks.clear()
class AndStatus(StatusBase):
"""Custom AndStatus for TimePix detector."""
def __init__(
self,
left: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus] | None,
name: str | Device | None = None,
right: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus] | None = None,
**kwargs,
):
self.left = left if isinstance(left, list) else [left]
if right is not None:
self.right = right if isinstance(right, list) else [right]
else:
self.right = []
self.all_statuses = self.left + self.right
if name is None:
name = "unname_status"
elif isinstance(name, Device):
name = name.name
else:
name = name
self.name = name
super().__init__(**kwargs)
self._trace_attributes["left"] = [st._trace_attributes for st in self.left]
self._trace_attributes["right"] = [st._trace_attributes for st in self.right]
def inner(status):
with self._lock:
if self._externally_initiated_completion:
return
if self.done: # Return if status is already done.. It must be resolved already
return
for st in self.all_statuses:
with st._lock:
if st.done and not st.success:
self.set_exception(st.exception()) # st._exception
return
if all(st.done for st in self.all_statuses) and all(
st.success for st in self.all_statuses
):
self.set_finished()
for st in self.all_statuses:
with st._lock:
st.add_callback(inner)
def __repr__(self):
return "({self.left!r} & {self.right!r})".format(self=self)
def __str__(self):
return "{0}(done={1.done}, " "success={1.success})" "".format(self.__class__.__name__, self)
def __contains__(self, status: StatusBase) -> bool:
for child in [self.left, self.right]:
if child == status:
return True
if isinstance(child, AndStatus):
if status in child:
return True
return False
def _run_callbacks(self):
"""
Set the Event and run the callbacks.
"""
if self.timeout is None:
timeout = None
else:
timeout = self.timeout + self.settle_time
if not self._settled_event.wait(timeout):
# We have timed out. It's possible that set_finished() has already
# been called but we got here before the settle_time timer expired.
# And it's possible that in this space be between the above
# statement timing out grabbing the lock just below,
# set_exception(exc) has been called. Both of these possibilties
# are accounted for.
self.log.warning("%r has timed out", self)
with self._externally_initiated_completion_lock:
# Set the exception and mark the Status as done, unless
# set_exception(exc) was called externally before we grabbed
# the lock.
if self._exception is None:
exc = TimeoutError(
f"Status with name {self.name} failed to complete in specified timeout of {self.timeout + self.settle_time}."
)
self._exception = exc
# Mark this as "settled".
try:
self._settled()
except Exception:
# No alternative but to log this. We can't supersede set_exception,
# and we have to continue and run the callbacks.
self.log.exception("%r encountered error during _settled()", self)
# Now we know whether or not we have succeed or failed, either by
# timeout above or by set_exception(exc), so we can set the Event that
# will mark this Status as done.
with self._lock:
self._event.set()
if self._exception is not None:
try:
self._handle_failure()
except Exception:
self.log.exception("%r encountered an error during _handle_failure()", self)
# The callbacks have access to self, from which they can distinguish
# success or failure.
for cb in self._callbacks:
try:
cb(self)
except Exception:
self.log.exception(
"An error was raised on a background thread while "
"running the callback %r(%r).",
cb,
self,
)
self._callbacks.clear()
# from __future__ import annotations
# from collections import defaultdict
# from typing import Dict, List, Tuple
# import numpy as np
# ROI = List[Tuple[float, float]]
# def order_roi_corners_simple(roi: ROI) -> np.ndarray:
# """Order ROI corners as [top-left, top-right, bottom-right, bottom-left]."""
# pts = np.array(roi, dtype=float)
# cx, cy = pts.mean(axis=0)
# angles = np.arctan2(pts[:, 1] - cy, pts[:, 0] - cx)
# idx = np.argsort(angles)
# ordered = pts[idx]
# # Ensure clockwise order
# if np.cross(ordered[1] - ordered[0], ordered[2] - ordered[0]) < 0:
# ordered = ordered[::-1]
# return ordered[:4]
# def compute_affine_transform(
# src: np.ndarray, dst: np.ndarray, preserve_scale: bool = True
# ) -> np.ndarray:
# """Compute affine transform mapping src -> dst. Optionally preserve pixel scale."""
# if preserve_scale:
# # Solve for rotation+translation only
# A = np.array(
# [
# [src[0, 0], -src[0, 1], 1, 0],
# [src[0, 1], src[0, 0], 0, 1],
# [src[1, 0], -src[1, 1], 1, 0],
# [src[1, 1], src[1, 0], 0, 1],
# ]
# )
# b = dst[:2].ravel()
# x, residuals, _, _ = np.linalg.lstsq(A, b, rcond=None)
# a, b_, tx, ty = x
# return np.array([[a, -b_, tx], [b_, a, ty]])
# else:
# # Full affine transform
# src_h = np.hstack([src, np.ones((4, 1))])
# dst_h = dst
# M, _, _, _ = np.linalg.lstsq(src_h, dst_h, rcond=None)
# return M.T
# def apply_affine_transform(coords: np.ndarray, affine: np.ndarray) -> np.ndarray:
# """Apply affine transform to coordinates."""
# coords_h = np.hstack([coords, np.ones((coords.shape[0], 1))])
# transformed = coords_h @ affine.T
# return transformed[:, :2]
# def roi_pixel_hits(
# image_shape: Tuple[int, int], roi: ROI, start_idx: int = 0, min_fraction_diff: float = 0.0
# ) -> List[Dict]:
# """
# For each ROI, return list of hits as dicts {'i': (x,y), 'p': row_idx, 'f': fraction}.
# Supports rotated rectangles using bilinear fraction splitting.
# """
# hits_dict: Dict[Tuple[int, int], Dict[str, list]] = defaultdict(lambda: {"p": [], "f": []})
# corners = order_roi_corners_simple(roi)
# height = int(np.linalg.norm(corners[0] - corners[3])) + 1
# width = int(np.linalg.norm(corners[0] - corners[1])) + 1
# dst = np.array([[0, 0], [width - 1, 0], [width - 1, height - 1], [0, height - 1]], dtype=float)
# affine_mat = compute_affine_transform(corners, dst) # 2x3
# # Bounding box in image
# min_x = max(int(np.floor(corners[:, 0].min())), 0)
# max_x = min(int(np.ceil(corners[:, 0].max())), image_shape[1] - 1)
# min_y = max(int(np.floor(corners[:, 1].min())), 0)
# max_y = min(int(np.ceil(corners[:, 1].max())), image_shape[0] - 1)
# yy, xx = np.meshgrid(np.arange(min_y, max_y + 1), np.arange(min_x, max_x + 1), indexing="ij")
# coords = np.stack([xx.ravel(), yy.ravel()], axis=1) # N x 2
# local_coords = apply_affine_transform(coords, affine_mat)
# x_local = local_coords[:, 0]
# y_local = local_coords[:, 1]
# # Keep pixels inside ROI rectangle
# mask = (x_local >= 0) & (x_local <= width - 1) & (y_local >= 0) & (y_local <= height - 1)
# coords_in = coords[mask]
# x_in = x_local[mask]
# y_in = y_local[mask]
# # Bilinear fractions
# x0 = np.floor(x_in).astype(int)
# y0 = np.floor(y_in).astype(int)
# dx = x_in - x0
# dy = y_in - y0
# for coord, x0i, y0i, dxv, dyv in zip(coords_in, x0, y0, dx, dy):
# row_base = start_idx
# # Contributions to 4 neighboring "rows"
# contributions = [
# (row_base + y0i, (1 - dxv) * (1 - dyv)),
# (row_base + y0i, dxv * (1 - dyv)),
# (row_base + y0i + 1, (1 - dxv) * dyv),
# (row_base + y0i + 1, dxv * dyv),
# ]
# # Filter negligible contributions
# contributions = [(p, f) for p, f in contributions if f >= min_fraction_diff]
# # Normalize fractions to sum 1
# if contributions:
# total_f = sum(f for _, f in contributions)
# contributions = [(p, f / total_f) for p, f in contributions]
# for p, f in contributions:
# key = (int(coord[0]), int(coord[1]))
# hits_dict[key]["p"].append(int(p))
# hits_dict[key]["f"].append(float(f))
# hits_roi = [{"i": key, "p": value["p"], "f": value["f"]} for key, value in hits_dict.items()]
# return hits_roi
# if __name__ == "__main__":
# image_shape = (512, 512)
# rois = [
# [(25, 25), (50, 50), (50, 25), (75, 50)], # rotated 45 degrees rectangle
# [(300, 300), (400, 300), (400, 400), (300, 400)], # upright rectangle
# ]
# hits_0 = roi_pixel_hits(image_shape, rois[0], min_fraction_diff=0.1)
# hits_1 = roi_pixel_hits(image_shape, rois[1], start_idx=10, min_fraction_diff=0.2)
# print(hits_0[:5])
# print(hits_1[:5])

View File

@@ -0,0 +1,15 @@
"""This module tests the Timepix Fly backend functionality."""
from __future__ import annotations
import pytest
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_backend import TimepixFlyBackend
@pytest.fixture(scope="function")
def timepix_fly_backend():
"""Fixture for creating a Timepix Fly backend instance."""
backend = TimepixFlyBackend(backend_rest_url="http://localhost:8000")
yield backend
backend.on_destroy()

View File

@@ -0,0 +1,276 @@
"""Module to test the Timepix Fly client functionality."""
from __future__ import annotations
from unittest import mock
import pytest
from ophyd import StatusBase
from websockets import State
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client import (
TimepixFlyClient,
TimePixFlyStatus,
TimePixStatusError,
)
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface import ProgramState
@pytest.fixture(scope="function")
def timepix_fly_client():
"""Fixture for creating a Timepix Fly client instance."""
client = TimepixFlyClient(rest_url="http://localhost:8000", ws_url="ws://localhost:8000/ws")
try:
yield client
finally:
client.shutdown()
@pytest.mark.parametrize(
"return_state",
[
ProgramState(state=TimePixFlyStatus.CONFIG),
ProgramState(state=TimePixFlyStatus.COLLECT),
ProgramState(state=TimePixFlyStatus.SETUP),
],
)
def test_timepix_fly_client_stop_running_collection(timepix_fly_client, return_state):
"""Test the on_connected method of the Timepix Fly client."""
with (
mock.patch.object(timepix_fly_client, "stop_collect") as mock_stop_collect,
mock.patch.object(timepix_fly_client, "state", return_value=return_state),
):
timepix_fly_client.stop_running_collection()
if return_state.state == TimePixFlyStatus.CONFIG:
assert mock_stop_collect.call_count == 0, "Stop collect should be called once."
timepix_fly_client._started = True
timepix_fly_client.stop_running_collection()
assert mock_stop_collect.call_count == 1, "Stop collect should not be called again."
else:
assert mock_stop_collect.call_count == 1, "Stop collect should be called once."
timepix_fly_client._started = True
timepix_fly_client.stop_running_collection()
assert mock_stop_collect.call_count == 2, "Stop collect should be called in CONFIG."
def test_timepix_fly_client_on_connected(timepix_fly_client):
"""
Test timepix fly client connect method.
This simply ensures that all methods are called. They are tested separately.
"""
with (
mock.patch.object(timepix_fly_client, "stop_running_collection") as mock_stop_collection,
mock.patch.object(timepix_fly_client, "connect") as mock_connect,
mock.patch.object(timepix_fly_client, "wait_for_connection") as mock_wait_for_connection,
):
timepix_fly_client.on_connected()
mock_stop_collection.assert_called_once()
mock_connect.assert_called_once()
mock_wait_for_connection.assert_called_once()
def test_timepix_fly_client_connect(timepix_fly_client):
"""This tests the connect method of timepix fly client."""
module_path = "superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client"
# Patch Thread so we don't actually start the background loop
with mock.patch(f"{module_path}.threading.Thread") as mock_thread_cls:
mock_thread = mock.Mock()
mock_thread_cls.return_value = mock_thread
timepix_fly_client.connect()
# Thread should be created with the update loop as target and daemon True
mock_thread_cls.assert_called_once()
# start() must be called on the created thread
mock_thread.start.assert_called_once()
def test_timepix_fly_client_wait_for_connection(timepix_fly_client):
"""This tests the wait_for_connection method of timepix fly client."""
module_path = "superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client"
# Case 1: ws_client already present and OPEN
mock_client = mock.Mock()
type(mock_client).state = mock.PropertyMock(return_value=State.OPEN)
timepix_fly_client.ws_client = mock_client
# Should return immediately
timepix_fly_client.wait_for_connection(timeout=0.1)
# Case 2: connect() establishes the connection
timepix_fly_client.ws_client = None
mock_client2 = mock.Mock()
type(mock_client2).state = mock.PropertyMock(return_value=State.OPEN)
with mock.patch(f"{module_path}.connect", return_value=mock_client2) as mock_connect:
timepix_fly_client.wait_for_connection(timeout=0.1)
mock_connect.assert_called_once()
def test_timepix_fly_client_ws_send_and_received(timepix_fly_client):
"""This tests the _ws_send_and_receive method of timepix fly client."""
# Prepare a mock ws_client where recv returns a message
mock_ws = mock.Mock()
mock_ws.recv.return_value = "init"
timepix_fly_client.ws_client = mock_ws
# Patch _on_received_ws_message to ensure it's invoked
with mock.patch.object(timepix_fly_client, "_on_received_ws_message") as mock_on_msg:
timepix_fly_client._ws_send_and_receive()
mock_on_msg.assert_called_once_with("init")
# Now simulate a TimeoutError from recv: should simply return and not call _on_received_ws_message
mock_ws2 = mock.Mock()
mock_ws2.recv.side_effect = TimeoutError
timepix_fly_client.ws_client = mock_ws2
with mock.patch.object(timepix_fly_client, "_on_received_ws_message") as mock_on_msg2:
timepix_fly_client._ws_send_and_receive()
mock_on_msg2.assert_not_called()
def test_timepix_fly_client_on_message_received(timepix_fly_client):
"""This tests the _on_received_ws_message method of timepix fly client."""
with mock.patch.object(timepix_fly_client, "_run_status_callbacks") as mock_run_callbacks:
timepix_fly_client._on_received_ws_message("init")
assert timepix_fly_client._status == TimePixFlyStatus.INIT
mock_run_callbacks.assert_called_once()
# invalid message should not change status or call callbacks
prev_status = timepix_fly_client._status
with mock.patch.object(timepix_fly_client, "_run_status_callbacks") as mock_run_callbacks2:
timepix_fly_client._on_received_ws_message("invalid_status_string")
# status stays as previous value and callbacks not called
assert timepix_fly_client._status == prev_status
mock_run_callbacks2.assert_not_called()
def test_timepix_fly_client_on_status_callbacks(timepix_fly_client):
"""This tests the _run_status_callbacks method of timepix fly client."""
# Immediate run when current status already in success
timepix_fly_client._status = TimePixFlyStatus.INIT
status = StatusBase()
timepix_fly_client.add_status_callback(
status=status, success=[TimePixFlyStatus.INIT], error=[TimePixFlyStatus.EXCEPT], run=True
)
assert status.done is True and status.success is True
# Add callback (do not run immediately) and then trigger via _run_status_callbacks
status2 = StatusBase()
timepix_fly_client.add_status_callback(
status=status2,
success=[TimePixFlyStatus.CONFIG],
error=[TimePixFlyStatus.EXCEPT],
run=False,
)
# Set status to CONFIG and mark started True to check reset
timepix_fly_client._status = TimePixFlyStatus.CONFIG
timepix_fly_client._started = True
timepix_fly_client._run_status_callbacks()
assert status2.done is True and status2.success is True
assert timepix_fly_client._started is False
# Error path: add with run True when status is EXCEPT
timepix_fly_client._status = TimePixFlyStatus.EXCEPT
status3 = StatusBase()
with mock.patch.object(timepix_fly_client, "last_error") as mock_last_error:
mock_err = mock.Mock()
mock_err.message = "boom"
mock_last_error.return_value = mock_err
timepix_fly_client.add_status_callback(
status=status3,
success=[TimePixFlyStatus.INIT],
error=[TimePixFlyStatus.EXCEPT],
run=True,
)
assert status3.done is True and status3.success is False
def test_timepix_fly_client_shutdown(timepix_fly_client):
"""This tests the shutdown method of timepix fly client."""
mock_ws = mock.Mock()
timepix_fly_client.ws_client = mock_ws
timepix_fly_client.shutdown()
mock_ws.close.assert_called_once()
assert timepix_fly_client.ws_client is None
assert timepix_fly_client._shutdown_event.is_set()
def test_timepix_fly_client_start(timepix_fly_client):
"""This tests the start method of timepix fly client."""
# The client._get uses requests.get with f"http://{self.rest_url}/{get_cmd}".
# We mock requests.get and verify it is called with the expected URL and timeout,
# and that the _started flag is set.
module_path = "superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client"
with mock.patch(f"{module_path}.requests.get") as mock_get:
mock_resp = mock.Mock()
mock_resp.raise_for_status = mock.Mock()
mock_resp.text = ""
mock_get.return_value = mock_resp
timepix_fly_client.start()
expected_url = f"http://{timepix_fly_client.rest_url}/?start=true"
mock_get.assert_called_once_with(expected_url, timeout=timepix_fly_client._timeout)
assert timepix_fly_client._started is True
def test_timepix_fly_client_stop_collect(timepix_fly_client):
"""This tests the stop_collect method of timepix fly client."""
module_path = "superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client"
with mock.patch(f"{module_path}.requests.get") as mock_get:
mock_resp = mock.Mock()
mock_resp.raise_for_status = mock.Mock()
mock_resp.text = ""
mock_get.return_value = mock_resp
# ensure started is True and then call stop_collect
timepix_fly_client._started = True
timepix_fly_client.stop_collect()
expected_url = f"http://{timepix_fly_client.rest_url}/?stop_collect=true"
mock_get.assert_called_once_with(expected_url, timeout=timepix_fly_client._timeout)
assert timepix_fly_client._started is False
def test_timepix_fly_client_state(timepix_fly_client):
"""This tests the state method of timepix fly client."""
module_path = "superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client"
with mock.patch(f"{module_path}.requests.get") as mock_get:
mock_resp = mock.Mock()
mock_resp.raise_for_status = mock.Mock()
# Return a JSON payload compatible with ProgramState
mock_resp.json.return_value = {"type": "ProgramState", "state": "init"}
mock_get.return_value = mock_resp
program_state = timepix_fly_client.state()
expected_url = f"http://{timepix_fly_client.rest_url}/state"
mock_get.assert_called_once_with(expected_url, timeout=timepix_fly_client._timeout)
# ProgramState.model defines 'state' as a string literal; ensure we parsed it
assert hasattr(program_state, "state")
assert program_state.state == "init"
def test_timepix_fly_client_set_pixel_map(timepix_fly_client):
"""This tests the set_pixel_map/_put path by mocking requests.put and checking payload."""
module_path = "superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client"
with mock.patch(f"{module_path}.requests.put") as mock_put:
mock_resp = mock.Mock()
mock_resp.raise_for_status = mock.Mock()
mock_put.return_value = mock_resp
# Minimal valid PixelMap dict (type is optional; model supplies default)
pixel_map = {"chips": [[{"i": 0, "p": 1, "f": 2}]]}
timepix_fly_client.set_pixel_map(pixel_map)
expected_url = f"http://{timepix_fly_client.rest_url}/pixel-map"
# Verify requests.put called with expected url, json and timeout
mock_put.assert_called_once()
_, kwargs = mock_put.call_args
assert kwargs.get("timeout") == timepix_fly_client._timeout
assert kwargs.get("json") is not None
assert kwargs.get("json").get("chips") == pixel_map["chips"]

View File

@@ -0,0 +1,18 @@
#!/usr/bin/env python
"""Client using the asyncio API."""
import asyncio
from websockets.asyncio.client import connect
async def hello():
async with connect("ws://localhost:8452/ws") as websocket:
await websocket.send("Hello world!")
while True:
message = await websocket.recv()
print(message)
if __name__ == "__main__":
asyncio.run(hello())