Files
debye_bec/bec_plugins/services/NIDAQ_writer/utils/nidaq_sim.py
2023-12-07 16:17:59 +01:00

50 lines
1.4 KiB
Python

import threading
import time
import numpy as np
from bec_lib import MessageEndpoints, RedisConnector, ServiceConfig, bec_logger, messages
class NIDAQSim(threading.Thread):
use_redis_stream = True
def run(self):
print("NIDAQSim running")
index = 0
producer = RedisConnector(["localhost:6379"]).producer()
signal = np.asarray(range(index, index + 600000))
signals = {
"signal1": signal,
"signal2": signal,
}
msg = messages.DeviceMessage(signals=signals)
msg = msg.dumps()
messages.DeviceMessage.loads(msg)
total_time = time.time()
while True:
# if index > 1000:
# break
start = time.time()
# signals = {
# "signal1": np.asarray(range(index, index + 300000)),
# "signal2": np.asarray(range(index, index + 300000)),
# }
index += 1
if self.use_redis_stream:
producer.xadd("ni_data", {"device_msg": msg}, max_size=100)
else:
producer.lpush("ni_data", msg, max_size=10)
time.sleep(0.5)
print(f"Elapsed time: {time.time() - start}")
print(f"Total time: {time.time() - total_time}")
print(f"FPS: {index / (time.time() - total_time)}")
print(f"Signal size: {signal.nbytes/1e6*2} MB")
if __name__ == "__main__":
NIDAQSim().start()