fix: removed hard-coded paths
This commit is contained in:
@@ -15,6 +15,7 @@ from bec_lib.core import (
|
||||
bec_logger,
|
||||
)
|
||||
from bec_lib.core.bec_service import BECService
|
||||
from bec_lib.core.file_utils import FileWriterMixin
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
@@ -30,12 +31,18 @@ class NIDAQWriterService(BECService):
|
||||
def __init__(self, config: ServiceConfig, connector_cls: RedisConnector) -> None:
|
||||
super().__init__(config=config, connector_cls=connector_cls, unique_service=True)
|
||||
self.queue = queue.Queue()
|
||||
config = self._service_config.service_config.get("file_writer")
|
||||
self.writer_mixin = FileWriterMixin(config)
|
||||
self._scan_status_consumer = None
|
||||
self._ni_data_consumer = None
|
||||
self._ni_data_event = None
|
||||
self._ni_writer = None
|
||||
self._ni_writer_event = None
|
||||
|
||||
self.scan_number = None
|
||||
self.scan_is_running = False
|
||||
self.filename = ""
|
||||
|
||||
self.elapsed_time = 0
|
||||
self.start_time = 0
|
||||
self._start_scan_status_consumer()
|
||||
@@ -83,6 +90,12 @@ class NIDAQWriterService(BECService):
|
||||
Read data from Redis.
|
||||
"""
|
||||
while not self._ni_data_event.is_set():
|
||||
if not self.scan_is_running:
|
||||
time.sleep(0.01)
|
||||
continue
|
||||
|
||||
self.filename = self.writer_mixin.compile_full_filename(self.scan_number, "ni.h5")
|
||||
|
||||
start_time = time.time()
|
||||
if self.use_redis_stream:
|
||||
msg = self.producer.xread("ni_data")
|
||||
@@ -97,13 +110,13 @@ class NIDAQWriterService(BECService):
|
||||
time.sleep(0.01)
|
||||
else:
|
||||
msgs = self.producer.r.lpop("ni_data:val", 20)
|
||||
# time.sleep(0.001)
|
||||
# if msgs:
|
||||
# msgs = [BECMessage.DeviceMessage.loads(msg) for msg in msgs]
|
||||
# print(f"Received {len(msgs)} messages in {time.time() - start_time} seconds")
|
||||
# start_time = time.time()
|
||||
# self.handle_ni_data(msgs)
|
||||
# print(f"Handled {len(msgs)} messages in {time.time() - start_time} seconds")
|
||||
time.sleep(0.001)
|
||||
if msgs:
|
||||
msgs = [BECMessage.DeviceMessage.loads(msg) for msg in msgs]
|
||||
print(f"Received {len(msgs)} messages in {time.time() - start_time} seconds")
|
||||
start_time = time.time()
|
||||
self.handle_ni_data(msgs)
|
||||
print(f"Handled {len(msgs)} messages in {time.time() - start_time} seconds")
|
||||
|
||||
def handle_scan_status(self, msg: BECMessage.ScanStatusMessage) -> None:
|
||||
"""
|
||||
@@ -112,6 +125,13 @@ class NIDAQWriterService(BECService):
|
||||
Args:
|
||||
msg: The scan status message.
|
||||
"""
|
||||
status = msg.content["status"]
|
||||
if status == "open":
|
||||
self.scan_number = msg.content["info"].get("scan_number")
|
||||
if self.scan_number is not None:
|
||||
self.scan_is_running = True
|
||||
else:
|
||||
self.scan_is_running = False
|
||||
|
||||
def handle_ni_data(self, msgs: List[BECMessage.DeviceMessage]) -> None:
|
||||
"""
|
||||
@@ -151,7 +171,9 @@ class NIDAQWriterService(BECService):
|
||||
# create a new file if it doesn't exist, otherwise append to it
|
||||
logger.info("Writing NI data to HDF5 file")
|
||||
start_time = time.time()
|
||||
with h5py.File("test_output_no_comp_2.h5", "a") as file:
|
||||
if not self.filename:
|
||||
return
|
||||
with h5py.File(self.filename, "a") as file:
|
||||
if self.reshape_dataset:
|
||||
for key in signals:
|
||||
# if the dataset already exists, append to it
|
||||
|
||||
@@ -29,8 +29,8 @@ class NIDAQSim(threading.Thread):
|
||||
BECMessage.DeviceMessage.loads(msg)
|
||||
total_time = time.time()
|
||||
while True:
|
||||
if index > 1000:
|
||||
break
|
||||
# if index > 1000:
|
||||
# break
|
||||
start = time.time()
|
||||
# signals = {
|
||||
# "signal1": np.asarray(range(index, index + 300000)),
|
||||
@@ -40,11 +40,11 @@ class NIDAQSim(threading.Thread):
|
||||
index += 1
|
||||
|
||||
if self.use_redis_stream:
|
||||
producer.xadd("ni_data:val", {"device_msg": msg}, max_size=100)
|
||||
producer.xadd("ni_data", {"device_msg": msg}, max_size=100)
|
||||
else:
|
||||
producer.lpush("ni_data", msg, max_size=10)
|
||||
|
||||
# time.sleep(0.005)
|
||||
time.sleep(0.5)
|
||||
print(f"Elapsed time: {time.time() - start}")
|
||||
print(f"Total time: {time.time() - total_time}")
|
||||
print(f"FPS: {index / (time.time() - total_time)}")
|
||||
|
||||
Reference in New Issue
Block a user