w
This commit is contained in:
@@ -263,6 +263,7 @@ class TimepixFlyBackend:
|
||||
with conn:
|
||||
while not self._data_thread_shutdown_event.is_set():
|
||||
try:
|
||||
# What if we split the chunk
|
||||
chunk = conn.recv(4096) # Adjust buffer size as needed
|
||||
except socket.timeout:
|
||||
# No data received, continue to check for new data
|
||||
@@ -273,11 +274,17 @@ class TimepixFlyBackend:
|
||||
if not chunk:
|
||||
break # Receiving an empty chunk means the connection was closed
|
||||
buffer += chunk.decode("utf-8") # Trailing byte, i.e. -> "}\n"
|
||||
if buffer.endswith("}\n"):
|
||||
for entry in buffer.split("}\n"):
|
||||
if entry:
|
||||
if "}\n" in buffer:
|
||||
buffer_chunks = buffer.split("}\n")
|
||||
processed_chunks = 0
|
||||
for entry in buffer_chunks:
|
||||
if entry and entry.endswith("}\n"):
|
||||
self._decode_received_data(entry + "}")
|
||||
buffer = "" # Reset buffer after processing
|
||||
processed_chunks += 1
|
||||
if processed_chunks == len(buffer_chunks):
|
||||
buffer = ""
|
||||
else:
|
||||
buffer = buffer_chunks[-1]
|
||||
|
||||
def _decode_received_data(self, buffer: str) -> None:
|
||||
"""
|
||||
@@ -290,10 +297,20 @@ class TimepixFlyBackend:
|
||||
obj, _ = self._decoder.raw_decode(buffer)
|
||||
self.msg_buffer.append(obj)
|
||||
if obj.get("type", "") == "EndFrame":
|
||||
# If the EndFrame message is received, run the callbacks
|
||||
self.run_msg_callbacks()
|
||||
# Clear the msg_buffer after processing
|
||||
self.reset_message_buffer()
|
||||
try:
|
||||
# If the EndFrame message is received, run the callbacks
|
||||
self.run_msg_callbacks()
|
||||
except Exception as e:
|
||||
content = traceback.format_exc()
|
||||
logger.error(f"Error in msg callbacks with error msg: {content}")
|
||||
keys_in_msg_buffer = "".join(
|
||||
[f"{msg['type']} with keys {msg.keys()} \n" for msg in self.msg_buffer]
|
||||
)
|
||||
logger.info(f"Keys in message buffer: {keys_in_msg_buffer}")
|
||||
logger.error(f"Error running callbacks: {e}")
|
||||
finally:
|
||||
# Clear the msg_buffer after processing
|
||||
self.reset_message_buffer()
|
||||
except json.JSONDecodeError:
|
||||
logger.warning(f"Failed to decode JSON from buffer: {buffer}")
|
||||
|
||||
@@ -304,18 +321,7 @@ class TimepixFlyBackend:
|
||||
data_frames = self.msg_buffer[1:-1]
|
||||
# TODO
|
||||
for cb, kwd in self.callbacks.values():
|
||||
try:
|
||||
logger.info(
|
||||
f"Running callback for startFrame {start_frame} with {len(data_frames)} data frames and endFrame {end_frame}."
|
||||
)
|
||||
cb(start_frame, data_frames, end_frame, **kwd)
|
||||
except Exception:
|
||||
content = traceback.format_exc()
|
||||
logger.error(f"Error in callback {cb} with error msg: {content}")
|
||||
keys_in_msg_buffer = "".join(
|
||||
[f"{msg['type']} with keys {msg.keys()} \n" for msg in self.msg_buffer]
|
||||
)
|
||||
logger.info(f"Keys in message buffer: {keys_in_msg_buffer}")
|
||||
cb(start_frame, data_frames, end_frame, **kwd)
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
|
||||
Reference in New Issue
Block a user