From 31cc15f204ded7d368ef384cdb04448c18c5bc3f Mon Sep 17 00:00:00 2001 From: wakonig_k Date: Thu, 23 Nov 2023 19:15:49 +0100 Subject: [PATCH] feat: added channel monitor as cli script --- bec_lib/bec_lib/__init__.py | 1 + bec_lib/bec_lib/channel_monitor.py | 30 +++++++++++++++++++ bec_lib/setup.py | 1 + bec_lib/util_scripts/channel_monitor.py | 38 ------------------------- 4 files changed, 32 insertions(+), 38 deletions(-) create mode 100644 bec_lib/bec_lib/channel_monitor.py delete mode 100644 bec_lib/util_scripts/channel_monitor.py diff --git a/bec_lib/bec_lib/__init__.py b/bec_lib/bec_lib/__init__.py index ef93c88d..a96c6c6e 100644 --- a/bec_lib/bec_lib/__init__.py +++ b/bec_lib/bec_lib/__init__.py @@ -1,5 +1,6 @@ from bec_lib.alarm_handler import Alarms from bec_lib.bec_service import BECService +from bec_lib.channel_monitor import channel_monitor_launch from bec_lib.client import BECClient from bec_lib.config_helper import ConfigHelper from bec_lib.connector import ProducerConnector diff --git a/bec_lib/bec_lib/channel_monitor.py b/bec_lib/bec_lib/channel_monitor.py new file mode 100644 index 00000000..7b726b78 --- /dev/null +++ b/bec_lib/bec_lib/channel_monitor.py @@ -0,0 +1,30 @@ +import argparse +import json +import threading + +from bec_lib import messages +from bec_lib.redis_connector import RedisConnector +from bec_lib.service_config import ServiceConfig + + +def channel_monitor_launch(): + parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser.add_argument("--config", default="", help="path to the config file") + parser.add_argument("--channel", default="", help="channel name") + clargs = parser.parse_args() + config_path = clargs.config + topic = clargs.channel + + config = ServiceConfig(config_path) + + def channel_callback(msg, **kwargs): + msg = messages.MessageReader.loads(msg.value) + out = {"msg_type": msg.msg_type, "content": msg.content, "metadata": msg.metadata} + print(json.dumps(out, indent=4, default=lambda o: "")) + + connector = RedisConnector(config.redis) + consumer = connector.consumer(topics=topic, cb=channel_callback) + consumer.start() + + event = threading.Event() + event.wait() diff --git a/bec_lib/setup.py b/bec_lib/setup.py index cfd8e9ff..fc4ad67a 100644 --- a/bec_lib/setup.py +++ b/bec_lib/setup.py @@ -21,6 +21,7 @@ if __name__ == "__main__": extras_require={ "dev": ["pytest", "pytest-random-order", "coverage", "pandas", "black", "pylint"] }, + entry_points={"console_scripts": ["bec-channel-monitor = bec_lib:channel_monitor_launch"]}, package_data={"bec_lib.tests": ["*.yaml"], "bec_lib.configs": ["*.yaml", "*.json"]}, version=__version__, ) diff --git a/bec_lib/util_scripts/channel_monitor.py b/bec_lib/util_scripts/channel_monitor.py deleted file mode 100644 index d0665f1f..00000000 --- a/bec_lib/util_scripts/channel_monitor.py +++ /dev/null @@ -1,38 +0,0 @@ -import argparse -import json -import threading - -from bec_lib import messages -from bec_lib.redis_connector import RedisConnector -from bec_lib.service_config import ServiceConfig - -parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) -parser.add_argument( - "--config", - default="", - help="path to the config file", -) -parser.add_argument( - "--channel", - default="", - help="channel name", -) -clargs = parser.parse_args() -config_path = clargs.config -topic = clargs.channel - -config = ServiceConfig(config_path) - - -def channel_callback(msg, **kwargs): - msg = messages.MessageReader.loads(msg.value) - out = {"msg_type": msg.msg_type, "content": msg.content, "metadata": msg.metadata} - print(json.dumps(out, indent=4, default=lambda o: "")) - - -connector = RedisConnector(config.redis) -consumer = connector.consumer(topics=topic, cb=channel_callback) -consumer.start() - -event = threading.Event() -event.wait()