pydase_service_base/icon_service_base/logging/LogBroadcastMessagePublisher.py
Mose Mueller a29878cbc1 updating logging module
- type hint
- comments
- unused imports
2023-04-11 08:05:12 +02:00

41 lines
1.4 KiB
Python

from __future__ import annotations
import loguru
from kombu import Connection, Exchange, Producer
from settings import amqp_settings
from .LogBroadcastMessage import LogBroadcastMessage, LogBroadcastMessageLevel
class LogBroadcastMessagePublisher:
def __init__(self, originator: str) -> None:
self._originator = originator
self._exchange = Exchange("logging", type="topic", durable=True)
self._connection = Connection(amqp_settings.broker_dsn)
self._channel = self._connection.channel()
bound_exchange = self._exchange(self._channel)
bound_exchange.declare()
self._producer = Producer(channel=self._channel, exchange=self._exchange)
def send_msg(self, loguru_msg: loguru.Message):
loguru_dict = loguru_msg.record
package = loguru_dict["name"] if loguru_dict["name"] is not None else ""
icon_msg = LogBroadcastMessage(
originator=self._originator,
level=LogBroadcastMessageLevel(loguru_dict["level"].name),
message=loguru_dict["message"],
package=package,
line_number=str(loguru_dict["line"]),
function=loguru_dict["function"],
timestamp=loguru_dict["time"],
)
self._producer.publish(
icon_msg.json(),
exchange=self._exchange,
expiration=amqp_settings.message_expiration_default_s,
routing_key=icon_msg.routing_key,
)