diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..74235c6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,177 @@ +**/*_venv +**/.idea +*.log +**/__pycache__ +**/.DS_Store +**/out +**/.vscode +**/.pytest_cache +**/*.egg* + +# file writer data +**.h5 + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/**/_build/ +docs/**/autodoc/ +docs/**/_autosummary/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +**.prof + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +.pdm.toml + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ diff --git a/services/NIDAQ_writer/NIDAQ_writer.py b/services/NIDAQ_writer/NIDAQ_writer.py new file mode 100644 index 0000000..ecad909 --- /dev/null +++ b/services/NIDAQ_writer/NIDAQ_writer.py @@ -0,0 +1,179 @@ +from __future__ import annotations + +import queue +import threading +import time +from typing import List + +import h5py +import numpy as np +from bec_lib.core import ( + BECMessage, + MessageEndpoints, + RedisConnector, + ServiceConfig, + bec_logger, +) +from bec_lib.core.bec_service import BECService + +logger = bec_logger.logger + + +class NIDAQWriterService(BECService): + """ + A service that receives data from the NIDAQ through Redis and writes it continuously to a file. + """ + + reshape_dataset = True + use_redis_stream = False + + def __init__(self, config: ServiceConfig, connector_cls: RedisConnector) -> None: + super().__init__(config=config, connector_cls=connector_cls, unique_service=True) + self.queue = queue.Queue() + self._scan_status_consumer = None + self._ni_data_consumer = None + self._ni_data_event = None + self._ni_writer = None + self._ni_writer_event = None + + self.elapsed_time = 0 + self.start_time = 0 + self._start_scan_status_consumer() + self._start_ni_data_consumer() + self._start_ni_writer() + + def _start_scan_status_consumer(self) -> None: + """ + Start the scan consumer. + """ + self._scan_status_consumer = self.connector.consumer( + MessageEndpoints.scan_status(), cb=self._scan_status_callback, parent=self + ) + self._scan_status_consumer.start() + + @staticmethod + def _scan_status_callback(message: BECMessage, parent: NIDAQWriterService) -> None: + """ + Callback for scan status messages. + """ + msg = BECMessage.ScanStatusMessage.loads(message.value) + if not msg: + return + parent.handle_scan_status(msg) + + def _start_ni_data_consumer(self) -> None: + """ + Start the NI data consumer. + """ + + self._ni_data_event = threading.Event() + self._ni_data_consumer = threading.Thread(target=self._read_data, daemon=True) + self._ni_data_consumer.start() + + def _start_ni_writer(self) -> None: + """ + Start the NI data writer. + """ + self._ni_writer_event = threading.Event() + self._ni_writer = threading.Thread(target=self._write_data, daemon=True) + self._ni_writer.start() + + def _read_data(self): + """ + Read data from Redis. + """ + while not self._ni_data_event.is_set(): + start_time = time.time() + if self.use_redis_stream: + msg = self.producer.xread("ni_data") + + if msg: + num_msgs = len(msg[0][1]) + print(f"Received {num_msgs} messages in {time.time() - start_time} seconds") + msgs = [BECMessage.DeviceMessage.loads(m[1][b"device_msg"]) for m in msg[0][1]] + start_time = time.time() + self.handle_ni_data(msgs) + print(f"Handled {num_msgs} messages in {time.time() - start_time} seconds") + time.sleep(0.01) + else: + msgs = self.producer.r.lpop("ni_data:val", 20) + # time.sleep(0.001) + # if msgs: + # msgs = [BECMessage.DeviceMessage.loads(msg) for msg in msgs] + # print(f"Received {len(msgs)} messages in {time.time() - start_time} seconds") + # start_time = time.time() + # self.handle_ni_data(msgs) + # print(f"Handled {len(msgs)} messages in {time.time() - start_time} seconds") + + def handle_scan_status(self, msg: BECMessage.ScanStatusMessage) -> None: + """ + Handle scan status messages. + + Args: + msg: The scan status message. + """ + + def handle_ni_data(self, msgs: List[BECMessage.DeviceMessage]) -> None: + """ + Receive NI data messages and write them to the writer queue. + + Args: + msgs: The NI data messages. + """ + logger.info(f"Received {len(msgs)} NI data messages") + + # concatenate all messages + signals = {} + + for key in msgs[0].content["signals"]: + signals[key] = np.concatenate([msg.content["signals"][key] for msg in msgs]) + + # write data to queue + self.queue.put(signals) + + def _write_data(self) -> None: + """ + Get data from the writer queue and write it to disk. + """ + while not self._ni_writer_event.is_set(): + signals = self.queue.get() + logger.info(f"Remaining tasks: {self.queue.qsize()}") + self.write_data(signals) + self.queue.task_done() + + def write_data(self, signals: dict) -> None: + """ + Write data to disk. + + Args: + signals: The signals to write to disk. + """ + # create a new file if it doesn't exist, otherwise append to it + logger.info("Writing NI data to HDF5 file") + start_time = time.time() + with h5py.File("test_output_no_comp_2.h5", "a") as file: + if self.reshape_dataset: + for key in signals: + # if the dataset already exists, append to it + if key in file: + dataset = file[key] + dataset.resize(dataset.shape[0] + len(signals[key]), axis=0) + dataset[-len(signals[key]) :] = signals[key] + # otherwise create a new dataset + else: + file.create_dataset(key, data=signals[key], chunks=True, maxshape=(None,)) + else: + # get all group names + group_names = list(file.keys()) + + # get max dataset number + dataset_num = [int(name.split("_")[1]) for name in group_names if "dataset" in name] + + if dataset_num: + dataset_num = max(dataset_num) + 1 + else: + dataset_num = 0 + group = file.create_group(f"dataset_{dataset_num}") + for key in signals: + group.create_dataset(key, data=signals[key], chunks=True, maxshape=(None,)) + logger.info(f"Finished writing NI data in {time.time() - start_time} seconds") diff --git a/services/NIDAQ_writer/__init__.py b/services/NIDAQ_writer/__init__.py new file mode 100644 index 0000000..86b902b --- /dev/null +++ b/services/NIDAQ_writer/__init__.py @@ -0,0 +1 @@ +from .NIDAQ_writer import NIDAQWriterService diff --git a/services/NIDAQ_writer/utils/nidaq_sim.py b/services/NIDAQ_writer/utils/nidaq_sim.py new file mode 100644 index 0000000..c1870ed --- /dev/null +++ b/services/NIDAQ_writer/utils/nidaq_sim.py @@ -0,0 +1,55 @@ +import threading +import time + +import numpy as np +from bec_lib.core import ( + BECMessage, + MessageEndpoints, + RedisConnector, + ServiceConfig, + bec_logger, +) + + +class NIDAQSim(threading.Thread): + use_redis_stream = False + + def run(self): + print("NIDAQSim running") + index = 0 + producer = RedisConnector(["localhost:6379"]).producer() + signal = np.asarray(range(index, index + 600000)) + signals = { + "signal1": signal, + "signal2": signal, + } + + msg = BECMessage.DeviceMessage(signals=signals) + msg = msg.dumps() + BECMessage.DeviceMessage.loads(msg) + total_time = time.time() + while True: + if index > 1000: + break + start = time.time() + # signals = { + # "signal1": np.asarray(range(index, index + 300000)), + # "signal2": np.asarray(range(index, index + 300000)), + # } + + index += 1 + + if self.use_redis_stream: + producer.xadd("ni_data:val", {"device_msg": msg}, max_size=100) + else: + producer.lpush("ni_data", msg, max_size=10) + + # time.sleep(0.005) + print(f"Elapsed time: {time.time() - start}") + print(f"Total time: {time.time() - total_time}") + print(f"FPS: {index / (time.time() - total_time)}") + print(f"Signal size: {signal.nbytes/1e6*2} MB") + + +if __name__ == "__main__": + NIDAQSim().start() diff --git a/services/launch_writer.py b/services/launch_writer.py new file mode 100644 index 0000000..4c8dc6a --- /dev/null +++ b/services/launch_writer.py @@ -0,0 +1,33 @@ +import argparse +import threading + +from bec_lib.core import RedisConnector, ServiceConfig, bec_logger +from NIDAQ_writer import NIDAQWriterService + +parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) +parser.add_argument( + "--config", + default="", + help="path to the config file", +) +clargs = parser.parse_args() +config_path = clargs.config + +config = ServiceConfig(config_path) +bec_logger.level = bec_logger.LOGLEVEL.INFO +logger = bec_logger.logger + +bec_server = NIDAQWriterService( + config=config, + connector_cls=RedisConnector, +) +try: + event = threading.Event() + # pylint: disable=E1102 + logger.success("Started NIDAQ writer service") + event.wait() +except KeyboardInterrupt as e: + # bec_server.connector.raise_error("KeyboardInterrupt") + bec_server.shutdown() + event.set() + raise e