diff --git a/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_backend.py b/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_backend.py index bc7f8bd..f1d61d8 100644 --- a/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_backend.py +++ b/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_backend.py @@ -20,6 +20,7 @@ import uuid from typing import TYPE_CHECKING, Callable, Tuple from bec_lib.logger import bec_logger +from bec_lib.redis_connector import RedisConnector from ophyd_devices import StatusBase from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client import ( @@ -49,7 +50,14 @@ class TimepixFlyBackendException(Exception): class TimepixFlyBackend: """Timepix Fly Backend Device.""" - def __init__(self, backend_rest_url: str, hostname: str | None = None, socket_port: int = 0): + def __init__( + self, + backend_rest_url: str, + hostname: str | None = None, + socket_port: int = 0, + redis_topic: str = "user/timepix_fly_backend/raw_data", + redis_connector: RedisConnector | None = None, + ): """ Initialize the Timepix Fly Backend device. @@ -61,6 +69,8 @@ class TimepixFlyBackend: at the beamline computers of SLS, or localhost for local testing of the backend. socket_port: The socket port to use. Defaults to 0, which lets the OS choose an available port. + redis_topic: The Redis topic for backend communication. + redis_connector: Optional Redis connector for backend communication. """ ws_url = f"{backend_rest_url}/ws" self.timepix_fly_client = TimepixFlyClient(rest_url=backend_rest_url, ws_url=ws_url) @@ -76,6 +86,10 @@ class TimepixFlyBackend: self._data_thread: threading.Thread | None = None self._data_thread_shutdown_event = threading.Event() + # Redis settings + self.redis_connector = redis_connector + self._timepix_redis_topic = redis_topic + ################################################### ###### Hooks for the PSIDeviceBase interface ###### ################################################### @@ -86,8 +100,13 @@ class TimepixFlyBackend: logger.info("Connecting to Timepix Fly backend...") try: self.timepix_fly_client.on_connected(timeout=timeout / 2) - status = self.start_data_server() - status.wait(timeout=timeout / 2) + if self.redis_connector is not None: + self.redis_connector.register( + self._timepix_redis_topic, self._raw_data_redis_callback + ) + + # status = self.start_data_server() + # status.wait(timeout=timeout / 2) except Exception: # pylint: disable=broad-except content = traceback.format_exc() logger.error(f"Error starting data server: {content}") @@ -327,6 +346,17 @@ class TimepixFlyBackend: else: logger.warning(f"Callback with UUID {cb_id} not found.") + def _raw_data_redis_callback(self, message: str): + """ + Callback function to handle raw data received from Redis. The message is expected to be a JSON string + containing the raw data from the Timepix Fly backend. This method decodes the JSON message and processes + it in the same way as data received from the socket connection. + + Args: + message (str): The JSON string received from Redis containing the raw data. + """ + self._decode_received_data(message) + def start_data_server(self) -> StatusBase: """ Start the data server to receive data from the Timepix Fly backend over a socket connection. @@ -454,6 +484,13 @@ class TimepixFlyBackend: except json.JSONDecodeError: logger.error(f"TimePixFlyBackend: Failed to decode JSON from buffer: {buffer}") return # TODO should this raise, or only log error as of now? + if obj.get("type", "") == "StartFrame": + # Clear the message buffer when a new StartFrame is received, to avoid mixing messages from different acquisitions. + if self.__msg_buffer: + logger.debug( + f"Received StartFrame while msg_buffer is not empty. Clearing msg_buffer to avoid mixing messages from different acquisitions. Current msg_buffer: {self.__msg_buffer}" + ) + self.__msg_buffer.clear() self.__msg_buffer.append(obj) if obj.get("type", "") == "EndFrame":