feat: added daq writer and sim

This commit is contained in:
2023-07-13 17:28:05 +02:00
parent 1eef5016cb
commit 2b86d16056
5 changed files with 445 additions and 0 deletions
+177
View File
@@ -0,0 +1,177 @@
**/*_venv
**/.idea
*.log
**/__pycache__
**/.DS_Store
**/out
**/.vscode
**/.pytest_cache
**/*.egg*
# file writer data
**.h5
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/**/_build/
docs/**/autodoc/
docs/**/_autosummary/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
**.prof
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
+179
View File
@@ -0,0 +1,179 @@
from __future__ import annotations
import queue
import threading
import time
from typing import List
import h5py
import numpy as np
from bec_lib.core import (
BECMessage,
MessageEndpoints,
RedisConnector,
ServiceConfig,
bec_logger,
)
from bec_lib.core.bec_service import BECService
logger = bec_logger.logger
class NIDAQWriterService(BECService):
"""
A service that receives data from the NIDAQ through Redis and writes it continuously to a file.
"""
reshape_dataset = True
use_redis_stream = False
def __init__(self, config: ServiceConfig, connector_cls: RedisConnector) -> None:
super().__init__(config=config, connector_cls=connector_cls, unique_service=True)
self.queue = queue.Queue()
self._scan_status_consumer = None
self._ni_data_consumer = None
self._ni_data_event = None
self._ni_writer = None
self._ni_writer_event = None
self.elapsed_time = 0
self.start_time = 0
self._start_scan_status_consumer()
self._start_ni_data_consumer()
self._start_ni_writer()
def _start_scan_status_consumer(self) -> None:
"""
Start the scan consumer.
"""
self._scan_status_consumer = self.connector.consumer(
MessageEndpoints.scan_status(), cb=self._scan_status_callback, parent=self
)
self._scan_status_consumer.start()
@staticmethod
def _scan_status_callback(message: BECMessage, parent: NIDAQWriterService) -> None:
"""
Callback for scan status messages.
"""
msg = BECMessage.ScanStatusMessage.loads(message.value)
if not msg:
return
parent.handle_scan_status(msg)
def _start_ni_data_consumer(self) -> None:
"""
Start the NI data consumer.
"""
self._ni_data_event = threading.Event()
self._ni_data_consumer = threading.Thread(target=self._read_data, daemon=True)
self._ni_data_consumer.start()
def _start_ni_writer(self) -> None:
"""
Start the NI data writer.
"""
self._ni_writer_event = threading.Event()
self._ni_writer = threading.Thread(target=self._write_data, daemon=True)
self._ni_writer.start()
def _read_data(self):
"""
Read data from Redis.
"""
while not self._ni_data_event.is_set():
start_time = time.time()
if self.use_redis_stream:
msg = self.producer.xread("ni_data")
if msg:
num_msgs = len(msg[0][1])
print(f"Received {num_msgs} messages in {time.time() - start_time} seconds")
msgs = [BECMessage.DeviceMessage.loads(m[1][b"device_msg"]) for m in msg[0][1]]
start_time = time.time()
self.handle_ni_data(msgs)
print(f"Handled {num_msgs} messages in {time.time() - start_time} seconds")
time.sleep(0.01)
else:
msgs = self.producer.r.lpop("ni_data:val", 20)
# time.sleep(0.001)
# if msgs:
# msgs = [BECMessage.DeviceMessage.loads(msg) for msg in msgs]
# print(f"Received {len(msgs)} messages in {time.time() - start_time} seconds")
# start_time = time.time()
# self.handle_ni_data(msgs)
# print(f"Handled {len(msgs)} messages in {time.time() - start_time} seconds")
def handle_scan_status(self, msg: BECMessage.ScanStatusMessage) -> None:
"""
Handle scan status messages.
Args:
msg: The scan status message.
"""
def handle_ni_data(self, msgs: List[BECMessage.DeviceMessage]) -> None:
"""
Receive NI data messages and write them to the writer queue.
Args:
msgs: The NI data messages.
"""
logger.info(f"Received {len(msgs)} NI data messages")
# concatenate all messages
signals = {}
for key in msgs[0].content["signals"]:
signals[key] = np.concatenate([msg.content["signals"][key] for msg in msgs])
# write data to queue
self.queue.put(signals)
def _write_data(self) -> None:
"""
Get data from the writer queue and write it to disk.
"""
while not self._ni_writer_event.is_set():
signals = self.queue.get()
logger.info(f"Remaining tasks: {self.queue.qsize()}")
self.write_data(signals)
self.queue.task_done()
def write_data(self, signals: dict) -> None:
"""
Write data to disk.
Args:
signals: The signals to write to disk.
"""
# create a new file if it doesn't exist, otherwise append to it
logger.info("Writing NI data to HDF5 file")
start_time = time.time()
with h5py.File("test_output_no_comp_2.h5", "a") as file:
if self.reshape_dataset:
for key in signals:
# if the dataset already exists, append to it
if key in file:
dataset = file[key]
dataset.resize(dataset.shape[0] + len(signals[key]), axis=0)
dataset[-len(signals[key]) :] = signals[key]
# otherwise create a new dataset
else:
file.create_dataset(key, data=signals[key], chunks=True, maxshape=(None,))
else:
# get all group names
group_names = list(file.keys())
# get max dataset number
dataset_num = [int(name.split("_")[1]) for name in group_names if "dataset" in name]
if dataset_num:
dataset_num = max(dataset_num) + 1
else:
dataset_num = 0
group = file.create_group(f"dataset_{dataset_num}")
for key in signals:
group.create_dataset(key, data=signals[key], chunks=True, maxshape=(None,))
logger.info(f"Finished writing NI data in {time.time() - start_time} seconds")
+1
View File
@@ -0,0 +1 @@
from .NIDAQ_writer import NIDAQWriterService
+55
View File
@@ -0,0 +1,55 @@
import threading
import time
import numpy as np
from bec_lib.core import (
BECMessage,
MessageEndpoints,
RedisConnector,
ServiceConfig,
bec_logger,
)
class NIDAQSim(threading.Thread):
use_redis_stream = False
def run(self):
print("NIDAQSim running")
index = 0
producer = RedisConnector(["localhost:6379"]).producer()
signal = np.asarray(range(index, index + 600000))
signals = {
"signal1": signal,
"signal2": signal,
}
msg = BECMessage.DeviceMessage(signals=signals)
msg = msg.dumps()
BECMessage.DeviceMessage.loads(msg)
total_time = time.time()
while True:
if index > 1000:
break
start = time.time()
# signals = {
# "signal1": np.asarray(range(index, index + 300000)),
# "signal2": np.asarray(range(index, index + 300000)),
# }
index += 1
if self.use_redis_stream:
producer.xadd("ni_data:val", {"device_msg": msg}, max_size=100)
else:
producer.lpush("ni_data", msg, max_size=10)
# time.sleep(0.005)
print(f"Elapsed time: {time.time() - start}")
print(f"Total time: {time.time() - total_time}")
print(f"FPS: {index / (time.time() - total_time)}")
print(f"Signal size: {signal.nbytes/1e6*2} MB")
if __name__ == "__main__":
NIDAQSim().start()
+33
View File
@@ -0,0 +1,33 @@
import argparse
import threading
from bec_lib.core import RedisConnector, ServiceConfig, bec_logger
from NIDAQ_writer import NIDAQWriterService
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument(
"--config",
default="",
help="path to the config file",
)
clargs = parser.parse_args()
config_path = clargs.config
config = ServiceConfig(config_path)
bec_logger.level = bec_logger.LOGLEVEL.INFO
logger = bec_logger.logger
bec_server = NIDAQWriterService(
config=config,
connector_cls=RedisConnector,
)
try:
event = threading.Event()
# pylint: disable=E1102
logger.success("Started NIDAQ writer service")
event.wait()
except KeyboardInterrupt as e:
# bec_server.connector.raise_error("KeyboardInterrupt")
bec_server.shutdown()
event.set()
raise e