feat: added channel monitor as cli script

This commit is contained in:
wakonig_k 2023-11-23 19:15:49 +01:00
parent 8dbf4c79d4
commit 31cc15f204
4 changed files with 32 additions and 38 deletions

View File

@ -1,5 +1,6 @@
from bec_lib.alarm_handler import Alarms
from bec_lib.bec_service import BECService
from bec_lib.channel_monitor import channel_monitor_launch
from bec_lib.client import BECClient
from bec_lib.config_helper import ConfigHelper
from bec_lib.connector import ProducerConnector

View File

@ -0,0 +1,30 @@
import argparse
import json
import threading
from bec_lib import messages
from bec_lib.redis_connector import RedisConnector
from bec_lib.service_config import ServiceConfig
def channel_monitor_launch():
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--config", default="", help="path to the config file")
parser.add_argument("--channel", default="", help="channel name")
clargs = parser.parse_args()
config_path = clargs.config
topic = clargs.channel
config = ServiceConfig(config_path)
def channel_callback(msg, **kwargs):
msg = messages.MessageReader.loads(msg.value)
out = {"msg_type": msg.msg_type, "content": msg.content, "metadata": msg.metadata}
print(json.dumps(out, indent=4, default=lambda o: "<not serializable object>"))
connector = RedisConnector(config.redis)
consumer = connector.consumer(topics=topic, cb=channel_callback)
consumer.start()
event = threading.Event()
event.wait()

View File

@ -21,6 +21,7 @@ if __name__ == "__main__":
extras_require={
"dev": ["pytest", "pytest-random-order", "coverage", "pandas", "black", "pylint"]
},
entry_points={"console_scripts": ["bec-channel-monitor = bec_lib:channel_monitor_launch"]},
package_data={"bec_lib.tests": ["*.yaml"], "bec_lib.configs": ["*.yaml", "*.json"]},
version=__version__,
)

View File

@ -1,38 +0,0 @@
import argparse
import json
import threading
from bec_lib import messages
from bec_lib.redis_connector import RedisConnector
from bec_lib.service_config import ServiceConfig
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument(
"--config",
default="",
help="path to the config file",
)
parser.add_argument(
"--channel",
default="",
help="channel name",
)
clargs = parser.parse_args()
config_path = clargs.config
topic = clargs.channel
config = ServiceConfig(config_path)
def channel_callback(msg, **kwargs):
msg = messages.MessageReader.loads(msg.value)
out = {"msg_type": msg.msg_type, "content": msg.content, "metadata": msg.metadata}
print(json.dumps(out, indent=4, default=lambda o: "<not serializable object>"))
connector = RedisConnector(config.redis)
consumer = connector.consumer(topics=topic, cb=channel_callback)
consumer.start()
event = threading.Event()
event.wait()