fix: fixed receiver for wrong data entries

This commit is contained in:
2023-07-21 15:36:09 +02:00
parent 8127ad2381
commit 95efd78c64
2 changed files with 26 additions and 13 deletions

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
import queue
import threading
import time
import traceback
from typing import List
import h5py
@@ -27,7 +28,7 @@ class NIDAQWriterService(BECService):
"""
reshape_dataset = True
use_redis_stream = False
use_redis_stream = True
def __init__(self, config: ServiceConfig, connector_cls: RedisConnector) -> None:
super().__init__(config=config, connector_cls=connector_cls, unique_service=True)
@@ -108,22 +109,34 @@ class NIDAQWriterService(BECService):
msg = self.producer.xread("ni_data")
if msg:
num_msgs = len(msg[0][1])
print(f"Received {num_msgs} messages in {time.time() - start_time} seconds")
msgs = [BECMessage.DeviceMessage.loads(m[1][b"device_msg"]) for m in msg[0][1]]
start_time = time.time()
self.handle_ni_data(msgs)
print(f"Handled {num_msgs} messages in {time.time() - start_time} seconds")
try:
num_msgs = len(msg[0][1])
logger.debug(
f"Received {num_msgs} messages in {time.time() - start_time} seconds"
)
msgs = [BECMessage.DeviceMessage.loads(m[1][b"device_msg"]) for m in msg[0][1]]
start_time = time.time()
self.handle_ni_data(msgs)
logger.debug(
f"Handled {num_msgs} messages in {time.time() - start_time} seconds"
)
except Exception as exc:
content = traceback.format_exc()
logger.error(f"Failed to parse message: {content}")
time.sleep(0.01)
else:
msgs = self.producer.r.lpop("ni_data:val", 20)
time.sleep(0.001)
if msgs:
msgs = [BECMessage.DeviceMessage.loads(msg) for msg in msgs]
print(f"Received {len(msgs)} messages in {time.time() - start_time} seconds")
start_time = time.time()
self.handle_ni_data(msgs)
print(f"Handled {len(msgs)} messages in {time.time() - start_time} seconds")
try:
msgs = [BECMessage.DeviceMessage.loads(msg) for msg in msgs]
print(f"Received {len(msgs)} messages in {time.time() - start_time} seconds")
start_time = time.time()
self.handle_ni_data(msgs)
print(f"Handled {len(msgs)} messages in {time.time() - start_time} seconds")
except Exception as exc:
content = traceback.format_exc()
logger.error(f"Failed to parse message: {content}")
def handle_scan_status(self, msg: BECMessage.ScanStatusMessage) -> None:
"""

View File

@@ -12,7 +12,7 @@ from bec_lib.core import (
class NIDAQSim(threading.Thread):
use_redis_stream = False
use_redis_stream = True
def run(self):
print("NIDAQSim running")