diff --git a/bec_plugins/services/NIDAQ_writer/NIDAQ_writer.py b/bec_plugins/services/NIDAQ_writer/NIDAQ_writer.py index 9583ea5..a0f5694 100644 --- a/bec_plugins/services/NIDAQ_writer/NIDAQ_writer.py +++ b/bec_plugins/services/NIDAQ_writer/NIDAQ_writer.py @@ -3,6 +3,7 @@ from __future__ import annotations import queue import threading import time +import traceback from typing import List import h5py @@ -27,7 +28,7 @@ class NIDAQWriterService(BECService): """ reshape_dataset = True - use_redis_stream = False + use_redis_stream = True def __init__(self, config: ServiceConfig, connector_cls: RedisConnector) -> None: super().__init__(config=config, connector_cls=connector_cls, unique_service=True) @@ -108,22 +109,34 @@ class NIDAQWriterService(BECService): msg = self.producer.xread("ni_data") if msg: - num_msgs = len(msg[0][1]) - print(f"Received {num_msgs} messages in {time.time() - start_time} seconds") - msgs = [BECMessage.DeviceMessage.loads(m[1][b"device_msg"]) for m in msg[0][1]] - start_time = time.time() - self.handle_ni_data(msgs) - print(f"Handled {num_msgs} messages in {time.time() - start_time} seconds") + try: + num_msgs = len(msg[0][1]) + logger.debug( + f"Received {num_msgs} messages in {time.time() - start_time} seconds" + ) + msgs = [BECMessage.DeviceMessage.loads(m[1][b"device_msg"]) for m in msg[0][1]] + start_time = time.time() + self.handle_ni_data(msgs) + logger.debug( + f"Handled {num_msgs} messages in {time.time() - start_time} seconds" + ) + except Exception as exc: + content = traceback.format_exc() + logger.error(f"Failed to parse message: {content}") time.sleep(0.01) else: msgs = self.producer.r.lpop("ni_data:val", 20) time.sleep(0.001) if msgs: - msgs = [BECMessage.DeviceMessage.loads(msg) for msg in msgs] - print(f"Received {len(msgs)} messages in {time.time() - start_time} seconds") - start_time = time.time() - self.handle_ni_data(msgs) - print(f"Handled {len(msgs)} messages in {time.time() - start_time} seconds") + try: + msgs = [BECMessage.DeviceMessage.loads(msg) for msg in msgs] + print(f"Received {len(msgs)} messages in {time.time() - start_time} seconds") + start_time = time.time() + self.handle_ni_data(msgs) + print(f"Handled {len(msgs)} messages in {time.time() - start_time} seconds") + except Exception as exc: + content = traceback.format_exc() + logger.error(f"Failed to parse message: {content}") def handle_scan_status(self, msg: BECMessage.ScanStatusMessage) -> None: """ diff --git a/bec_plugins/services/NIDAQ_writer/utils/nidaq_sim.py b/bec_plugins/services/NIDAQ_writer/utils/nidaq_sim.py index e4d22dd..56835a7 100644 --- a/bec_plugins/services/NIDAQ_writer/utils/nidaq_sim.py +++ b/bec_plugins/services/NIDAQ_writer/utils/nidaq_sim.py @@ -12,7 +12,7 @@ from bec_lib.core import ( class NIDAQSim(threading.Thread): - use_redis_stream = False + use_redis_stream = True def run(self): print("NIDAQSim running")