From 0f6d7a53440c8a86774af784425ca9751bd2127f Mon Sep 17 00:00:00 2001 From: appel_c Date: Fri, 15 Aug 2025 11:01:03 +0200 Subject: [PATCH] w --- .../timepix_fly_client/timepix_fly_backend.py | 46 +++++++++++-------- 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_backend.py b/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_backend.py index 0574fb7..9049183 100644 --- a/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_backend.py +++ b/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_backend.py @@ -263,6 +263,7 @@ class TimepixFlyBackend: with conn: while not self._data_thread_shutdown_event.is_set(): try: + # What if we split the chunk chunk = conn.recv(4096) # Adjust buffer size as needed except socket.timeout: # No data received, continue to check for new data @@ -273,11 +274,17 @@ class TimepixFlyBackend: if not chunk: break # Receiving an empty chunk means the connection was closed buffer += chunk.decode("utf-8") # Trailing byte, i.e. -> "}\n" - if buffer.endswith("}\n"): - for entry in buffer.split("}\n"): - if entry: + if "}\n" in buffer: + buffer_chunks = buffer.split("}\n") + processed_chunks = 0 + for entry in buffer_chunks: + if entry and entry.endswith("}\n"): self._decode_received_data(entry + "}") - buffer = "" # Reset buffer after processing + processed_chunks += 1 + if processed_chunks == len(buffer_chunks): + buffer = "" + else: + buffer = buffer_chunks[-1] def _decode_received_data(self, buffer: str) -> None: """ @@ -290,10 +297,20 @@ class TimepixFlyBackend: obj, _ = self._decoder.raw_decode(buffer) self.msg_buffer.append(obj) if obj.get("type", "") == "EndFrame": - # If the EndFrame message is received, run the callbacks - self.run_msg_callbacks() - # Clear the msg_buffer after processing - self.reset_message_buffer() + try: + # If the EndFrame message is received, run the callbacks + self.run_msg_callbacks() + except Exception as e: + content = traceback.format_exc() + logger.error(f"Error in msg callbacks with error msg: {content}") + keys_in_msg_buffer = "".join( + [f"{msg['type']} with keys {msg.keys()} \n" for msg in self.msg_buffer] + ) + logger.info(f"Keys in message buffer: {keys_in_msg_buffer}") + logger.error(f"Error running callbacks: {e}") + finally: + # Clear the msg_buffer after processing + self.reset_message_buffer() except json.JSONDecodeError: logger.warning(f"Failed to decode JSON from buffer: {buffer}") @@ -304,18 +321,7 @@ class TimepixFlyBackend: data_frames = self.msg_buffer[1:-1] # TODO for cb, kwd in self.callbacks.values(): - try: - logger.info( - f"Running callback for startFrame {start_frame} with {len(data_frames)} data frames and endFrame {end_frame}." - ) - cb(start_frame, data_frames, end_frame, **kwd) - except Exception: - content = traceback.format_exc() - logger.error(f"Error in callback {cb} with error msg: {content}") - keys_in_msg_buffer = "".join( - [f"{msg['type']} with keys {msg.keys()} \n" for msg in self.msg_buffer] - ) - logger.info(f"Keys in message buffer: {keys_in_msg_buffer}") + cb(start_frame, data_frames, end_frame, **kwd) if __name__ == "__main__": # pragma: no cover