Refactor Timepix integration to use the Redis publisher for TimePixFly #23

Merged
appel_c merged 10 commits from timepix-migration-redis-writer into main 2026-06-01 13:51:35 +02:00
6 changed files with 410 additions and 187 deletions
@@ -0,0 +1,141 @@
# Timepix detector integration
This module integrates the ASI Serval `Timepix` detector into the SuperXAS BEC device layer. It combines EPICS/areaDetector controls for camera and image handling with the TimePixFly backend (`TimepixFlyBackend`) for raw-data processing and XES histogram generation.
## Main classes
The main integration class is `Timepix` in `timepix.py`. It combines the EPICS/areaDetector camera interface with BEC lifecycle hooks and BEC-facing signals.
The TimePixFly support lives in `timepix_fly_client/`:
- `timepix_fly_backend.py` defines `TimepixFlyBackend`, the lifecycle adapter used by `Timepix`.
- `timepix_fly_client.py` contains the REST/WebSocket client.
- `timepix_fly_interface.py` contains the Pydantic models for TimePixFly configuration and raw-data messages.
### Device configuration
This is an example device configuration for the `Timepix` device.
```yaml
timepix:
readoutPriority: async
description: ASI Serval Timepix Detector
deviceClass: superxas_bec.devices.timepix.timepix.Timepix
deviceConfig:
prefix: "X10DA-ES-TPX1:"
backend_rest_url: "P6-0008.psi.ch:8452"
hostname: "x10da-bec-001.psi.ch"
enable_xes: false
onFailure: retry
enabled: true
readOnly: false
softwareTrigger: true
```
The `deviceConfig` parameter `enable_xes` controls whether TimePixFly XES processing is active when the device is loaded. It can also be changed at runtime with `set_enable_xes(True)` or `set_enable_xes(False)`.
If HDF5 file writing is not possible, for example because the IOC cannot access the target mount, it can be disabled from the CLI with `dev.timepix.hdf.enable.put(0)`.
Currently, the `Timepix` device has the following BEC signals:
- `xes_data`: full XES data with time ROI bins and energy points.
- `xes_spectra`: XES data integrated over energy points.
- `xes_energy_1` and `xes_energy_2`: two grouped energy windows used when the
pixel map contains eight energy points. They represent the sum of energy points `0:4` and `4:8`, respectively.
- `tds_period`, `total_periods`, and `total_events`: timing and event counters
reported by TimePixFly.
- `preview`: 2D camera preview from the image plugin.
- `static_spectra`: 1D spectrum derived from received image arrays.
Users can set the pixel map through the `set_pixel_map()` method or the `set_pixel_map_from_json_file()` method, which accepts a file path to a JSON file matching the `PixelMap` schema defined in `timepix_fly_interface.py`. By default, a pixel map is created with one energy point per detector module. The `set_enable_xes()` method lets users activate or deactivate XES processing from the CLI.
## Scan lifecycle
### Initialization and connection
`on_init()` builds the default pixel map. No EPICS signals are connected yet, so detector PV defaults are not written here. In `wait_for_connection()`, we wait for all EPICS signals to connect, then connect the TimePixFly backend and set up the raw-data callback in Redis.
Once connected, `on_connected()` applies the default signal values. It:
- enables or disables TDC/raw-stream settings according to `enable_xes`;
- sets internal trigger mode, `HDMI1_1` trigger source, and timed exposure mode;
- resets the camera array counter;
- sets image mode to multiple and prepares the HDF5 plugin for stream writing;
- registers `msg_buffer_callback()` with the TimePixFly backend; this callback parses the XES data from Redis and updates the BEC-facing XES signals;
- starts preview polling and subscribes to image updates for `static_spectra`.
### Stage
`on_stage()` translates the scan parameters into detector settings:
- `exp_time` becomes the camera acquire period.
- `frames_per_trigger` becomes the camera burst size.
- `num_points * frames_per_trigger` becomes the expected HDF5 image count.
The camera acquire time is set to `exp_time - MIN_DETECTOR_READOUT_TIME`. Staging fails if the exposure is not longer than the detector readout time.
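The mapping from scan parameters to detector settings can be sketched as follows. This is a minimal illustration, not the code in `on_stage()`: the helper name `stage_settings` and the dictionary keys are hypothetical, and only the arithmetic is taken from the description above.

```python
MIN_DETECTOR_READOUT_TIME = 2.1e-3  # seconds, value quoted in the operational notes


def stage_settings(exp_time: float, frames_per_trigger: int, num_points: int) -> dict:
    """Translate scan parameters into detector settings (hypothetical helper)."""
    # Staging must fail if the exposure is not longer than the readout time.
    if exp_time <= MIN_DETECTOR_READOUT_TIME:
        raise ValueError(
            f"exp_time={exp_time} s must be longer than the detector readout "
            f"time of {MIN_DETECTOR_READOUT_TIME} s"
        )
    return {
        "acquire_period": exp_time,
        "acquire_time": exp_time - MIN_DETECTOR_READOUT_TIME,
        "frames_per_trigger": frames_per_trigger,
        "expected_images": num_points * frames_per_trigger,
    }


settings = stage_settings(exp_time=0.1, frames_per_trigger=5, num_points=20)
```

With these inputs the HDF5 writer would be expected to capture 100 images, and the acquire time is the period minus the readout time.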
Please note that if `burst_at_each_point` is greater than 1, we run multiple full acquisitions for each trigger. The current TimePixFly and ASI Serval Timepix integration requires this. If this changes, the logic can be adapted.
If XES is enabled, `on_stage()` also configures TimePixFly:
- `TRoiStep` and `TRoiN` come from `troistep` and `troin`.
- `output_uri` is a Redis URI containing the BEC Redis host, port, raw-data topic, and scan id. The scan id is stored in the metadata of each message, but is currently not used further. Checking the `scan_id` in the message callback would be one way to safeguard against misconfiguration.
- `save_interval` is set for approximately 5 Hz XES updates.
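As a rough sketch of the two derived values, the snippet below builds the Redis `output_uri` and computes `save_interval` in 131 kHz units. The URI layout and the `int(131000 / 5) - 5` expression mirror the diff in this PR; the helper names, the host, the port, and the scan id are placeholders chosen for illustration.

```python
CLOCK_HZ = 131_000  # save_interval is expressed in 131 kHz units


def build_output_uri(host: str, port: int, topic: str, scan_id: str) -> str:
    """Assemble the Redis URI that tells TimePixFly where to publish (hypothetical helper)."""
    return f"redis://{host}:{port}/{topic}?scan-id={scan_id}"


def save_interval_for(update_rate_hz: float, margin: int = 5) -> int:
    """Clock ticks between XES updates, minus a small margin as in the PR code."""
    return int(CLOCK_HZ / update_rate_hz) - margin


# Placeholder connection values, not the real beamline configuration.
uri = build_output_uri("localhost", 6379, "public/timepix_fly_backend/raw_data", "abc123")
interval = save_interval_for(5)  # roughly 5 Hz updates
```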
### Pre-scan
During `on_pre_scan()`, we make sure that the camera is idle before starting the acquisition.
### Trigger
For every trigger, we software trigger the acquisition of the `Timepix` detector with a full cycle:
```text
DONE -> ACQUIRING -> DONE
```
Otherwise, the current assumptions of the TimePixFly and ASI Serval Timepix integration would be violated.
#### State management during a single trigger call
The `TimePixFly` backend undergoes a full lifecycle for each trigger. Therefore, we must properly manage and keep track of its transitions. The initial state is `config`, in which the backend must be at the beginning of each trigger. We then move to `await_connection` while the camera is still IDLE. Then, the camera needs to be started with `cam.acquire.put(1)`. The camera will connect to the `TimePixFly` backend on the raw-data socket, which initiates a state transition to `collect`. Once the acquisition is done, and the camera closes the socket, the backend transitions back to `config` after sending out the last `EndFrame` message. During `collect`, the backend sends out XES data messages to Redis at the specified `save_interval`. The backend status resolves when it transitions back to `config` after the acquisition is done. In the meantime, we track the transition of the detector control interface to cycle through
```text
DONE -> ACQUIRING -> DONE
```
This describes a full trigger cycle. Efficient data collection should therefore use `frames_per_trigger > 1` rather than multiple software trigger cycles via `burst_at_each_point > 1`. This reduces the overhead caused by the logic described above.
Please note, if at any point the state of the `TimePixFly` backend transitions to `error`, we will automatically raise an exception with the error message and stop the scan. The error will be forwarded to BEC.
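The backend cycle for one trigger can be illustrated with a small tracker. This is a sketch under stated assumptions: `TriggerCycleTracker` is a hypothetical class, the real implementation relies on ophyd status objects fed by WebSocket updates, and the error state names (`except`, `shutdown`) are taken from the TimePixFly client README in this PR.

```python
EXPECTED_CYCLE = ["config", "await_connection", "collect", "config"]
ERROR_STATES = {"except", "shutdown"}


class TriggerCycleTracker:
    """Follow the backend through one trigger cycle (hypothetical helper)."""

    def __init__(self) -> None:
        self._next = 0  # index of the next expected state in EXPECTED_CYCLE

    @property
    def done(self) -> bool:
        return self._next == len(EXPECTED_CYCLE)

    def update(self, state: str) -> None:
        if state in ERROR_STATES:
            raise RuntimeError(f"TimePixFly backend entered state {state!r}")
        if not self.done and state == EXPECTED_CYCLE[self._next]:
            self._next += 1  # expected transition
        elif self._next > 0 and state == EXPECTED_CYCLE[self._next - 1]:
            return  # repeated report of the current state, nothing to do
        else:
            raise RuntimeError(f"Unexpected backend state {state!r}")


tracker = TriggerCycleTracker()
for reported in ["config", "await_connection", "collect", "collect", "config"]:
    tracker.update(reported)
```

After the last `config` report the tracker is done, which corresponds to the point where the backend status resolves.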
### Complete
As this is called once the full scan is done, `on_complete()` waits for the camera to become `DONE`. If HDF5 is enabled, it also waits for capture to stop, file writing to finish, the write status to remain successful, and the expected image count to be captured. If XES is enabled, it waits for TimePixFly to be back in `config`. We also check that there is no error state in the writer, which can for example happen if there are file permission issues for the writer path.
### Stop and destroy
`on_stop()` stops camera acquisition, stops HDF5 capture, and asks the backend to stop any running collection when XES is enabled.
`on_destroy()` stops acquisition and capture, stops preview polling, and destroys backend resources.
## XES data callbacks
The idea behind the integration is that the `TimepixFlyBackend` class allows high-level classes to register callbacks to receive the `StartFrame`, `XesData`, and `EndFrame` messages from `TimePixFly`. They can then parse the data from these messages as they see fit. The mechanism works by registering a callback function with the backend, which is called with all the relevant data once an `EndFrame` message is received. This allows the backend to buffer all the incoming messages for one acquisition and then call the callback once all data is available.
In the current implementation, the `msg_buffer_callback()` builds the BEC-facing XES arrays:
1. It reads `NumEnergyPoints`, `TRoiN`, `save_interval`, and the collected `XesData` frames.
2. It sums `TDSpectra` values into a `(TRoiN, NumEnergyPoints)` array.
3. It publishes `tds_period`, `total_periods`, `total_events`, `xes_data`, and `xes_spectra`.
4. If there are eight energy points, it publishes two grouped spectra: energy points `0:4` and `4:8`.
If the signals change, this callback method has to be adapted. For example, if there are eight energy points, we currently publish two grouped spectra. The grouping corresponds to the left and right half of the detector. This was written for the default pixel map and will not hold for an arbitrary pixel map with eight energy points.
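The reduction performed by the callback can be sketched with plain Python lists. The device code uses NumPy for the same sums; lists are used here only to keep the sketch dependency-free. The function name `reduce_xes_frames` is hypothetical, and the layout of `TDSpectra` (a flat, row-major list with `TRoiN * NumEnergyPoints` entries) is an assumption that should be checked against `timepix_fly_interface.py`.

```python
def reduce_xes_frames(frames: list[dict], troin: int, num_energy_points: int) -> dict:
    """Sum TDSpectra over all frames into a (troin, num_energy_points) table."""
    xes_data = [[0] * num_energy_points for _ in range(troin)]
    for frame in frames:
        flat = frame["TDSpectra"]  # ASSUMED flat, row-major layout
        if len(flat) != troin * num_energy_points:
            raise ValueError("TDSpectra length does not match TRoiN * NumEnergyPoints")
        for t in range(troin):
            for e in range(num_energy_points):
                xes_data[t][e] += flat[t * num_energy_points + e]

    result = {
        "xes_data": xes_data,
        # Integrate over energy points: one value per time ROI bin.
        "xes_spectra": [sum(row) for row in xes_data],
    }
    if num_energy_points == 8:
        # Grouped windows, only meaningful for the default pixel map.
        result["xes_energy_1"] = [sum(row[0:4]) for row in xes_data]
        result["xes_energy_2"] = [sum(row[4:8]) for row in xes_data]
    return result


frames = [{"TDSpectra": list(range(16))}, {"TDSpectra": list(range(16))}]
reduced = reduce_xes_frames(frames, troin=2, num_energy_points=8)
```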
### Image data callbacks
Two image-derived signals are updated from the areaDetector image plugin:
- The preview polling thread watches `image.unique_id`, reads `image.array_data`, reshapes it using the plugin array size, and publishes the 2D `preview`.
- The `image.unique_id` subscription computes a 1D `static_spectra` signal by summing the reshaped image along one axis.
## Operational notes
- `MIN_DETECTOR_READOUT_TIME` is `2.1e-3` seconds. Scan exposure times must be larger than this value.
- `enable_xes` controls the TDC/raw-stream settings and whether the TimePixFly backend participates in stage, trigger, complete, and stop.
- Image writing is independent of XES processing and is controlled through the HDF5 plugin enable signal or `set_enable_image_writing()`.
- The current default Redis topic for TimePixFly raw data is `public/timepix_fly_backend/raw_data`.
- The backend REST URL and BEC host name must be reachable from both the BEC server and the TimePixFly service.
@@ -190,6 +190,9 @@ class TimePixControl(ADBase):
DETECTOR_SHAPE = (512, 1024) # Shape of the TimePix detector
TIMEPIX_FLY_BACKEND_DEFAULT_REDIS_TOPIC = "public/timepix_fly_backend/raw_data"
class Timepix(PSIDeviceBase, TimePixControl):
"""
Integration of the Timepix detector for the SuperXAS beamline. The control interface is
@@ -218,8 +221,8 @@ class Timepix(PSIDeviceBase, TimePixControl):
static_spectra = Cpt(AsyncSignal, name="static_spectra", ndim=1, max_size=1000, acquisition_group="monitored", async_update={"type": "add", "max_shape": [None, DETECTOR_SHAPE[0]]}, doc="Spectra signal of the TimePix detector.")
xes_data_accumulated_1 = Cpt(AsyncSignal, name="xes_accumulated_energy_1", ndim=1, max_size=1000, doc="1D time spectra for energy bin 2.")
xes_data_accumulated_2 = Cpt(AsyncSignal, name="xes_accumulated_energy_2", ndim=1, max_size=1000, doc="1D time spectra for energy bin 2.")
# xes_data_accumulated_1 = Cpt(AsyncSignal, name="xes_accumulated_energy_1", ndim=1, max_size=1000, doc="1D time spectra for energy bin 2.")
# xes_data_accumulated_2 = Cpt(AsyncSignal, name="xes_accumulated_energy_2", ndim=1, max_size=1000, doc="1D time spectra for energy bin 2.")
file_event = Cpt(FileEventSignal, name="file_event", doc="File event signal for TimePix detector.")
# fmt: on
@@ -253,15 +256,22 @@ class Timepix(PSIDeviceBase, TimePixControl):
**kwargs: Additional keyword arguments for the base class.
"""
super().__init__(
name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs
)
self._redis_raw_data_topic = TIMEPIX_FLY_BACKEND_DEFAULT_REDIS_TOPIC
self.backend = TimepixFlyBackend(
backend_rest_url=backend_rest_url, hostname=hostname, socket_port=socket_port
backend_rest_url=backend_rest_url,
hostname=hostname,
socket_port=socket_port,
redis_topic=self._redis_raw_data_topic,
redis_connector=self.device_manager.connector,
)
self._pixel_map = None
self._troistep = 1
self._troin = 5000
super().__init__(
name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs
)
self._poll_thread = threading.Thread(
target=self._poll_array_data, daemon=True, name=f"{self.name}_poll_thread"
)
@@ -308,7 +318,7 @@ class Timepix(PSIDeviceBase, TimePixControl):
height = self.image.array_size.height.get()
# Geometry correction for the image
data = np.reshape(value, (height, width))
logger.info(f"Setting preview data for {self.name} with shape {data.shape}")
logger.debug(f"Setting preview data for {self.name} with shape {data.shape}")
self.preview.put(data)
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
@@ -397,22 +407,22 @@ class Timepix(PSIDeviceBase, TimePixControl):
data_2 = np.sum(xes_data[:, 4:8], axis=1)
self.xes_energy_1.put(data_1, async_update={"type": "add", "max_shape": [None, troin]})
self.xes_energy_2.put(data_2, async_update={"type": "add", "max_shape": [None, troin]})
if self.accumulated_data_e1 is None:
self.accumulated_data_e1 = data_1
else:
self.accumulated_data_e1 += data_1
if self.accumulated_data_e2 is None:
self.accumulated_data_e2 = data_2
else:
self.accumulated_data_e2 += data_2
self.xes_data_accumulated_1.put(
self.accumulated_data_e1,
async_update={"type": "replace", "max_shape": [None, troin]},
)
self.xes_data_accumulated_2.put(
self.accumulated_data_e2,
async_update={"type": "replace", "max_shape": [None, troin]},
)
# if self.accumulated_data_e1 is None:
# self.accumulated_data_e1 = data_1
# else:
# self.accumulated_data_e1 += data_1
# if self.accumulated_data_e2 is None:
# self.accumulated_data_e2 = data_2
# else:
# self.accumulated_data_e2 += data_2
# self.xes_data_accumulated_1.put(
# self.accumulated_data_e1,
# async_update={"type": "replace", "max_shape": [None, troin]},
# )
# self.xes_data_accumulated_2.put(
# self.accumulated_data_e2,
# async_update={"type": "replace", "max_shape": [None, troin]},
# )
self.xes_spectra.put(
xes_data.sum(axis=1), async_update={"type": "add", "max_shape": [None, troin]}
)
@@ -465,17 +475,6 @@ class Timepix(PSIDeviceBase, TimePixControl):
"""Set whether XES data acquisition is enabled."""
self._enable_xes = value
self._enable_xes_settings(value)
# #TODO Update device manager config if available
# if self.device_manager is not None:
# dev_obj = self.device_manager.devices.get(self.name, None)
# if dev_obj is not None:
# cfg = dev_obj.get_device_config()
# if "enable_xes" in cfg and cfg["enable_xes"] != value:
# cfg["enable_xes"] = value
# dev_obj.set_device_config({"enable_xes": value})
# logger.info(
# f"Updated 'enable_xes' to {value} in device manager for {self.name}"
# )
@property
def pixel_map(self) -> PixelMap:
@@ -622,6 +621,7 @@ class Timepix(PSIDeviceBase, TimePixControl):
def on_stage(self) -> StatusBase | None:
"""Called while staging the device."""
###
self.accumulated_data_e1 = None
self.accumulated_data_e2 = None
scan_msg: ScanStatusMessage = self.scan_info.msg # type: ignore
@@ -660,10 +660,12 @@ class Timepix(PSIDeviceBase, TimePixControl):
# XES specific staging
if self.enable_xes:
# Prepare TimePixFly
con = self.device_manager.connector
other_config = OtherConfigModel(
TRoiStep=self.troistep,
TRoiN=self.troin,
output_uri=f"tcp:{self.backend.hostname}:{self.backend.socket_port}",
# output_uri=f"tcp:{self.backend.hostname}:{self.backend.socket_port}",
output_uri=f"redis://{con.host}:{con.port}/{self._redis_raw_data_topic}?scan-id={scan_msg.scan_id}",
save_interval=int(131000 / 5) - 5,
) # Save interval in 131kHz units,
logger.debug(f"Current TimePixFly configuration: {other_config}")
@@ -680,10 +682,10 @@ class Timepix(PSIDeviceBase, TimePixControl):
def on_unstage(self) -> None:
"""Called while unstaging the device."""
# TODO what should happen for unstage? Make sure that acquisition is not running?
# self.backend.on_unstage()
# self.cam.acquire.put(0)
# status_camera = CompareStatus(self.cam.acquire_busy, ACQUIRESTATUS.DONE)
if self.cam.acquire_busy.get() == ACQUIRESTATUS.ACQUIRING:
self.cam.acquire.put(0)
status_camera = CompareStatus(self.cam.acquire_busy, ACQUIRESTATUS.DONE)
status_camera.wait(timeout=self._pv_timeout)
def on_pre_scan(self) -> StatusBase:
"""Called right before the scan starts on all devices automatically."""
@@ -708,7 +710,6 @@ class Timepix(PSIDeviceBase, TimePixControl):
status_camera = TransitionStatus(
self.cam.acquire_busy, [ACQUIRESTATUS.DONE, ACQUIRESTATUS.ACQUIRING, ACQUIRESTATUS.DONE]
)
img_counter = self.hdf.num_captured.get()
status_backend = None
# First we make sure that the backend reach 'config' state. This needs to happend before each trigger.
if self.enable_xes is True:
@@ -759,7 +760,14 @@ class Timepix(PSIDeviceBase, TimePixControl):
status_writer = None
if self.hdf.enable.get() == "Enable":
st2 = CompareStatus(self.hdf.write_file, ACQUIRESTATUS.DONE)
st3 = ExceptionStatus(self.hdf.write_status, 0, operation="!=")
st3 = ExceptionStatus(
self.hdf.write_status,
0,
operation="!=",
exception=RuntimeError(
f"HDF5 write failed on device {self.name} with file path {self._full_path}, please check with support if the IOC has sufficient permissions to write to disk."
),
)
status_written_images = CompareStatus(self.hdf.num_captured, self._n_images)
status_writer = st1 & st2 & status_written_images & st3
@@ -780,9 +788,7 @@ class Timepix(PSIDeviceBase, TimePixControl):
def _complete_callback(self, status: CompareStatus) -> None:
"""Callback for when the device completes a scan."""
if (
self.hdf.enable.get() != "Enable"
): # TODO: Not sure if we should support disabled file writing.
if self.hdf.enable.get() != "Enable":
return
if status.success:
self.file_event.put(
@@ -0,0 +1,148 @@
# TimePixFly client integration
This directory contains the integration layer for the TimePixFly service (https://github.com/paulscherrerinstitute/TimePixFly). This backend receives the raw data stream from the Serval Timepix detector for XES acquisition.
This backend is designed to be used by the top-level `Timepix` device, but can also be used independently with a mock server. A minimal interface to the mock_server is provided in `./test_utils`, with the server code being available via the TimePixFly repository (https://github.com/paulscherrerinstitute/TimePixFly/blob/master/src/test_server.cpp).
The integration here is structured around two main components:
- `TimepixFlyClient` is the low-level service client that manages WebSocket connections, REST calls, and status tracking.
- `TimepixFlyBackend` is the detector-lifecycle adapter that connects the client to BEC's `PSIDeviceBase` hooks and manages raw-data callbacks.
In addition, `timepix_fly_interface.py` contains Pydantic models that define the expected schema for REST payloads and raw-data messages. This keeps the rest of the codebase insulated from low-level protocol details and makes it easier to adapt to future changes in the TimePixFly service.
The top-level `Timepix` detector owns a `TimepixFlyBackend`, and the backend owns a `TimepixFlyClient`.
## Main classes
### `TimepixFlyClient`
`TimepixFlyClient` in `timepix_fly_client.py` is the low-level service client. It is decoupled from any BEC-specific logic, but it can attach `StatusBase` objects to TimePixFly states, which lets callers handle state changes in the TimePixFly backend asynchronously and integrates cleanly with ophyd. Its responsibilities are to:
- connect to the TimePixFly WebSocket endpoint;
- keep the latest `TimePixFlyStatus`;
- attach `StatusBase` objects from ophyd_devices to status changes, marking them as success or error states;
- issue REST GET/PUT commands;
- parse REST responses into Pydantic models.
The client is initialized with:
```python
TimepixFlyClient(rest_url="P6-0008.psi.ch:8452", ws_url="P6-0008.psi.ch:8452/ws")
```
The different states of the backend are listed in the enum `TimePixFlyStatus`:
```text
init, config, setup, collect, shutdown, undefined, await_connection, except
```
The method `add_status_callback(status: StatusBase, success_states: list[TimePixFlyStatus], error_states: list[TimePixFlyStatus])` attaches an ophyd status object to the TimePixFly state machine. The status is marked as successful when the client receives a state in `success_states`, and as failed when it receives a state in `error_states`. The backend uses this mechanism to manage the asynchronous nature of the TimePixFly service and to integrate it with the BEC scan machinery.
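The mechanism can be illustrated without ophyd or a running service. In this sketch, `SketchStatus` is a hypothetical stand-in for `StatusBase`, `StateWatcher` is a hypothetical stand-in for the client, and states are plain strings rather than `TimePixFlyStatus` members. It only shows the idea of resolving pending statuses when a state update arrives.

```python
import threading


class SketchStatus:
    """Minimal stand-in for an ophyd status object (not the real StatusBase)."""

    def __init__(self) -> None:
        self._event = threading.Event()
        self.exception: Exception | None = None

    def set_finished(self) -> None:
        self._event.set()

    def set_exception(self, exc: Exception) -> None:
        self.exception = exc
        self._event.set()

    @property
    def done(self) -> bool:
        return self._event.is_set()

    @property
    def success(self) -> bool:
        return self.done and self.exception is None


class StateWatcher:
    """Resolve registered statuses when a matching state is reported."""

    def __init__(self) -> None:
        self._pending: list[tuple[SketchStatus, set, set]] = []
        self._lock = threading.Lock()

    def add_status_callback(self, status, success_states, error_states) -> None:
        with self._lock:
            self._pending.append((status, set(success_states), set(error_states)))

    def on_state(self, state: str) -> None:
        """Would be called for every state update received over WebSocket."""
        with self._lock:
            still_pending = []
            for status, ok_states, bad_states in self._pending:
                if state in bad_states:
                    status.set_exception(RuntimeError(f"backend entered {state!r}"))
                elif state in ok_states:
                    status.set_finished()
                else:
                    still_pending.append((status, ok_states, bad_states))
            self._pending = still_pending


watcher = StateWatcher()
status = SketchStatus()
watcher.add_status_callback(status, success_states=["config"], error_states=["except"])
watcher.on_state("collect")  # status stays pending
watcher.on_state("config")   # status resolves successfully
```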
### `TimepixFlyBackend`
`TimepixFlyBackend` in `timepix_fly_backend.py` is the interface that maps to the existing lifecycle hooks of the `PSIDeviceBase` class. It wraps `TimepixFlyClient` with methods that are organized around the lifecycle hooks for BEC scans and device management:
- `on_connected()`
- `on_stage()`
- `on_trigger()`
- `on_trigger_finished()`
- `on_complete()`
- `on_stop()`
- `on_destroy()`
It also manages callbacks for raw-data messages received from TimePixFly. From the high-level `Timepix` device, a custom callback can be registered through `TimepixFlyBackend.add_callback()`. This callback is called with a complete acquisition package whenever TimePixFly completes an acquisition.
```python
def callback(start_frame: dict, data_frames: list[dict], end_frame: dict, **kwargs) -> None:
...
```
#### Interface models
The `timepix_fly_interface.py` module contains Pydantic models that define the expected schema for REST payloads and raw-data messages.
## Lifecycle mapping
### Connect
`TimepixFlyBackend.on_connected()` calls `TimepixFlyClient.on_connected()`. The
client first stops any running collection, then starts the WebSocket update
thread and waits for a connection.
If a Redis connector was provided, the backend registers
`_raw_data_redis_callback()` on its configured raw-data topic.
### Stage
Stage prepares the backend for acquisition. We will flush any potentially stale last-error, as this may otherwise cause unexpected failures in the backend state machine. Then we will send the configuration REST call to TimePixFly with the provided payloads. The payloads are:
- an `OtherConfigModel`, usually built by `Timepix.on_stage()`;
- a `PixelMap`, either generated by the default helper or supplied by the user.
The TimepixFly service needs to be in the state `config` to accept configuration updates, so the backend waits for this state before sending any REST calls. Once in `config`, the backend sends the configuration REST call with the provided payloads.
### Trigger
Triggering is managed by two methods. We first prepare the backend for the acquisition in `on_trigger()`, then wait for acquisition completion in `on_trigger_finished()`.
During `on_trigger()`, the backend waits for the status to reach `await_connection`. When the camera connects, the state moves to `collect`, and at the end of the acquisition it moves back to `config`. This happens automatically when the Serval TimePix detector finishes its acquisition. If at any point the status moves to `except` (or `shutdown`), the backend raises an error. `on_trigger_finished()` returns a status that succeeds when TimePixFly returns to `config`. This status represents completion of the backend-side acquisition.
In terms of timing, the TimePixFly service needs to be ready to receive the raw stream before the ASI camera starts acquisition. The raw stream path also needs to match the one set on the IOC interface.
### Complete
An acquisition is considered complete when the backend status returns to `config`.
### Stop and destroy
When `on_stop()` is called, the backend is stopped through a REST call, and all active statuses are failed with an error stating that the device has been stopped.
Cleaning up is handled by `on_destroy()`. This method is called when the device is removed from the BEC registry, and should release all resources acquired by the backend.
## Raw-data buffering
The backend accepts raw-data messages through `_raw_data_redis_callback()`. Each
message is passed to `_process_timepix_fly_msg()`.
The `TimePixFly` service sends three types of messages during acquisition: `StartFrame`, `XesData`, and `EndFrame`. The backend buffers these messages until an `EndFrame` is received, then runs all registered callbacks with the complete acquisition package.
In addition, when a `StartFrame` is received, we clear any stale messages still in the buffer. In theory the buffer should always be empty at this point, so we log a warning if it is not. The expected flow is:
1. `StartFrame` clears any stale buffer and starts a new package.
2. `XesData` messages are appended.
3. `EndFrame` runs all registered callbacks with:
- the first message as `start_frame`;
- all middle messages as `data_frames`;
- the last message as `end_frame`.
4. The buffer is cleared in a `finally` block after callback processing.
This design keeps the low-level service protocol out of the top-level `Timepix`
device. `Timepix` only receives complete acquisition packages and can transform
them directly into BEC signals.
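The buffering flow above can be sketched as a small class. This is an illustration, not the backend code: `AcquisitionBuffer` is a hypothetical name, and the `"type"` key used to distinguish the message kinds is an assumption about the message schema. The callback is invoked with keyword arguments that match the documented callback signature.

```python
import logging

logger = logging.getLogger("timepix_fly_sketch")


class AcquisitionBuffer:
    """Collect StartFrame, XesData and EndFrame messages into one package."""

    def __init__(self) -> None:
        self._buffer: list[dict] = []
        self._callbacks = []

    def add_callback(self, callback) -> None:
        self._callbacks.append(callback)

    def process(self, msg: dict) -> None:
        msg_type = msg.get("type")  # ASSUMED field name, check the Pydantic models
        if msg_type == "StartFrame":
            if self._buffer:
                logger.warning("Dropping %d stale messages", len(self._buffer))
                self._buffer.clear()
            self._buffer.append(msg)
        elif msg_type == "XesData":
            self._buffer.append(msg)
        elif msg_type == "EndFrame":
            self._buffer.append(msg)
            try:
                for callback in self._callbacks:
                    callback(
                        start_frame=self._buffer[0],
                        data_frames=self._buffer[1:-1],  # slice copy, safe after clear()
                        end_frame=self._buffer[-1],
                    )
            finally:
                # Always reset, even if a callback raised.
                self._buffer.clear()


packages = []
buffer = AcquisitionBuffer()
buffer.add_callback(lambda **package: packages.append(package))
for message in [
    {"type": "StartFrame"},
    {"type": "XesData", "n": 1},
    {"type": "XesData", "n": 2},
    {"type": "EndFrame"},
]:
    buffer.process(message)
```

After the `EndFrame`, `packages` holds one entry with two data frames, and the internal buffer is empty again.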
## Configuration payloads
`OtherConfigModel` describes how TimePixFly should process and publish data:
- `output_uri`: destination for processed raw-data messages. In the BEC
integration this is usually a Redis URI with a scan id query parameter.
For the mode where data is sent directly to Redis, we expect the URI to be of the form `redis://host:port/topic?scan-id=scan_id`. For the mode where data is sent directly to a TCP socket, we expect the URI to be of the form `tcp://host:port`.
- `save_interval`: backend histogram publish interval in 131 kHz units.
- `TRoiStart`, `TRoiStep`, `TRoiN`: time ROI definition.
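To make the two `output_uri` forms concrete, the following sketch parses them with the standard library. The function `parse_output_uri` is hypothetical and is not part of the TimePixFly client; the host names and port numbers in the example are placeholders.

```python
from urllib.parse import parse_qs, urlsplit


def parse_output_uri(uri: str) -> dict:
    """Split an output_uri into its parts (hypothetical helper)."""
    parts = urlsplit(uri)
    if parts.scheme not in ("redis", "tcp"):
        raise ValueError(f"Unsupported output_uri scheme: {parts.scheme!r}")
    info = {"scheme": parts.scheme, "host": parts.hostname, "port": parts.port}
    if parts.scheme == "redis":
        # The path carries the topic, the query string carries the scan id.
        info["topic"] = parts.path.lstrip("/")
        info["scan_id"] = parse_qs(parts.query).get("scan-id", [None])[0]
    return info


redis_info = parse_output_uri(
    "redis://localhost:6379/public/timepix_fly_backend/raw_data?scan-id=abc123"
)
tcp_info = parse_output_uri("tcp://localhost:5000")
```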
`PixelMap` maps pixels to energy points. The schema is:
```text
chips: list[list[{"i": pixel_index, "p": energy_point_or_points, "f": fraction_or_fractions}]]
```
The default SuperXAS map assigns each of the eight Timepix chips to one energy
point.
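A map of this kind could be generated as shown below. This is a sketch under assumptions, not the default helper from the code base: the function name is hypothetical, each chip is assumed to hold 256 x 256 pixels indexed from 0 within the chip, and `p` and `f` are written as scalars although the schema also allows lists.

```python
def default_pixel_map(n_chips: int = 8, pixels_per_chip: int = 256 * 256) -> dict:
    """Assign every pixel of chip k to energy point k with full weight."""
    return {
        "chips": [
            # ASSUMED: pixel indices are local to each chip and start at 0.
            [{"i": pixel, "p": chip, "f": 1.0} for pixel in range(pixels_per_chip)]
            for chip in range(n_chips)
        ]
    }


# A tiny map for inspection; the real detector would use the default chip size.
small_map = default_pixel_map(n_chips=8, pixels_per_chip=4)
```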
## State and error handling
All scan-facing operations use ophyd status objects. A state callback completes
the status when a target TimePixFly state arrives over WebSocket. Error states
raise `TimePixStatusError` with the backend `last-error` message where possible.
Tracked statuses are also registered with `cancel_on_stop()`. Calling
`on_stop()` fails every pending status with a stop-related exception, which lets
the BEC scan machinery unwind instead of waiting forever.
@@ -20,6 +20,7 @@ import uuid
from typing import TYPE_CHECKING, Callable, Tuple
from bec_lib.logger import bec_logger
from bec_lib.redis_connector import RedisConnector
from ophyd_devices import StatusBase
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client import (
@@ -49,7 +50,14 @@ class TimepixFlyBackendException(Exception):
class TimepixFlyBackend:
"""Timepix Fly Backend Device."""
def __init__(self, backend_rest_url: str, hostname: str | None = None, socket_port: int = 0):
def __init__(
self,
backend_rest_url: str,
hostname: str | None = None,
socket_port: int = 0,
redis_topic: str = "user/timepix_fly_backend/raw_data",
redis_connector: RedisConnector | None = None,
):
"""
Initialize the Timepix Fly Backend device.
@@ -61,6 +69,8 @@ class TimepixFlyBackend:
at the beamline computers of SLS, or localhost for local testing of the backend.
socket_port: The socket port to use. Defaults to 0,
which lets the OS choose an available port.
redis_topic: The Redis topic for backend communication.
redis_connector: Optional Redis connector for backend communication.
"""
ws_url = f"{backend_rest_url}/ws"
self.timepix_fly_client = TimepixFlyClient(rest_url=backend_rest_url, ws_url=ws_url)
@@ -76,6 +86,10 @@ class TimepixFlyBackend:
self._data_thread: threading.Thread | None = None
self._data_thread_shutdown_event = threading.Event()
# Redis settings
self.redis_connector = redis_connector
self._timepix_redis_topic = redis_topic
###################################################
###### Hooks for the PSIDeviceBase interface ######
###################################################
@@ -86,8 +100,13 @@ class TimepixFlyBackend:
logger.info("Connecting to Timepix Fly backend...")
try:
self.timepix_fly_client.on_connected(timeout=timeout / 2)
status = self.start_data_server()
status.wait(timeout=timeout / 2)
if self.redis_connector is not None:
self.redis_connector.register(
self._timepix_redis_topic, cb=self._raw_data_redis_callback
)
# status = self.start_data_server()
# status.wait(timeout=timeout / 2)
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
logger.error(f"Error starting data server: {content}")
@@ -108,6 +127,10 @@ class TimepixFlyBackend:
pixel_map (PixelMap): The pixel map for the Timepix Fly detector.
"""
time_started = time.time()
### Clean last error on backend
last_error = self.timepix_fly_client.last_error()
if last_error:
logger.info(f"TimeipxFly backend reports about last error: {last_error}")
status = StatusBase()
self.cancel_on_stop(status)
self.timepix_fly_client.add_status_callback(
@@ -149,7 +172,6 @@ class TimepixFlyBackend:
raise TimeoutError(
f"Timepix Fly backend state did not reach config state after setting config, running into timeout. Error traceback {content}."
)
logger.info(f"TimePixFly backend staged after {time.time() - time_started:.3f} seconds.")
def on_trigger(
self, status: StatusBase | DeviceStatus | None = None
@@ -327,139 +349,31 @@ class TimepixFlyBackend:
else:
logger.warning(f"Callback with UUID {cb_id} not found.")
def start_data_server(self) -> StatusBase:
def _raw_data_redis_callback(self, message: str):
"""
Start the data server to receive data from the Timepix Fly backend over a socket connection.
It will try to decypher the hostname through socket.getaddrinfo, and if multiple addresses
are found, it will use the first one. Please note that depending on the network configuration,
the hostname might not have the correct domain name attached, so it is recommended to specify
the hostname explicitly with domain name, e.g. 'x10da-bec-001.psi.ch'.
The method creates a socket server that listens for incoming connections on the specified
hostname and port. It starts a thread that continuously receives data from the socket,
decodes the received JSON data, and processes it. The data is expected to be in JSON format,
with each message ending with a trailing byte "}\n".
Returns:
StatusBase: A status object that indicates if the data server thread is ready to accept
connections. High level implementation should ensure that the data server is
started (status.wait(timeout=4)) before any data is sent from the backend.
"""
info = socket.getaddrinfo(
self.hostname, port=self.socket_port, family=socket.AF_INET, type=socket.SOCK_STREAM
)
if len(info) == 0:
raise RuntimeError(f"Could not resolve hostname {self.hostname} for socket server.")
if len(info) > 1:
logger.info(
f"Multiple addresses found for {self.hostname}. Using the first one: {info[0]}"
)
family, socktype, proto, _, sockaddr = info[0]
self._socket_server = socket.create_server(sockaddr, family=family, backlog=1)
self._socket_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Set the hostname and socket_port to the ones that was picked by the socket.getaddrinfo
self.hostname, self.socket_port = self._socket_server.getsockname()
logger.info(
f"Socket server started on {self.hostname}:{self.socket_port}. Waiting for connections."
)
# Create status object to return for the high level implementations
status = StatusBase()
if self._data_thread is None or not self._data_thread.is_alive():
self._data_thread_shutdown_event.clear()
self._data_thread = threading.Thread(
target=self._receive_data_on_socket, kwargs={"status": status}
)
self._data_thread.start()
else:
raise TimepixFlyBackendException(
"Data server thread is already running on timepix_fly_backend."
)
return status
def _receive_data_on_socket(self, status: StatusBase):
"""
Background loop, running in a thread, that receives data from the
TimePixFly backend over socket_server. The backend reconnects to this socket for every
acquisition (trigger), so all connections and disconnections must be handled properly.
The buffer variable accumulates the received data as a string. Whenever the delimiter "}\n"
is found in the buffer, the buffer is split into chunks and each complete chunk is decoded
as a JSON object. The decoded objects are then processed, and when an EndFrame message is received,
the registered callbacks are executed with the StartFrame, all data frames, and the EndFrame message.
"""
buffer = ""
self._socket_server.settimeout(
0.1
) # Set short socket timeout to avoid blocking the thread loop
status.set_finished() # Indicate that the socket server is ready to accept connections
while not self._data_thread_shutdown_event.is_set(): # Shutdown event
try:
# blocks until connected or timeout reached
conn, addr = self._socket_server.accept()
except socket.timeout:
continue # Timeout is okay, continue
except Exception: # pylint: disable=broad-except
# Log error, check if shutdown event is set.
# Shutdown event should be set before socket_server.close() is called.
content = traceback.format_exc()
logger.error(f"Error accepting connection: {content}")
continue
logger.debug(f"Connection accepted from {addr} for timepix_fly backend.")
# Clear the message buffer before entering the loop.
if self.__msg_buffer:
logger.warning(f"Found messages in msg_buffer: {self.__msg_buffer}")
self.__msg_buffer.clear()
conn.settimeout(0.1) # Set timeout for connection to avoid blocking in recv
with conn:
while not self._data_thread_shutdown_event.is_set():
try:
# A message may be split across recv() calls; the incomplete tail is kept in buffer below
chunk = conn.recv(4096) # Adjust buffer size as needed
except socket.timeout:
# Timeout is okay, continue in loop
continue
except Exception as e: # pylint: disable=broad-except
logger.error(f"Connection error: {e}. Closing connection.")
# conn = None #TODO should we reset conn?
break
if not chunk:
# Receiving an empty chunk means the connection was closed
# conn = None #TODO should we reset conn?
break
buffer += chunk.decode("utf-8")
# Split the buffer on the message delimiter "}\n"
buffer_chunks = buffer.split("}\n")
for entry in buffer_chunks[:-1]:
# Process all complete JSON objects in the buffer
self._decode_received_data(entry + "}")
# Keep the last incomplete chunk.
# If the buffer ended with "}\n", this will be an empty string.
buffer = buffer_chunks[-1]
def _decode_received_data(self, buffer: str) -> None:
"""
Decode the received data from the socket.
Callback function to handle raw data received from Redis. The message is expected to be a JSON string
containing the raw data from the Timepix Fly backend. This method decodes the JSON message and processes
it in the same way as data received from the socket connection.
Args:
buffer (str): The JSON string received from the socket.
message (str): The JSON string received from Redis containing the raw data.
"""
try:
obj, _ = self._decoder.raw_decode(buffer)
except json.JSONDecodeError:
logger.error(f"TimePixFlyBackend: Failed to decode JSON from buffer: {buffer}")
return # TODO should this raise, or only log error as of now?
self._process_timepix_fly_msg(message.value.data)
def _process_timepix_fly_msg(self, obj: dict) -> None:
if obj.get("type", "") == "StartFrame":
# Clear the message buffer when a new StartFrame is received, to avoid mixing messages from different acquisitions.
if self.__msg_buffer:
logger.debug(
f"Received StartFrame while msg_buffer is not empty. Clearing msg_buffer to avoid mixing messages from different acquisitions. Current msg_buffer: {self.__msg_buffer}"
)
self.__msg_buffer.clear()
self.__msg_buffer.append(obj)
if obj.get("type", "") == "EndFrame":
try:
# If the EndFrame message is received, run the callbacks
logger.debug(f"Running callbacks")
logger.debug("Running callbacks")
self.run_msg_callbacks()
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
@@ -489,7 +403,6 @@ class TimepixFlyBackend:
if __name__ == "__main__": # pragma: no cover
import time
from superxas_bec.devices.timepix.timepix_fly_client.test_utils.timepix_fly_mock_server import (
TimePixFlyMockServer,
+9 -8
@@ -133,6 +133,7 @@ def timepix():
scan_name="step_scan",
scan_parameters={"exp_time": 0.1, "frames_per_trigger": 2},
num_points=3,
scan_id="test",
)
)
with (
@@ -213,7 +214,10 @@ def test_timepix_on_stage_configures_camera_writer_and_mocked_backend(timepix, p
assert timepix.cam.raw_file_path.get() == "tcp://connect@127.0.0.1:8451"
assert timepix.backend.on_stage.call_count == 1
other_config = timepix.backend.on_stage.call_args.kwargs["other_config"]
assert other_config.output_uri == "tcp:localhost:9876"
assert (
other_config.output_uri
== "redis://localhost:0000/public/timepix_fly_backend/raw_data?scan-id=test"
)
assert other_config.TRoiStep == timepix.troistep
assert other_config.TRoiN == timepix.troin
assert timepix.backend.on_stage.call_args.kwargs["pixel_map"] == pixel_map
@@ -249,7 +253,10 @@ def test_timepix_on_stage_configures_mocked_backend(timepix, pixel_map):
assert timepix.cam.raw_file_path.get() == "tcp://connect@127.0.0.1:8451"
assert timepix.backend.on_stage.call_count == 1
other_config = timepix.backend.on_stage.call_args.kwargs["other_config"]
assert other_config.output_uri == "tcp:localhost:9876"
assert (
other_config.output_uri
== "redis://localhost:0000/public/timepix_fly_backend/raw_data?scan-id=test"
)
assert other_config.TRoiStep == timepix.troistep
assert other_config.TRoiN == timepix.troin
assert timepix.backend.on_stage.call_args.kwargs["pixel_map"] == pixel_map
@@ -367,12 +374,6 @@ def test_timepix_msg_buffer_callback_updates_xes_signals(timepix):
np.testing.assert_array_equal(
_message_value(timepix.xes_energy_2), np.array([22, 54], dtype=np.float32)
)
np.testing.assert_array_equal(
_message_value(timepix.xes_data_accumulated_1), np.array([6, 38], dtype=np.float32)
)
np.testing.assert_array_equal(
_message_value(timepix.xes_data_accumulated_2), np.array([22, 54], dtype=np.float32)
)
assert _message_value(timepix.total_periods) == pytest.approx(4 / 131000)
np.testing.assert_array_equal(_message_value(timepix.tds_period), np.array([1.0]))
assert _message_value(timepix.total_events) == 36
@@ -195,14 +195,28 @@ def test_timepix_fly_backend_decode_end_frame_runs_callbacks(backend_with_states
received["scan_id"] = scan_id
backend.add_callback(callback, kwd={"scan_id": 7})
backend._decode_received_data(
'{"type":"StartFrame","Mode":"TOA","TRoiStart":0,"TRoiStep":1,"TRoiN":2,"NumEnergyPoints":2,"save_interval":10}'
backend._process_timepix_fly_msg(
{
"type": "StartFrame",
"Mode": "TOA",
"TRoiStart": 0,
"TRoiStep": 1,
"TRoiN": 2,
"NumEnergyPoints": 2,
"save_interval": 10,
}
)
backend._decode_received_data(
'{"type":"XesData","period":1,"TDSpectra":[1,2,3,4],"totalEvents":4,"beforeROI":0,"afterROI":0}'
backend._process_timepix_fly_msg(
{
"type": "XesData",
"period": 1,
"TDSpectra": [1, 2, 3, 4],
"totalEvents": 4,
"beforeROI": 0,
"afterROI": 0,
}
)
backend._decode_received_data('{"type":"EndFrame","error":"","periods":5}')
backend._process_timepix_fly_msg({"type": "EndFrame", "error": "", "periods": 5})
assert received["start_frame"]["type"] == "StartFrame"
assert received["data_frames"][0]["type"] == "XesData"
assert received["end_frame"]["type"] == "EndFrame"
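The frame grouping exercised by the test above can be illustrated in isolation. The following is a minimal sketch, not the code from this PR: `FrameCollector`, its `process` method, and the callback signature are hypothetical names chosen for illustration, and clearing the buffer after the callbacks have run is an assumption. It only mirrors the behaviour visible in the diff: a `StartFrame` clears the buffer, every message is appended, and an `EndFrame` triggers the registered callbacks.

```python
from typing import Any, Callable, Optional


class FrameCollector:
    """Hypothetical stand-in for the buffering done in _process_timepix_fly_msg."""

    def __init__(self) -> None:
        self._msg_buffer: list[dict[str, Any]] = []
        self._callbacks: list[tuple[Callable[..., None], dict[str, Any]]] = []

    def add_callback(
        self, callback: Callable[..., None], kwd: Optional[dict[str, Any]] = None
    ) -> None:
        # Store the callback together with its extra keyword arguments.
        self._callbacks.append((callback, kwd or {}))

    def process(self, obj: dict[str, Any]) -> None:
        if obj.get("type", "") == "StartFrame":
            # A new acquisition begins: drop leftovers from a previous one.
            self._msg_buffer.clear()
        self._msg_buffer.append(obj)
        if obj.get("type", "") != "EndFrame":
            return
        if self._msg_buffer[0].get("type", "") != "StartFrame":
            # EndFrame without a preceding StartFrame: nothing consistent to hand out.
            self._msg_buffer.clear()
            return
        start_frame, *data_frames, end_frame = self._msg_buffer
        for callback, kwd in self._callbacks:
            callback(start_frame, data_frames, end_frame, **kwd)
        # Assumption: the buffer is emptied once the callbacks have run.
        self._msg_buffer.clear()


received: dict[str, Any] = {}


def callback(start_frame, data_frames, end_frame, scan_id):
    # Record what the collector handed over, as the test callback does.
    received.update(
        start_frame=start_frame,
        data_frames=data_frames,
        end_frame=end_frame,
        scan_id=scan_id,
    )


collector = FrameCollector()
collector.add_callback(callback, kwd={"scan_id": 7})
collector.process({"type": "StartFrame", "Mode": "TOA", "TRoiN": 2})
collector.process({"type": "XesData", "period": 1, "TDSpectra": [1, 2, 3, 4]})
collector.process({"type": "EndFrame", "error": "", "periods": 5})
```

Because the grouping depends only on the `type` field of each dict, the same logic applies whether the messages arrive over a socket or through Redis, which is why the test can call `_process_timepix_fly_msg` with plain dicts.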