refactor: refactored timepix integration, timepix_fly_client moved to seperate module

This commit is contained in:
2025-06-05 16:26:29 +02:00
parent d4ce20a547
commit 714a038cd9
6 changed files with 415 additions and 93 deletions

View File

@@ -0,0 +1,32 @@
if __name__ == """__main__""":
import time
from superxas_bec.devices.timepix.timepix import Timepix
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_mock_server import (
TimePixFlyMockServer,
)
mock_server = TimePixFlyMockServer()
# Create a Timepix object
timepix = Timepix(name="TimePixDetector")
timepix.on_connected()
timepix.stage()
timepix.pre_scan()
print(f"State of timepix_fly_client {timepix.timepix_fly_client.state().state}")
mock_server.start_acquisition()
print(f"State of timepix_fly_client {timepix.timepix_fly_client.state().state}")
time.sleep(0.001)
print(f"State of timepix_fly_client {timepix.timepix_fly_client.state().state}")
time.sleep(0.1)
print(f"State of timepix_fly_client {timepix.timepix_fly_client.state().state}")
time.sleep(0.1)
print(f"State of timepix_fly_client {timepix.timepix_fly_client.state().state}")
time.sleep(1)
print(f"State of timepix_fly_client {timepix.timepix_fly_client.state().state}")
# time.sleep(5)
timepix.complete()
for ii, msg in enumerate(timepix._data_buffer):
print(f"Received data {ii} with message type {msg['type']}")
timepix.unstage()
# timepix.destroy()

View File

@@ -3,20 +3,30 @@ TimePix Detector class for interfacing with the TimePix detector. The timepix_si
implements the HTTP communication to the REST API for the tpx3app app.
"""
import json
import signal
import socket
import threading
import time
from typing import Literal
from bec_lib.logger import bec_logger
from ophyd import Device
from ophyd import Device, DeviceStatus, StatusBase
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from superxas_bec.devices.timepix.timepix_fly_client import TimepixFlyClient
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client import TimepixFlyClient
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface import (
OtherConfigModel,
PixelMap,
)
logger = bec_logger.logger
DATA_SERVER_HOST = "localhost" # Default data server host for TimePix detector
DATA_SERVER_PORT = 3015 # Default data server port for TimePix detector
# pylint: disable=too-many-instance-attributes, too-many-arguments, too-many-locals
class Timepix(PSIDeviceBase, Device):
"""
@@ -28,7 +38,6 @@ class Timepix(PSIDeviceBase, Device):
def __init__(
self,
prefix="",
*,
name,
scan_info=None,
@@ -38,118 +47,298 @@ class Timepix(PSIDeviceBase, Device):
data_server_port: int | None = None,
**kwargs,
):
super().__init__(
prefix, name=name, scan_info=scan_info, device_manager=device_manager, **kwargs
"""
#TODO addd docstring
"""
super().__init__(name=name, scan_info=scan_info, device_manager=device_manager, **kwargs)
self.timepix_fly_client = TimepixFlyClient(
api_server_address=backend_host, logger=logger, parent=self
)
self.timepix_fly_client = TimepixFlyClient(server_address=backend_host)
self._data_server_host = data_server_host if data_server_host else DATA_SERVER_HOST
self._data_server_port = (
data_server_port if data_server_port is not None else DATA_SERVER_PORT
)
self._server_thread = None
self._server_thread_event = None
self._rlock = threading.RLock()
# Data server
self._data_server_thread = None
self._data_server_thread_event = None
# Socket server
self._socket_server = None
self._socket_server_allowed_connections = 1 # How many ?
self._socket_server_timeout = 1
self._socket_server_buffer_size = 1024
self._data = []
self._socket_server_timeout = 0.1
self._socket_server_buffer_size = 4096
#
self._data_buffer = []
# Decoding
self._decoder = json.JSONDecoder()
def _start_server_thread(self):
if self._server_thread is not None and self._server_thread.is_alive():
return
self._server_thread_event = threading.Event()
self._server_thread = threading.Thread(
target=self._start_data_server, name=f"{self.name}_data_server", daemon=True
### Beamline specifi methods for the TimePix Detector integration ###
def on_init(self) -> None:
"""
Called when the device is initialized.
No signals are connected at this point. If you like to
set default values on signals, please use on_connected instead.
"""
def sigint_handler(*args):
"""Ensure that the on_destroy method is called when the process is killed."""
self.on_destroy()
signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGTERM, sigint_handler)
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
"""
self.timepix_fly_client.on_connected()
self.start_data_server()
def on_stage(self) -> DeviceStatus | StatusBase | None:
"""
Called while staging the device.
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
"""
self._reset_buffers()
# Parse scan info for OtherConfig
config = OtherConfigModel(
output_uri=f"tcp:{self._data_server_host}:{self._data_server_port}",
TRoiStep=1,
TRoiN=5000,
)
self._server_thread.start()
# Parse pixel map from scan info if needed, otherwise use some default pixel map.
pixel_map = PixelMap(
chips=[
[{"i": 256 ^ 2 - 1, "p": [0, 1], "f": [0.5, 0.5]}],
[{"i": 255 * 256, "p": [0, 1], "f": [0.5, 0.5]}],
[{"i": 255, "p": [1, 2], "f": [0.5, 0.5]}],
[{"i": 0, "p": [1, 2], "f": [0.5, 0.5]}],
]
)
self.timepix_fly_client.set_other_config(config)
self.timepix_fly_client.set_pixel_map(pixel_map)
self._wait_for_state_condition("config", timeout=5.0)
def _stop_server_thread(self):
if self._server_thread is not None and self._server_thread.is_alive():
self._server_thread_event.set()
self._server_thread.join(timeout=5)
if self._server_thread.is_alive():
logger.warning(
f"Data server thread {self._server_thread.name} did not stop gracefully."
)
else:
logger.warning(
f"Data server thread {self._server_thread.name} is not running or has already stopped."
def on_unstage(self) -> DeviceStatus | StatusBase | None:
"""Called while unstaging the device."""
self._reset_buffers()
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
"""Called right before the scan starts on all devices automatically."""
self.timepix_fly_client.start()
self._wait_for_state_condition("setup", timeout=5.0)
def on_trigger(self) -> DeviceStatus | StatusBase | None:
"""Called when the device is triggered."""
def on_complete(self) -> DeviceStatus | StatusBase | None:
"""Called to inquire if a device has completed a scans."""
self._wait_for_state_condition("config", timeout=5.0)
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
def on_stop(self) -> None:
"""Called when the device is stopped."""
self.timepix_fly_client.stop()
self._reset_buffers()
### Custom methods for the TimePix Data server ###
def _wait_for_state_condition(
self, state: Literal["init", "config", "setup", "collect", "shutdown"], timeout: float = 5.0
) -> None:
"""
Wait for the TimePixFly backend to reach a specific state.
Args:
state (Literal["init", "config", "setup", "collect", "shutdown"]): The state to wait for.
timeout (float): The maximum time to wait for the state in seconds. Default is 5.0 seconds.
Raises:
RuntimeError: If the TimePixFly backend does not reach the specified state within the timeout.
"""
def _check_state():
return self.timepix_fly_client.state().state == state
if self.wait_for_condition(_check_state, timeout=timeout, interval=0.25) is False:
raise RuntimeError(
f"TimePix Fly client {self.name} did not reach the '{state}' state in time."
f"Current state: {self.timepix_fly_client.state().state}"
)
self._server_thread = None
self._server_thread_event = None
self._socket_server = None
self._data = []
def _start_data_server(self):
"""
Start the data server for the TimePix detector.
This method should be overridden to implement the actual data server logic.
"""
self._socket_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket_server.bind((self._data_server_host, self._data_server_port))
self._socket_server.listen(self._socket_server_allowed_connections)
self._socket_server.settimeout(self._socket_server_timeout)
self._receive_data_on_socket()
def _start_data_receiver(self):
"""Start the data server thread. If the thread is already running, do nothing."""
if self._data_server_thread is not None and self._data_server_thread.is_alive():
return
self._data_server_thread_event = threading.Event()
self._data_server_thread = threading.Thread(
target=self._receive_data_on_socket, name=f"{self.name}_data_server"
)
self._data_server_thread.start()
def _stop_data_receiver(self):
if self._data_server_thread is not None and self._data_server_thread.is_alive():
self._data_server_thread_event.set()
self._data_server_thread.join(timeout=5.0)
if self._data_server_thread is not None and self._data_server_thread.is_alive():
logger.warning(f"Data server thread did not stop gracefully.")
else:
logger.warning(f"Data server thread is not running or has already stopped.")
def _receive_data_on_socket(self):
"""Receive data on socket connection."""
while not self._server_thread_event.is_set():
buffer = ""
# for testing purposes, move logic into separate function for each while loop
while not self._data_server_thread_event.is_set():
try:
conn, addr = self._socket_server.accept()
logger.info(f"Accepted connection from {addr}")
with conn:
while not self._server_thread_event.is_set():
data = conn.recv(1024) # Adjust buffer size as needed
if not data:
break
logger.debug(f"Received data: {len(data)} bytes")
self._data.append(data)
while not self._data_server_thread_event.is_set():
# TODO check if recv is blocking try and except socket.timeout
chunk = conn.recv(4096) # Adjust buffer size as needed
if not chunk:
time.sleep(0.1) # No data received, wait a bit before next attempt
continue
logger.info(f"Received data: {len(chunk)} bytes")
buffer += chunk.decode("utf-8") # Trailing byte, i.e. -> "\n"
if buffer.endswith("}\n"):
self._decode_received_data(buffer)
buffer = "" # Reset buffer after processing
# Ignore timeout exception on socket aslong as server_thread_event is not set
except socket.timeout:
continue
def on_connected(self):
def _decode_received_data(self, buffer: str) -> None:
"""
Called when the device is connected to the BEC service.
This method can be overridden to perform additional actions when the device connects.
Decode the received data from the socket.
This method should be overridden to implement the actual decoding logic.
"""
self._start_server_thread()
try:
obj, idx = self._decoder.raw_decode(buffer)
self._data_buffer.append(obj)
except json.JSONDecodeError:
logger.warning(f"Failed to decode JSON from buffer: {buffer}")
# If decoding fails, append the data to the buffer and wait for more data
def _reset_buffers(self):
"""Reset the data buffers."""
logger.info(f"Resetting data buffers for {self.name}.")
self._data_buffer = []
def on_destroy(self):
"""Cleanup method to stop the data server thread, clean up the socket server"""
with self._rlock:
if self._socket_server:
try:
self._socket_server.close()
except Exception as e: # pylint: disable=broad-except
logger.warning(f"Failed to shutdown socket server gracefully. Error: {e}")
self._stop_data_receiver()
self._socket_server = None
self._data_server_thread = None
self._data_server_thread_event = None
def restart_data_receiver(self):
"""Restart the data receiver thread."""
self._stop_data_receiver()
self._reset_buffers()
if not self._socket_server:
logger.warning(f"No socket server found for {self.name}. Starting a new data server.")
self.start_data_server()
else:
self._start_data_receiver()
logger.info("Data receiver thread restarted.")
def start_data_server(self):
"""
Start the data server for the TimePix detector.
This method should be overridden to implement the actual data server logic.
"""
# AF_INET6 for IPv6, use AF_INET for IPv4; for localhost this may be different depending on the system
# TODO add an os check if self._data_server_host is localhost
self._socket_server = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
self._socket_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._socket_server.bind((self._data_server_host, self._data_server_port))
self._socket_server.listen(self._socket_server_allowed_connections)
self._socket_server.settimeout(self._socket_server_timeout)
self._start_data_receiver()
logger.info(f"Data server started on {self._data_server_host}:{self._data_server_port}")
# pylint: disable=protected-access
if __name__ == "__main__":
# TEST API
print("Timepix module loaded. Ready to interface with TimePix detector.")
timepix = Timepix(name="TimePixDetector")
from superxas_bec.devices.timepix.timepix_fly_interface import OtherConfigModel, PixelMap
# from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_mock_server import (
# TimePixFlyMockServer,
# )
print(f"Last error: {timepix.timepix_fly_client.last_error()}")
# mock_server = TimePixFlyMockServer()
# # Create a Timepix object
# timepix = Timepix(name="TimePixDetector")
# timepix.on_connected()
# timepix.stage()
# timepix.pre_scan()
# mock_server.start_acquisition()
# time.sleep(5)
# timepix.complete()
# print(timepix._data_buffer)
# timepix.unstage()
timepix = Timepix(name="TimePixDetector")
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_mock_server import (
TimePixFlyMockServer,
)
mock_server = TimePixFlyMockServer()
timepix.on_connected()
infos = socket.getaddrinfo("localhost", None)
print(infos)
print(f"Last error: {timepix.timepix_fly_client.last_error().message}")
print(f"TimePix version: {timepix.timepix_fly_client.version().version}")
print(f"TimePix state: {timepix.timepix_fly_client.state().state}")
timepix.timepix_fly_client.set_other_config(
OtherConfigModel(
output_uri="tcp://localhost:5000", save_interval=10, TRoiStart=0, TRoiStep=1, TRoiN=100
output_uri=f"tcp:{timepix._data_server_host}:{timepix._data_server_port}",
TRoiStep=1,
TRoiN=5000,
)
)
print(f"Other config: {timepix.timepix_fly_client.get_other_config()}")
# print(f"Pixel map from file: {timepix.timepix_fly_client.get_pixel_map()}")
# PixelMap does not work yet
# pixel_map = PixelMap(
# chips=[
# [
# {"i": 0, "p": [0, 1, 2], "f": [0.33, 0.33, 0.33]},
# {"i": 0, "p": [0, 1, 2], "f": [0.33, 0.33, 0.33]},
# ]
# ]
# )
# timepix.timepix_fly_client.set_pixel_map(pixel_map)
# import time
pixel_map = PixelMap(
chips=[
[{"i": 256 ^ 2 - 1, "p": [0, 1], "f": [0.5, 0.5]}],
[{"i": 255 * 256, "p": [0, 1], "f": [0.5, 0.5]}],
[{"i": 255, "p": [1, 2], "f": [0.5, 0.5]}],
[{"i": 0, "p": [1, 2], "f": [0.5, 0.5]}],
]
)
# time.sleep(1) # Wait for the pixel map to be set
timepix.timepix_fly_client.set_pixel_map(pixel_map)
# test = timepix.timepix_fly_client.get_pixel_map() # TODO Throws an error at the moment
# print(test)
# print(f"Initialized {timepix.name} with prefix {timepix.prefix}")
print(f"Initialized {timepix.name} with prefix {timepix.prefix}")
timepix.timepix_fly_client.start()
mock_server.start_acquisition() # Start the mock server acquisition
timepix.on_connected()
time.sleep(
1
) # Wait for server stats to go to ready to start acquisition# Start acquisition, this will go through EPICS interface probably
timepix._server_thread_event.wait(timeout=3.0)
timepix._server_thread_event.set()
timepix._data_server_thread_event.wait(timeout=1.0)
timepix._data_server_thread_event.set()
print(timepix._data_buffer)
# raise RuntimeError("Stopping the server thread for testing purposes.")
data = [el in timepix._data for el in timepix._data]
# timepix._stop_data_server_thread()
print("Data server thread stopped.")

View File

@@ -6,7 +6,7 @@ from typing import Any, Type
import requests
from superxas_bec.devices.timepix.timepix_fly_interface import (
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface import (
LastError,
OtherConfigModel,
PixelMap,
@@ -16,7 +16,7 @@ from superxas_bec.devices.timepix.timepix_fly_interface import (
Version,
)
SERVER_ADDRESS = "localhost:8043" # Default server address for TimePix REST API
SERVER_ADDRESS = "localhost:8452" # Default server address for TimePix REST API
# pylint: disable=arguments-differ
@@ -28,15 +28,31 @@ class TimepixFlyClient:
It provides methods to send GET and PUT requests to the TimePix server.
"""
def __init__(self, server_address: str | None = None):
def __init__(self, api_server_address: str | None = None, logger=None, parent=None):
"""
Initialize the TimePixFlyClient with a server address.
Args:
server_address (str): The address of the TimePix REST API: "tpx3app".
"""
self._server_address = server_address if server_address else SERVER_ADDRESS
self._api_server_address = api_server_address if api_server_address else SERVER_ADDRESS
self._timeout = 5 # Default timeout for requests
self._logger = logger
self._parent = parent
def _add_log(self, message: str) -> None:
"""
Add a log message to the logger if available.
Args:
message (str): The message to log.
"""
if self._logger is not None:
if self._parent is not None and hasattr(self._parent, "name"):
message = f"{self._parent.name}: {message}"
self._logger.info(message)
else:
self._add_log(message)
def _get(
self, get_cmd: str, get_response_model: Type[TimePixResponse] | None = None
@@ -51,10 +67,18 @@ class TimepixFlyClient:
Returns:
Any: The parsed response if a model is provided, else the raw response.
"""
response = requests.get(f"http://{self._server_address}/{get_cmd}", timeout=self._timeout)
response = requests.get(
f"http://{self._api_server_address}/{get_cmd}", timeout=self._timeout
)
response.raise_for_status() # Raise an error for bad responses
if get_response_model is not None:
return get_response_model(**response.json())
try:
return get_response_model(**response.json())
except Exception as e:
self._add_log(f"Error parsing response for {get_cmd}: Response: {response.text}")
raise e
else:
return response.text
def _put(
self, put_cmd: str, value: dict[str, Any], put_response_model: Type[TimePixResponse]
@@ -71,32 +95,46 @@ class TimepixFlyClient:
Any: The parsed response if a model is provided, else None.
"""
response = requests.put(
f"http://{self._server_address}/{put_cmd}", json=value, timeout=self._timeout
f"http://{self._api_server_address}/{put_cmd}", json=value, timeout=self._timeout
)
response.raise_for_status()
if put_response_model is not None:
return put_response_model(**response.json())
def on_connected(self) -> None:
"""
Called when the client is connected to the TimePix server.
This method can be overridden to perform actions when the client connects.
"""
try:
self.state()
except Exception as e:
self._add_log(
f"An error occurred while connecting to the TimePix server: {e}. "
f"Please check the server address and ensure the server is running."
)
raise e
def start(self) -> None:
"""
Start the TimePix detector by sending a GET request to the start endpoint.
This method is a wrapper around the REST API call to start the detector.
"""
self._get(get_cmd="?start")
self._get(get_cmd="?start=true")
def stop(self) -> None:
"""
Stop the TimePix detector by sending a GET request to the stop endpoint.
This method is a wrapper around the REST API call to stop the detector.
"""
self._get(get_cmd="?stop")
self._get(get_cmd="?stop=true")
def kill(self) -> None:
"""
Kill the TimePix detector by sending a GET request to the kill endpoint.
This method is a wrapper around the REST API call to kill the detector.
"""
self._get(get_cmd="?kill")
self._get(get_cmd="?kill=true")
def last_error(self) -> LastError:
"""

View File

@@ -6,7 +6,7 @@ Any change will be reflected immediately, which will simplify debugging if the A
from typing import Literal
from pydantic import BaseModel
from pydantic import BaseModel, Field
class TimePixResponse(BaseModel):
@@ -33,10 +33,14 @@ class OtherConfigModel(TimePixResponse):
type: str = "OtherConfig"
output_uri: str
save_interval: int
TRoiStart: int
TRoiStep: int
TRoiN: int
save_interval: int = Field(
default=131000, description="Interval in seconds to write histograms"
)
TRoiStart: int = Field(
default=0, description="Start time for the Time ROI (Region of Interest)"
)
TRoiStep: int = Field(default=1, description="Step size for the Time ROI")
TRoiN: int = Field(default=5000, description="Number of points in the Time ROI")
class LastError(TimePixResponse):
@@ -85,11 +89,11 @@ class PixelMapFromFile(TimePixResponse):
Attributes:
- type: str - The type of the response, default is "PixelMapFromFile".
- filename: str - The path to the file containing the pixel map.
- file: str - The path to the file containing the pixel map.
"""
type: str = "PixelMapFromFile"
filename: str
file: str
class PixelMap(TimePixResponse):
@@ -103,3 +107,15 @@ class PixelMap(TimePixResponse):
type: str = "PixelMap"
chips: list[list[dict[Literal["i", "p", "f"], int | float | list[int | float]]]]
class TimepixStartFrame(TimePixResponse):
pass
class TimepixDataFrame(TimePixResponse):
pass
class TimepixEndFrame(TimePixResponse):
pass

View File

@@ -0,0 +1,47 @@
"""Module to control the Timepix Fly mock server."""
import requests
class TimePixFlyMockServer:
"""
A mock server for the Timepix Fly detector that simulates the behavior of the actual server.
This is used for testing purposes and does not require a real Timepix Fly detector.
"""
def __init__(self, host: str = "localhost", port: int = 8080, logger=None):
"""
Initialize the TimePixFlyMockServer with a host and port.
Args:
host (str): The host address for the mock server. Default is "localhost".
port (int): The port number for the mock server. Default is 8080.
logger: An optional logger to log messages. If not provided, messages will be printed to the console.
"""
self.host = host
self.port = port
self.logger = logger
def add_log(self, message: str) -> None:
"""
Add a log message to the logger if available.
If no logger is provided, it will print the message to the console.
Args:
message (str): The message to log.
"""
if self.logger is not None:
self.logger.info(message)
else:
print(message)
def start_acquisition(self):
"""
Simulate starting an acquisition on the Timepix Fly detector.
This method does not perform any real acquisition but simulates the behavior.
"""
try:
requests.get(f"http://{self.host}:{self.port}/measurement/start", timeout=0.2)
except requests.exceptions.RequestException:
pass # Ignore all exceptions as there is currently no return value for the request
self.add_log("Acquisition started on Timepix Fly mock server.")