refactor(timepix-fly-backend): deactivate data-server, add redis-callback
This commit is contained in:
@@ -20,6 +20,7 @@ import uuid
|
||||
from typing import TYPE_CHECKING, Callable, Tuple
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_lib.redis_connector import RedisConnector
|
||||
from ophyd_devices import StatusBase
|
||||
|
||||
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client import (
|
||||
@@ -49,7 +50,14 @@ class TimepixFlyBackendException(Exception):
|
||||
class TimepixFlyBackend:
|
||||
"""Timepix Fly Backend Device."""
|
||||
|
||||
def __init__(self, backend_rest_url: str, hostname: str | None = None, socket_port: int = 0):
|
||||
def __init__(
|
||||
self,
|
||||
backend_rest_url: str,
|
||||
hostname: str | None = None,
|
||||
socket_port: int = 0,
|
||||
redis_topic: str = "user/timepix_fly_backend/raw_data",
|
||||
redis_connector: RedisConnector | None = None,
|
||||
):
|
||||
"""
|
||||
Initialize the Timepix Fly Backend device.
|
||||
|
||||
@@ -61,6 +69,8 @@ class TimepixFlyBackend:
|
||||
at the beamline computers of SLS, or localhost for local testing of the backend.
|
||||
socket_port: The socket port to use. Defaults to 0,
|
||||
which lets the OS choose an available port.
|
||||
redis_topic: The Redis topic for backend communication.
|
||||
redis_connector: Optional Redis connector for backend communication.
|
||||
"""
|
||||
ws_url = f"{backend_rest_url}/ws"
|
||||
self.timepix_fly_client = TimepixFlyClient(rest_url=backend_rest_url, ws_url=ws_url)
|
||||
@@ -76,6 +86,10 @@ class TimepixFlyBackend:
|
||||
self._data_thread: threading.Thread | None = None
|
||||
self._data_thread_shutdown_event = threading.Event()
|
||||
|
||||
# Redis settings
|
||||
self.redis_connector = redis_connector
|
||||
self._timepix_redis_topic = redis_topic
|
||||
|
||||
###################################################
|
||||
###### Hooks for the PSIDeviceBase interface ######
|
||||
###################################################
|
||||
@@ -86,8 +100,13 @@ class TimepixFlyBackend:
|
||||
logger.info("Connecting to Timepix Fly backend...")
|
||||
try:
|
||||
self.timepix_fly_client.on_connected(timeout=timeout / 2)
|
||||
status = self.start_data_server()
|
||||
status.wait(timeout=timeout / 2)
|
||||
if self.redis_connector is not None:
|
||||
self.redis_connector.register(
|
||||
self._timepix_redis_topic, self._raw_data_redis_callback
|
||||
)
|
||||
|
||||
# status = self.start_data_server()
|
||||
# status.wait(timeout=timeout / 2)
|
||||
except Exception: # pylint: disable=broad-except
|
||||
content = traceback.format_exc()
|
||||
logger.error(f"Error starting data server: {content}")
|
||||
@@ -327,6 +346,17 @@ class TimepixFlyBackend:
|
||||
else:
|
||||
logger.warning(f"Callback with UUID {cb_id} not found.")
|
||||
|
||||
def _raw_data_redis_callback(self, message: str):
|
||||
"""
|
||||
Callback function to handle raw data received from Redis. The message is expected to be a JSON string
|
||||
containing the raw data from the Timepix Fly backend. This method decodes the JSON message and processes
|
||||
it in the same way as data received from the socket connection.
|
||||
|
||||
Args:
|
||||
message (str): The JSON string received from Redis containing the raw data.
|
||||
"""
|
||||
self._decode_received_data(message)
|
||||
|
||||
def start_data_server(self) -> StatusBase:
|
||||
"""
|
||||
Start the data server to receive data from the Timepix Fly backend over a socket connection.
|
||||
@@ -454,6 +484,13 @@ class TimepixFlyBackend:
|
||||
except json.JSONDecodeError:
|
||||
logger.error(f"TimePixFlyBackend: Failed to decode JSON from buffer: {buffer}")
|
||||
return # TODO should this raise, or only log error as of now?
|
||||
if obj.get("type", "") == "StartFrame":
|
||||
# Clear the message buffer when a new StartFrame is received, to avoid mixing messages from different acquisitions.
|
||||
if self.__msg_buffer:
|
||||
logger.debug(
|
||||
f"Received StartFrame while msg_buffer is not empty. Clearing msg_buffer to avoid mixing messages from different acquisitions. Current msg_buffer: {self.__msg_buffer}"
|
||||
)
|
||||
self.__msg_buffer.clear()
|
||||
|
||||
self.__msg_buffer.append(obj)
|
||||
if obj.get("type", "") == "EndFrame":
|
||||
|
||||
Reference in New Issue
Block a user