feat(timepix): Timepix integration with TimepixFly backend

This commit is contained in:
2025-06-03 20:54:48 +02:00
parent a0c01ad51c
commit e7b409fa51
4 changed files with 468 additions and 0 deletions
+155
View File
@@ -0,0 +1,155 @@
"""
TimePix Detector class for interfacing with the TimePix detector. The timepix_signals module
implements the HTTP communication to the REST API for the tpx3app app.
"""
import socket
import threading
from bec_lib.logger import bec_logger
from ophyd import Device
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from superxas_bec.devices.timepix.timepix_fly_client import TimepixFlyClient
logger = bec_logger.logger
DATA_SERVER_HOST = "localhost" # Default data server host for TimePix detector
DATA_SERVER_PORT = 3015 # Default data server port for TimePix detector
class Timepix(PSIDeviceBase, Device):
"""
TimePix Detector class for interfacing with the TimePix detector.
The timepix_signals module implements the HTTP communication to the REST API for the tpx3app app
The Timepix detector REST API service for the TimePixFly backend runs on SERVER_ADDRESS defined
in the timepix_signals module. This can certainly be improved to be configurable by the config
"""
def __init__(
self,
prefix="",
*,
name,
scan_info=None,
device_manager=None,
backend_host: str | None = None,
data_server_host: str | None = None,
data_server_port: int | None = None,
**kwargs,
):
super().__init__(
prefix, name=name, scan_info=scan_info, device_manager=device_manager, **kwargs
)
self.timepix_fly_client = TimepixFlyClient(server_address=backend_host)
self._data_server_host = data_server_host if data_server_host else DATA_SERVER_HOST
self._data_server_port = (
data_server_port if data_server_port is not None else DATA_SERVER_PORT
)
self._server_thread = None
self._server_thread_event = None
self._socket_server = None
self._socket_server_allowed_connections = 1 # How many ?
self._socket_server_timeout = 1
self._socket_server_buffer_size = 1024
self._data = []
def _start_server_thread(self):
if self._server_thread is not None and self._server_thread.is_alive():
return
self._server_thread_event = threading.Event()
self._server_thread = threading.Thread(
target=self._start_data_server, name=f"{self.name}_data_server", daemon=True
)
self._server_thread.start()
def _stop_server_thread(self):
if self._server_thread is not None and self._server_thread.is_alive():
self._server_thread_event.set()
self._server_thread.join(timeout=5)
if self._server_thread.is_alive():
logger.warning(
f"Data server thread {self._server_thread.name} did not stop gracefully."
)
else:
logger.warning(
f"Data server thread {self._server_thread.name} is not running or has already stopped."
)
self._server_thread = None
self._server_thread_event = None
self._socket_server = None
self._data = []
def _start_data_server(self):
"""
Start the data server for the TimePix detector.
This method should be overridden to implement the actual data server logic.
"""
self._socket_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket_server.bind((self._data_server_host, self._data_server_port))
self._socket_server.listen(self._socket_server_allowed_connections)
self._socket_server.settimeout(self._socket_server_timeout)
self._receive_data_on_socket()
def _receive_data_on_socket(self):
"""Receive data on socket connection."""
while not self._server_thread_event.is_set():
try:
conn, addr = self._socket_server.accept()
logger.info(f"Accepted connection from {addr}")
with conn:
while not self._server_thread_event.is_set():
data = conn.recv(1024) # Adjust buffer size as needed
if not data:
break
logger.debug(f"Received data: {len(data)} bytes")
self._data.append(data)
# Ignore timeout exception on socket aslong as server_thread_event is not set
except socket.timeout:
continue
def on_connected(self):
"""
Called when the device is connected to the BEC service.
This method can be overridden to perform additional actions when the device connects.
"""
self._start_server_thread()
if __name__ == "__main__":
# TEST API
print("Timepix module loaded. Ready to interface with TimePix detector.")
timepix = Timepix(name="TimePixDetector")
from superxas_bec.devices.timepix.timepix_fly_interface import OtherConfigModel, PixelMap
print(f"Last error: {timepix.timepix_fly_client.last_error()}")
print(f"TimePix version: {timepix.timepix_fly_client.version().version}")
print(f"TimePix state: {timepix.timepix_fly_client.state().state}")
timepix.timepix_fly_client.set_other_config(
OtherConfigModel(
output_uri="tcp://localhost:5000", save_interval=10, TRoiStart=0, TRoiStep=1, TRoiN=100
)
)
print(f"Other config: {timepix.timepix_fly_client.get_other_config()}")
# PixelMap does not work yet
# pixel_map = PixelMap(
# chips=[
# [
# {"i": 0, "p": [0, 1, 2], "f": [0.33, 0.33, 0.33]},
# {"i": 0, "p": [0, 1, 2], "f": [0.33, 0.33, 0.33]},
# ]
# ]
# )
# timepix.timepix_fly_client.set_pixel_map(pixel_map)
# import time
# time.sleep(1) # Wait for the pixel map to be set
# test = timepix.timepix_fly_client.get_pixel_map() # TODO Throws an error at the moment
# print(test)
# print(f"Initialized {timepix.name} with prefix {timepix.prefix}")
timepix.on_connected()
timepix._server_thread_event.wait(timeout=3.0)
timepix._server_thread_event.set()
@@ -0,0 +1,208 @@
"""
This module implements a python client ot the REST interface of the tpx3app.
"""
from typing import Any, Type
import requests
from superxas_bec.devices.timepix.timepix_fly_interface import (
LastError,
OtherConfigModel,
PixelMap,
PixelMapFromFile,
ProgramState,
TimePixResponse,
Version,
)
SERVER_ADDRESS = "localhost:8043" # Default server address for TimePix REST API
# pylint: disable=arguments-differ
class TimepixFlyClient:
"""
A client for the TimePix detector REST API.
This class is used to interact with the TimePix detector via its REST API.
It provides methods to send GET and PUT requests to the TimePix server.
"""
def __init__(self, server_address: str | None = None):
"""
Initialize the TimePixFlyClient with a server address.
Args:
server_address (str): The address of the TimePix REST API: "tpx3app".
"""
self._server_address = server_address if server_address else SERVER_ADDRESS
self._timeout = 5 # Default timeout for requests
def _get(
self, get_cmd: str, get_response_model: Type[TimePixResponse] | None = None
) -> Type[TimePixResponse] | None:
"""
Send a GET request to the TimePix server.
Args:
get_cmd (str): The command to send in the GET request.
get_response_model (Type[TimePixResponse]): The Pydantic model to parse the response.
Returns:
Any: The parsed response if a model is provided, else the raw response.
"""
response = requests.get(f"http://{self._server_address}/{get_cmd}", timeout=self._timeout)
response.raise_for_status() # Raise an error for bad responses
if get_response_model is not None:
return get_response_model(**response.json())
def _put(
self, put_cmd: str, value: dict[str, Any], put_response_model: Type[TimePixResponse]
) -> Type[TimePixResponse] | None:
"""
Send a PUT request to the TimePix server.
Args:
put_cmd (str): The command to send in the PUT request.
value (dict[str, Any]): The value to send in the PUT request.
put_response_model (Type[TimePixResponse]): The Pydantic model to parse the response.
Returns:
Any: The parsed response if a model is provided, else None.
"""
response = requests.put(
f"http://{self._server_address}/{put_cmd}", json=value, timeout=self._timeout
)
response.raise_for_status()
if put_response_model is not None:
return put_response_model(**response.json())
def start(self) -> None:
"""
Start the TimePix detector by sending a GET request to the start endpoint.
This method is a wrapper around the REST API call to start the detector.
"""
self._get(get_cmd="?start")
def stop(self) -> None:
"""
Stop the TimePix detector by sending a GET request to the stop endpoint.
This method is a wrapper around the REST API call to stop the detector.
"""
self._get(get_cmd="?stop")
def kill(self) -> None:
"""
Kill the TimePix detector by sending a GET request to the kill endpoint.
This method is a wrapper around the REST API call to kill the detector.
"""
self._get(get_cmd="?kill")
def last_error(self) -> LastError:
"""
Get the last error message from the TimePix detector by sending a GET request
to the last-error endpoint.
Returns:
LastError: The last error message from the detector.
"""
return self._get(get_cmd="last-error", get_response_model=LastError)
def state(self) -> ProgramState:
"""
Get the program state of the TimePix detector by sending a GET request
to the state endpoint.
Returns:
ProgramState: The current state of the TimePix detector.
"""
return self._get(get_cmd="state", get_response_model=ProgramState)
def version(self) -> Version:
"""
Get the version of the TimePix detector by sending a GET request
to the version endpoint.
Returns:
Version: The version information of the TimePix detector.
"""
return self._get(get_cmd="version", get_response_model=Version)
def get_pixel_map(self) -> PixelMap:
"""
Get the pixel map of the TimePix detector by sending a GET request
to the pixel-map endpoint.
Returns:
PixelMap: The pixel map of the TimePix detector.
"""
return self._get(get_cmd="pixel-map", get_response_model=PixelMap)
def set_pixel_map(self, pixel_map: PixelMap | dict) -> None:
"""
Set the pixel map of the TimePix detector by sending a PUT request
to the pixel-map endpoint.
Args:
pixel_map (PixelMap | dict): The pixel map to set. Can be a PixelMap instance or a dictionary.
"""
if not isinstance(pixel_map, PixelMap):
if isinstance(pixel_map, dict):
pixel_map = PixelMap(**pixel_map)
else:
raise ValueError(
f"Value must be an instance of PixelMap. Received {type(pixel_map)}, {pixel_map}."
)
self._put(put_cmd="pixel-map", value=pixel_map.model_dump(), put_response_model=None)
def set_pixel_map_from_file(self, pixel_map_file: PixelMapFromFile | dict | str) -> None:
"""
Set the pixel map of the TimePix detector from a file by sending a PUT request
to the pixel-map-from-file endpoint.
Args:
pixel_map_file (PixelMapFromFile | dict): The pixel map from a file to set.
Can be a PixelMapFromFile instance or a dictionary.
"""
if not isinstance(pixel_map_file, PixelMapFromFile):
if isinstance(pixel_map_file, dict):
pixel_map_file = PixelMapFromFile(**pixel_map_file)
elif isinstance(pixel_map_file, str):
pixel_map_file = PixelMapFromFile(filename=pixel_map_file)
else:
raise ValueError(
f"Value must be an instance of PixelMapFromFile. Received {type(pixel_map_file)}, {pixel_map_file}."
)
self._put(
put_cmd="pixel-map-from-file",
value=pixel_map_file.model_dump(),
put_response_model=PixelMapFromFile,
)
def get_other_config(self) -> OtherConfigModel:
"""
Get the other configuration parameters of the TimePix detector by sending a GET request
to the other-config endpoint.
Returns:
OtherConfigModel: The other configuration parameters of the TimePix detector.
"""
return self._get(get_cmd="other-config", get_response_model=OtherConfigModel)
def set_other_config(self, other_config: OtherConfigModel | dict) -> None:
"""
Set the other configuration parameters of the TimePix detector by sending a PUT request
to the other-config endpoint.
Args:
other_config (OtherConfigModel | dict): The other configuration parameters to set.
Can be an OtherConfigModel instance or a dictionary.
"""
if not isinstance(other_config, OtherConfigModel):
if isinstance(other_config, dict):
other_config = OtherConfigModel(**other_config)
else:
raise ValueError(
f"Value must be an instance of OtherConfigModel. Received {type(other_config)}, {other_config}."
)
self._put(put_cmd="other-config", value=other_config.model_dump(), put_response_model=None)
@@ -0,0 +1,105 @@
"""
This module defines Pydantic models for the TimePix detector API responses. These
models are used to validate and structure the data returned by the TimePix REST API.
Any change will be reflected immediately, which will simplify debugging if the API changes.
"""
from typing import Literal
from pydantic import BaseModel
class TimePixResponse(BaseModel):
"""Base model for TimePix responses."""
model_config = {"validate_assignment": True}
class OtherConfigModel(TimePixResponse):
"""
OtherConfigModel is a Pydantic model that represents the configuration
for the TimePix detector.
Attributes:
- type: str - The type of the configuration, default is "OtherConfig".
- output_uri: str - The URI for the data stream. The backend will send data to this address.
It is the responsibility of the device must start a TCP server and listen on this socket to
receive the data.
- save_interval: int - The interval at which histograms are written.
- TRoiStart: int - The start time for the Time ROI (Region of Interest).
- TRoiStep: int - The step size for the Time ROI.
- TRoiN: int - The number of points in the Time ROI.
"""
type: str = "OtherConfig"
output_uri: str
save_interval: int
TRoiStart: int
TRoiStep: int
TRoiN: int
class LastError(TimePixResponse):
"""
TimePixLastError is a Pydantic model that represents the last error message
from the TimePix detector api for the REST API call '/last-error'.
Attributes:
- type: str - The type of the response, default is "LastError".
- message: str - The last error message from the detector.
"""
type: str = "LastError"
message: str
class ProgramState(TimePixResponse):
"""
ProgramState is a Pydantic model that represents the state of the TimePix program.
Attributes:
- type: str - The type of the response, default is "ProgramState".
- state: str - The current state of the progra, can be "init", "config", "setup", "collect", "shutdown"
"""
type: str = "ProgramState"
state: Literal["init", "config", "setup", "collect", "shutdown"]
class Version(TimePixResponse):
"""
Version is a Pydantic model that represents the version information of the TimePix detector.
Attributes:
- type: str - The type of the response, default is "Version".
- version: str - The version string of the TimePix detector.
"""
type: str = "Version"
version: str
class PixelMapFromFile(TimePixResponse):
"""
PixelMapFromFile is a Pydantic model that represents a pixel map loaded from a file.
Attributes:
- type: str - The type of the response, default is "PixelMapFromFile".
- filename: str - The path to the file containing the pixel map.
"""
type: str = "PixelMapFromFile"
filename: str
class PixelMap(TimePixResponse):
"""
PixelMap is a Pydantic model that represents the pixel mapping for the TimePix detector.
Attributes:
- type: str - The type of the response, default is "PixelMap".
- chips: list - A list of chips, each containing a list of pixel mappings.
"""
type: str = "PixelMap"
chips: list[list[dict[Literal["i", "p", "f"], int | float | list[int | float]]]]