refactor(timepix): cleanup

This commit is contained in:
2026-05-29 13:58:50 +02:00
parent 4ca15f9f9c
commit 72b1c4d27f
2 changed files with 13 additions and 158 deletions
+7 -25
View File
@@ -318,7 +318,7 @@ class Timepix(PSIDeviceBase, TimePixControl):
height = self.image.array_size.height.get()
# Geometry correction for the image
data = np.reshape(value, (height, width))
logger.info(f"Setting preview data for {self.name} with shape {data.shape}")
logger.debug(f"Setting preview data for {self.name} with shape {data.shape}")
self.preview.put(data)
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
@@ -475,17 +475,6 @@ class Timepix(PSIDeviceBase, TimePixControl):
"""Set whether XES data acquisition is enabled."""
self._enable_xes = value
self._enable_xes_settings(value)
# #TODO Update device manager config if available
# if self.device_manager is not None:
# dev_obj = self.device_manager.devices.get(self.name, None)
# if dev_obj is not None:
# cfg = dev_obj.get_device_config()
# if "enable_xes" in cfg and cfg["enable_xes"] != value:
# cfg["enable_xes"] = value
# dev_obj.set_device_config({"enable_xes": value})
# logger.info(
# f"Updated 'enable_xes' to {value} in device manager for {self.name}"
# )
@property
def pixel_map(self) -> PixelMap:
@@ -632,10 +621,6 @@ class Timepix(PSIDeviceBase, TimePixControl):
def on_stage(self) -> StatusBase | None:
"""Called while staging the device."""
### Clean last error on backend
last_error = self.backend.timepix_fly_client.last_error()
if last_error:
logger.info(f"TimeipxFly backend reports about last error: {last_error}")
###
self.accumulated_data_e1 = None
self.accumulated_data_e2 = None
@@ -697,10 +682,10 @@ class Timepix(PSIDeviceBase, TimePixControl):
def on_unstage(self) -> None:
"""Called while unstaging the device."""
# TODO what should happen for unstage? Make sure that acquisition is not running?
# self.backend.on_unstage()
# self.cam.acquire.put(0)
# status_camera = CompareStatus(self.cam.acquire_busy, ACQUIRESTATUS.DONE)
if self.cam.acquire_busy.get() == ACQUIRESTATUS.ACQUIRING:
self.cam.acquire.put(0)
status_camera = CompareStatus(self.cam.acquire_busy, ACQUIRESTATUS.DONE)
status_camera.wait(timeout=self._pv_timeout)
def on_pre_scan(self) -> StatusBase:
"""Called right before the scan starts on all devices automatically."""
@@ -725,7 +710,6 @@ class Timepix(PSIDeviceBase, TimePixControl):
status_camera = TransitionStatus(
self.cam.acquire_busy, [ACQUIRESTATUS.DONE, ACQUIRESTATUS.ACQUIRING, ACQUIRESTATUS.DONE]
)
img_counter = self.hdf.num_captured.get()
status_backend = None
# First we make sure that the backend reach 'config' state. This needs to happend before each trigger.
if self.enable_xes is True:
@@ -781,7 +765,7 @@ class Timepix(PSIDeviceBase, TimePixControl):
0,
operation="!=",
exception=RuntimeError(
"HDF5 write failed, please check with support if the IOC has sufficient permissions to write to disk."
f"HDF5 write failed on device {self.name} with file path {self._full_path}, please check with support if the IOC has sufficient permissions to write to disk."
),
)
status_written_images = CompareStatus(self.hdf.num_captured, self._n_images)
@@ -804,9 +788,7 @@ class Timepix(PSIDeviceBase, TimePixControl):
def _complete_callback(self, status: CompareStatus) -> None:
"""Callback for when the device completes a scan."""
if (
self.hdf.enable.get() != "Enable"
): # TODO: Not sure if we should support disabled file writing.
if self.hdf.enable.get() != "Enable":
return
if status.success:
self.file_event.put(
@@ -107,7 +107,7 @@ class TimepixFlyBackend:
# status = self.start_data_server()
# status.wait(timeout=timeout / 2)
except Exception as exc: # pylint: disable=broad-except
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
logger.error(f"Error starting data server: {content}")
# pylint: disable=raise-missing-from
@@ -127,6 +127,10 @@ class TimepixFlyBackend:
pixel_map (PixelMap): The pixel map for the Timepix Fly detector.
"""
time_started = time.time()
### Clean last error on backend
last_error = self.timepix_fly_client.last_error()
if last_error:
logger.info(f"TimeipxFly backend reports about last error: {last_error}")
status = StatusBase()
self.cancel_on_stop(status)
self.timepix_fly_client.add_status_callback(
@@ -168,7 +172,6 @@ class TimepixFlyBackend:
raise TimeoutError(
f"Timepix Fly backend state did not reach config state after setting config, running into timeout. Error traceback {content}."
)
logger.info(f"TimePixFly backend staged after {time.time() - time_started:.3f} seconds.")
def on_trigger(
self, status: StatusBase | DeviceStatus | None = None
@@ -357,135 +360,6 @@ class TimepixFlyBackend:
"""
self._process_timepix_fly_msg(message.value.data)
def start_data_server(self) -> StatusBase:
"""
Start the data server to receive data from the Timepix Fly backend over a socket connection.
It will try to decypher the hostname through socket.getaddrinfo, and if multiple addresses
are found, it will use the first one. Please note that depending on the network configuration,
the hostname might not have the correct domain name attached, so it is recommended to specify
the hostname explicitly with domain name, e.g. 'x10da-bec-001.psi.ch'.
The method creates a socket server that listens for incoming connections on the specified
hostname and port. It starts a thread that continuously receives data from the socket,
decodes the received JSON data, and processes it. The data is expected to be in JSON format,
with each message ending with a trailing byte "}\n".
Returns:
StatusBase: A status object that indicates if the data server thread is ready to accept
connections. High level implementation should ensure that the data server is
started (status.wait(timeout=4)) before any data is sent from the backend.
"""
info = socket.getaddrinfo(
self.hostname, port=self.socket_port, family=socket.AF_INET, type=socket.SOCK_STREAM
)
if len(info) == 0:
raise RuntimeError(f"Could not resolve hostname {self.hostname} for socket server.")
if len(info) > 1:
logger.info(
f"Multiple addresses found for {self.hostname}. Using the first one: {info[0]}"
)
family, socktype, proto, _, sockaddr = info[0]
self._socket_server = socket.create_server(sockaddr, family=family, backlog=1)
self._socket_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Set the hostname and socket_port to the ones that was picked by the socket.getaddrinfo
self.hostname, self.socket_port = self._socket_server.getsockname()
logger.info(
f"Socket server started on {self.hostname}:{self.socket_port}. Waiting for connections."
)
# Create status object to return for the high level implementations
status = StatusBase()
if self._data_thread is None or not self._data_thread.is_alive():
self._data_thread_shutdown_event.clear()
self._data_thread = threading.Thread(
target=self._receive_data_on_socket, kwargs={"status": status}
)
self._data_thread.start()
else:
raise TimepixFlyBackendException(
"Data server thread is already running on timepix_fly_backend."
)
return status
def _receive_data_on_socket(self, status: StatusBase):
"""
Background loop running in a thread, that receives data from the
timepix fly backend over socket_server. The backend reconnects for every acquisition (trigger),
to this socket. Therefore, it is important to handle all connections and disconnections properly.
The buffer variable stores a string stream of received data. Whenever a trailing byte "}\n" is found,
in the buffer, the buffer is split into chunks of received data and each chunk is decoded
as a JSON object. The decoded objects are then processed, and if an EndFrame message is received,
the registered callbacks are executed with the StartFrame, all DataFrames, and the EndFrame message.
"""
buffer = ""
self._socket_server.settimeout(
0.1
) # Set short socket timeout to avoid blocking the thread loop
status.set_finished() # Indicate that the socket server is ready to accept connections
while not self._data_thread_shutdown_event.is_set(): # Shutdown event
try:
# blocks until connected or timeout reached
conn, addr = self._socket_server.accept()
except socket.timeout:
continue # Timeout is okay, continue
except Exception: # pylint: disable=broad-except
# Log error, check if shutdown event is set.
# Shutdown event should be set before socket_server.close() is called.
content = traceback.format_exc()
logger.error(f"Error accepting connection: {content}")
continue
logger.debug(f"Connection accepted from {addr} for timepix_fly backend.")
# Clear the message buffer before entering the loop.
if self.__msg_buffer:
logger.warning(f"Found messages in msg_buffer: {self.__msg_buffer}")
self.__msg_buffer.clear()
conn.settimeout(0.1) # Set timeout for connection to avoid blocking in recv
with conn:
while not self._data_thread_shutdown_event.is_set():
try:
# What if we split the chunk
chunk = conn.recv(4096) # Adjust buffer size as needed
except socket.timeout:
# Timeout is okay, continue in loop
continue
except Exception as e: # pylint: disable=broad-except
logger.error(f"Connection error: {e}. Closing connection.")
# conn = None #TODO should we reset conn?
break
if not chunk:
# Receiving an empty chunk means the connection was closed
# conn = None #TODO should we reset conn?
break
buffer += chunk.decode("utf-8")
# Check if trailing byte "}\n" present in buffer
buffer_chunks = buffer.split("}\n")
for entry in buffer_chunks[:-1]:
# Process all complete JSON objects in the buffer
self._decode_received_data(entry + "}")
# Keep the last incomplete chunk.
# If the buffer ended with "}\n", this will be an empty string.
buffer = buffer_chunks[-1]
def _decode_received_data(self, buffer: str) -> None:
"""
Decode the received data from the socket.
Args:
buffer (str): The JSON string received from the socket.
"""
try:
obj, _ = self._decoder.raw_decode(buffer)
except json.JSONDecodeError:
logger.error(f"TimePixFlyBackend: Failed to decode JSON from buffer: {buffer}")
return # TODO should this raise, or only log error as of now?
self._process_timepix_fly_msg(obj)
def _process_timepix_fly_msg(self, obj: dict) -> None:
if obj.get("type", "") == "StartFrame":
# Clear the message buffer when a new StartFrame is received, to avoid mixing messages from different acquisitions.
@@ -499,7 +373,7 @@ class TimepixFlyBackend:
if obj.get("type", "") == "EndFrame":
try:
# If the EndFrame message is received, run the callbacks
logger.debug(f"Running callbacks")
logger.debug("Running callbacks")
self.run_msg_callbacks()
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
@@ -529,7 +403,6 @@ class TimepixFlyBackend:
if __name__ == "__main__": # pragma: no cover
import time
from superxas_bec.devices.timepix.timepix_fly_client.test_utils.timepix_fly_mock_server import (
TimePixFlyMockServer,