From bf11c4c1f021647828c9a84b2ffa3474a931768c Mon Sep 17 00:00:00 2001 From: Mose Mueller Date: Mon, 10 Apr 2023 17:12:04 +0200 Subject: [PATCH] Adding logging module --- .../logging/LogBroadcastMessage.py | 45 +++++++++++ .../logging/LogBroadcastMessageListener.py | 34 ++++++++ .../logging/LogBroadcastMessagePublisher.py | 40 ++++++++++ .../logging/LoguruInterceptHandler.py | 26 ++++++ icon_service_base/logging/initialize.py | 80 +++++++++++++++++++ 5 files changed, 225 insertions(+) create mode 100644 icon_service_base/logging/LogBroadcastMessage.py create mode 100644 icon_service_base/logging/LogBroadcastMessageListener.py create mode 100644 icon_service_base/logging/LogBroadcastMessagePublisher.py create mode 100644 icon_service_base/logging/LoguruInterceptHandler.py create mode 100644 icon_service_base/logging/initialize.py diff --git a/icon_service_base/logging/LogBroadcastMessage.py b/icon_service_base/logging/LogBroadcastMessage.py new file mode 100644 index 0000000..d583292 --- /dev/null +++ b/icon_service_base/logging/LogBroadcastMessage.py @@ -0,0 +1,45 @@ +from datetime import datetime +from enum import Enum + +from pydantic import BaseModel, Field + + +class LogBroadcastMessageLevel(Enum): + # low level messages that help tracking down issues + DEBUG = "DEBUG" + # generic messages + INFO = "INFO" + # something is suspicious, but program can continue normally + WARNING = "WARNING" + # something is wrong, correct program execution cannot be guaranteed + ERROR = "ERROR" + # program cannot continue to run any more + CRITICAL = "CRITICAL" + + +class LogBroadcastMessage(BaseModel): + originator: str = Field( + ..., description="The script or service where this message comes from." + ) + + level: LogBroadcastMessageLevel = Field( + ..., + description="Log level of this message. Possible values: DEBUG, INFO, WARNING, CRITICAL", + ) + message: str = Field(..., description="Actual content of the log message.") + package: str = Field( + ..., description="The python package where this message came from." + ) + line_number: str = Field( + ..., description="Line number where this message originated from." + ) + function: str = Field( + ..., description="The function that embeds the log message call." + ) + timestamp: datetime = Field(..., description="When the log message was created.") + + @property + def routing_key(self) -> str: + """Routing key based on contents of this message. + Constructed as: {self.__class__.__name__}.{self.level}.{self.package}""" + return f"{self.originator}.{self.level}.{self.package}.{self.function}" diff --git a/icon_service_base/logging/LogBroadcastMessageListener.py b/icon_service_base/logging/LogBroadcastMessageListener.py new file mode 100644 index 0000000..2b57a7f --- /dev/null +++ b/icon_service_base/logging/LogBroadcastMessageListener.py @@ -0,0 +1,34 @@ +from abc import abstractmethod +from uuid import uuid4 + +from kombu import Connection, Consumer, Exchange, Message, Queue +from loguru import logger + +from settings import amqp_settings + + +class LogBroadcastMessageListener: + def __init__(self): + queue_name = f"logging-listener-{uuid4()}" + + self._exchange = Exchange("logging", type="topic", durable=True) + self._connection = Connection(amqp_settings.broker_dsn) + self._channel = self._connection.channel() + + self._queue = Queue( + queue_name, + self._exchange, + routing_key="#", + durable=False, + auto_delete=True, + exclusive=True, + ) + self._consumer = Consumer(self._channel, [self._queue]) + self._consumer.register_callback(self.message_callback) # type: ignore + with self._consumer: # type: ignore + while True: + self._connection.drain_events() # type: ignore + + @abstractmethod + def message_callback(self, body: str, message: Message) -> None: + ... diff --git a/icon_service_base/logging/LogBroadcastMessagePublisher.py b/icon_service_base/logging/LogBroadcastMessagePublisher.py new file mode 100644 index 0000000..9178080 --- /dev/null +++ b/icon_service_base/logging/LogBroadcastMessagePublisher.py @@ -0,0 +1,40 @@ +from __future__ import annotations + +from kombu import Connection, Exchange, Producer # type: ignore +from loguru import Record, Message + +from settings import amqp_settings + +from .LogBroadcastMessage import LogBroadcastMessage + + +class LogBroadcastMessagePublisher: + def __init__(self, originator: str) -> None: + self._originator = originator + self._exchange = Exchange("logging", type="topic", durable=True) + + self._connection = Connection(amqp_settings.broker_dsn) + self._channel = self._connection.channel() + bound_exchange = self._exchange(self._channel) + bound_exchange.declare() + self._producer = Producer(channel=self._channel, exchange=self._exchange) + + def send_msg(self, loguru_msg: Message): + loguru_dict: Record = loguru_msg.record + + icon_msg = LogBroadcastMessage( + originator=self._originator, + level=loguru_dict["level"].name, + message=loguru_dict["message"], + package=loguru_dict["name"], + line_number=loguru_dict["line"], + function=loguru_dict["function"], + timestamp=loguru_dict["time"], + ) + + self._producer.publish( # type: ignore + icon_msg.json(), + exchange=self._exchange, + expiration=amqp_settings.message_expiration_default_s, + routing_key=icon_msg.routing_key, + ) diff --git a/icon_service_base/logging/LoguruInterceptHandler.py b/icon_service_base/logging/LoguruInterceptHandler.py new file mode 100644 index 0000000..37c1cd8 --- /dev/null +++ b/icon_service_base/logging/LoguruInterceptHandler.py @@ -0,0 +1,26 @@ +import logging +from typing import Any + +from loguru import logger + + +# from: https://github.com/Delgan/loguru section +# "Entirely compatible with standard logging" +class LoguruInterceptHandler(logging.Handler): + def emit(self, record: Any) -> None: + # Get corresponding Loguru level if it exists + try: + level = logger.level(record.levelname).name + except ValueError: + level = record.levelno + + # Find caller from where originated the logged message + frame = logging.currentframe() + depth = 2 + while frame.f_code.co_filename == logging.__file__: + frame = frame.f_back + depth += 1 + + logger.opt(depth=depth, exception=record.exc_info).log( + level, record.getMessage() + ) diff --git a/icon_service_base/logging/initialize.py b/icon_service_base/logging/initialize.py new file mode 100644 index 0000000..941bf3f --- /dev/null +++ b/icon_service_base/logging/initialize.py @@ -0,0 +1,80 @@ +import logging +import sys + +from loguru import logger + +from .LogBroadcastMessagePublisher import LogBroadcastMessagePublisher +from .LoguruInterceptHandler import LoguruInterceptHandler + + +def initialize_service_logging( + loglevel: int, service_name: str, service_instance_name: str +) -> None: + logger.debug("Configuring service logging.") + originator = f"{service_name}.{service_instance_name}" + + _init_logger(originator, loglevel) + + +def initialize_script_logging( + loglevel: str, script_name: str, use_central_logging: bool = True +) -> None: + logger.debug("Configuring script logging.") + + originator = f"{script_name}" + allowed_levels = ["DEBUG", "INFO", "ERROR"] + + if loglevel in allowed_levels: + _init_logger(originator, logging.getLevelName(loglevel), use_central_logging) + else: + raise ValueError(f"Allowed log levels are: {allowed_levels}") + + +def _init_logger( + originator: str, loglevel: int, use_central_logging: bool = True +) -> None: + # default format + fmt = ( + "{time:DD.MM HH:mm:ss.SSS} {message} " + + "{name}:{function}:{line}@{process.name}/{thread.name}" + ) + + config = { + "handlers": [ + {"sink": sys.stderr, "format": fmt, "level": loglevel}, + ], + "extra": { + "originator": originator, + }, + } + logger.configure(**config) + + if use_central_logging: + exchange_logger = LogBroadcastMessagePublisher(originator) + + # DEBUG messages and above are sent to the central logging instance + # note that this could be a problem if a lot of debug messages are sent per second + logger.add(exchange_logger.send_msg, enqueue=True, level="DEBUG") # type: ignore + + # see https://loguru.readthedocs.io/en/stable/api/logger.html#color + logger.level("DEBUG", color="") + # celery default colors + # 'DEBUG': COLORS['blue'], + # 'WARNING': COLORS['yellow'], + # 'ERROR': COLORS['red'], + # 'CRITICAL': COLORS['magenta'], + + loguru_intercept_handler = LoguruInterceptHandler() + + logging.basicConfig(handlers=[loguru_intercept_handler], level=loglevel) + + # library default log levels + # -> get name of logger using pythons logger "logging" + # sqlalchemy spams statements to INFO level, therefore set to WARNING + logging.getLogger("sqlalchemy.engine").setLevel(logging.WARNING) + # datamodel_code_generator spams debug messages, therefore set to INFO + logging.getLogger("blib2to3.pgen2.driver").setLevel(logging.INFO) + + # add loguru handler + logging.getLogger("sqlalchemy.engine").handlers = [loguru_intercept_handler] + logging.getLogger("uvicorn").handlers = [loguru_intercept_handler]