diff --git a/services/NIDAQ_writer/NIDAQ_writer.py b/services/NIDAQ_writer/NIDAQ_writer.py index ecad909..f393b62 100644 --- a/services/NIDAQ_writer/NIDAQ_writer.py +++ b/services/NIDAQ_writer/NIDAQ_writer.py @@ -15,6 +15,7 @@ from bec_lib.core import ( bec_logger, ) from bec_lib.core.bec_service import BECService +from bec_lib.core.file_utils import FileWriterMixin logger = bec_logger.logger @@ -30,12 +31,18 @@ class NIDAQWriterService(BECService): def __init__(self, config: ServiceConfig, connector_cls: RedisConnector) -> None: super().__init__(config=config, connector_cls=connector_cls, unique_service=True) self.queue = queue.Queue() + config = self._service_config.service_config.get("file_writer") + self.writer_mixin = FileWriterMixin(config) self._scan_status_consumer = None self._ni_data_consumer = None self._ni_data_event = None self._ni_writer = None self._ni_writer_event = None + self.scan_number = None + self.scan_is_running = False + self.filename = "" + self.elapsed_time = 0 self.start_time = 0 self._start_scan_status_consumer() @@ -83,6 +90,12 @@ class NIDAQWriterService(BECService): Read data from Redis. """ while not self._ni_data_event.is_set(): + if not self.scan_is_running: + time.sleep(0.01) + continue + + self.filename = self.writer_mixin.compile_full_filename(self.scan_number, "ni.h5") + start_time = time.time() if self.use_redis_stream: msg = self.producer.xread("ni_data") @@ -97,13 +110,13 @@ class NIDAQWriterService(BECService): time.sleep(0.01) else: msgs = self.producer.r.lpop("ni_data:val", 20) - # time.sleep(0.001) - # if msgs: - # msgs = [BECMessage.DeviceMessage.loads(msg) for msg in msgs] - # print(f"Received {len(msgs)} messages in {time.time() - start_time} seconds") - # start_time = time.time() - # self.handle_ni_data(msgs) - # print(f"Handled {len(msgs)} messages in {time.time() - start_time} seconds") + time.sleep(0.001) + if msgs: + msgs = [BECMessage.DeviceMessage.loads(msg) for msg in msgs] + print(f"Received {len(msgs)} messages in {time.time() - start_time} seconds") + start_time = time.time() + self.handle_ni_data(msgs) + print(f"Handled {len(msgs)} messages in {time.time() - start_time} seconds") def handle_scan_status(self, msg: BECMessage.ScanStatusMessage) -> None: """ @@ -112,6 +125,13 @@ class NIDAQWriterService(BECService): Args: msg: The scan status message. """ + status = msg.content["status"] + if status == "open": + self.scan_number = msg.content["info"].get("scan_number") + if self.scan_number is not None: + self.scan_is_running = True + else: + self.scan_is_running = False def handle_ni_data(self, msgs: List[BECMessage.DeviceMessage]) -> None: """ @@ -151,7 +171,9 @@ class NIDAQWriterService(BECService): # create a new file if it doesn't exist, otherwise append to it logger.info("Writing NI data to HDF5 file") start_time = time.time() - with h5py.File("test_output_no_comp_2.h5", "a") as file: + if not self.filename: + return + with h5py.File(self.filename, "a") as file: if self.reshape_dataset: for key in signals: # if the dataset already exists, append to it diff --git a/services/NIDAQ_writer/utils/nidaq_sim.py b/services/NIDAQ_writer/utils/nidaq_sim.py index c1870ed..e4d22dd 100644 --- a/services/NIDAQ_writer/utils/nidaq_sim.py +++ b/services/NIDAQ_writer/utils/nidaq_sim.py @@ -29,8 +29,8 @@ class NIDAQSim(threading.Thread): BECMessage.DeviceMessage.loads(msg) total_time = time.time() while True: - if index > 1000: - break + # if index > 1000: + # break start = time.time() # signals = { # "signal1": np.asarray(range(index, index + 300000)), @@ -40,11 +40,11 @@ class NIDAQSim(threading.Thread): index += 1 if self.use_redis_stream: - producer.xadd("ni_data:val", {"device_msg": msg}, max_size=100) + producer.xadd("ni_data", {"device_msg": msg}, max_size=100) else: producer.lpush("ni_data", msg, max_size=10) - # time.sleep(0.005) + time.sleep(0.5) print(f"Elapsed time: {time.time() - start}") print(f"Total time: {time.time() - total_time}") print(f"FPS: {index / (time.time() - total_time)}")