
一. 前言
Loguru is a library which aims to bring enjoyable logging in Python.
Did you ever feel lazy about configuring a logger and used
print()
instead?... I did, yet logging is fundamental to every application and eases the process of debugging. Using Loguru you have no excuse not to use logging from the start, this is as simple asfrom loguru import logger
.Also, this library is intended to make Python logging less painful by adding a bunch of useful functionalities that solve caveats of the standard loggers. Using logs in your application should be an automatism, Loguru tries to make it both pleasant and powerful.
pip install loguru
Delgan/loguru: Python logging made (stupidly) simple
loguru.logger - loguru documentation
简而言之, loguru
是个开箱即用近乎傻瓜化的强大日志管理工具, 相比于原生的logging - Logging facility for Python - Python 3.13.7 documentation, loguru不需要复杂的封装, 也不需要复杂的配置, 拿来就可以用.
但需要注意, 在某些场景下的使用, 如: 多进程.
二. 概览
核心API
: logger
(注意某些旧文档提及的内容, 新的代码将核心功能全部集中在logger
之下)
三. 详解
核心功能.
3.1 日志配置
logger.add()
参数名 | 类型 | 默认值 / 说明 | 关键说明 |
---|---|---|---|
sink | file-like/str/pathlib.Path/callable/ 协程函数 /logging.Handler |
无( 必填) | 接收日志的终端, 支持 5 种类型( 详见 " sink 类型" 部分) |
level | int/str |
"DEBUG"( 对应 severity 10) | 日志级别阈值, 仅高于 / 等于该级别的消息会被发送到 sink |
format | str/callable |
含时间, 级别, 模块 / 函数 / 行号, 消息的彩色模板( 见下文默认格式) | 日志格式化模板, 支持str.format() 语法, 可引用 record 字典的 key |
filter | callable/str/dict |
None | 过滤日志: 函数返回 True 则保留; 字符串匹配模块名; 字典指定模块级别阈值 |
colorize | bool | None( 自动判断 sink 是否为终端, 终端则显色) | 是否保留颜色标记( 转换为 ANSI 码或剥离) |
serialize | bool | False | 是否将日志和 record 转换为 JSON 字符串 |
backtrace |
bool | True | 是否显示完整异常栈( 超出捕获点的调用栈) |
diagnose | bool | True | 是否显示异常时的变量值( 生产环境建议设为 False, 避免泄露敏感数据) |
enqueue | bool | False | 是否通过多进程安全队列发送日志( 多进程写文件时推荐开启, 避免阻塞) |
catch | bool | True | 是否自动捕获 sink 处理日志时的错误( 避免程序崩溃) |
3.1.1 日志文件
当 sink 为文件路径时, 支持 3 个核心参数用于日志管理:
参数名 | 作用 | 可选配置类型及示例 |
---|---|---|
rotation | 触发日志轮转( 关闭当前文件, 创建新文件) | 1. int( 文件大小, 如 1024*1024 表示 1MB) ; 2. timedelta( 时间间隔, 如 timedelta (days=1)) ; 3. time( 固定时间, 如 datetime.time (18,0)) ; 4. str( 人性化配置, 如 "100 MB", "18:00", "sunday") |
retention | 清理旧日志文件( 轮转时或程序退出时执行) | 1. int( 保留文件数, 如 5) ; 2. timedelta( 最大保留时长, 如 timedelta (weeks=1)) ; 3. str( 如 "1 week", "2 months") ; 4. callable( 自定义清理逻辑) |
compression | 轮转后压缩旧日志文件 | 1. str( 压缩格式, 如 "gz", "zip", "tar.gz") ; 2. callable( 自定义压缩逻辑) |
功能类型 | 参数配置示例 | 效果描述 |
---|---|---|
日志轮转 | rotation="500 MB" |
当文件大小达到 500MB 时, 关闭当前文件并创建新文件 |
日志轮转 | rotation="12:00" |
每天中午 12 点自动创建新日志文件, 旧文件归档 |
日志轮转 | rotation="1 week" |
日志文件创建超过 1 周后, 触发轮转归档 |
日志保留 | retention="10 days" |
自动删除 10 天前的归档日志文件, 仅保留最近 10 天 |
日志压缩 | compression="zip" |
日志文件关闭后( 轮转或程序退出) 自动压缩为 ZIP |
时间命名 | sink="file_{time}.log" |
日志文件名包含创建时间( 如file_2024-09-09.log |
3.1.2 时间格式
支持自定义时间显示, 使用特定 token( 子集自 Pendulum 库) , 核心 token 如下:
Pendulum - Python datetimes made easy
类别 | Token | 输出示例 |
---|---|---|
年 | YYYY | 2024 |
月 | MM | 01-12 |
日 | DD | 01-31 |
时( 24 小时) | HH | 00-23 |
分 | mm | 00-59 |
秒 | ss | 00-59 |
毫秒 | SSS | 000-999 |
时区偏移 | Z | +08:00 |
UTC 转换 | !UTC | {time:HH:mm:ss!UTC}( UTC 时间) |
示例:
format="{time:YYYY-MM-DD HH:mm:ss Z} {message}"
输出带时区的时间.
3.1.3 颜色标记
通过标签为日志添加颜色, 标签自动适配 sink 类型( 非终端则剥离) , 核心标签如下:
类型 | 示例 | 说明 |
---|---|---|
基础颜色 | <red>文本</red> , <r>文本</> |
支持 red (r), green (g), blue (e) 等 8 种颜色 |
亮色 | <light-blue>文本</> |
前缀加 light-, 如 light-cyan (LC) |
样式 | <bold>文本</> , <u>文本</> |
支持加粗 (b), 下划线 (u), 斜体 (i) 等 |
级别自动色 | <level>文本</> |
颜色匹配日志级别( 如 ERROR 为红色) |
3.2 日志等级
Level name | Severity value | Logger method | 预期用途 |
---|---|---|---|
TRACE | 5 | logger.trace() | 追踪执行进度 |
DEBUG | 10 | logger.debug() | 调试 |
INFO | 20 | logger.info() | 一般信息记录 |
SUCCESS | 25 | logger.success() | 成功执行 |
WARNING | 30 | logger.warning() | 警告 |
ERROR | 40 | logger.error() | 错误 |
CRITICAL | 50 | logger.critical() | 严重错误 |
- TRACE: Loguru
默认情况下使用
DEBUG级别作为最低日志记录级别, 而不是
TRACE`级别. - DEBUG: 用于记录详细的调试信息, 通常只在开发过程中使用, 以帮助诊断问题.
- INFO: 用于记录常规信息, 比如程序的正常运行状态或一些关键的操作.
- SUCCESS: 通常用于记录操作成功的消息, 比如任务完成或数据成功保存.
- WARNING: 用于记录可能不是错误, 但需要注意或可能在未来导致问题的事件.
- ERROR: 用于记录错误, 这些错误可能会影响程序的某些功能, 但通常不会导致程序完全停止.
- CRITICAL: 用于记录非常严重的错误, 这些错误可能会导致程序完全停止或数据丢失.
Loguru 定义了从低到高的 7 种日志级别( 按 severity 值排序) :
TRACE(5) < DEBUG(10) < INFO(20) < SUCCESS(25) < WARNING(30) < ERROR(40) < CRITICAL(50)
level="DEBUG"
表示: 当前 sink 仅接收级别 ≥ DEBUG 的日志( 即 DEBUG
, INFO
, SUCCESS
, WARNING
, ERROR
, CRITICAL
这 6 种级别) , 而比 DEBUG
更低的 TRACE
级别日志会被自动过滤, 不被该 sink 处理.
即用于确定该如何写入日志信息.
import sys
from loguru import logger
# 移除默认sink, 避免重复输出
logger.remove()
# 1. 终端sink: 接收DEBUG及以上级别( 包括debug)
# 低级别的日志和高级别日志同时存在会导致多个内容输出, 只需要保留其中一个就好了, 保留低级别的日志, 而不想写入高级别的日志, 可以使用filter参数过滤掉
logger.add(
sys.stdout,
level="SUCCESS", # 关键: 设为DEBUG, 让终端显示debug日志
format="<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | <level>{message}</level>"
)
# 假如需要在终端和日志中同时出现信息, 需要设置两次
# 2. 文件sink: 接收DEBUG及以上级别( 已满足)
logger.add(
"app.log",
level="SUCCESS",
format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level} | {module}.{function}:{line} | {message}",
rotation="1 MB"
)
logger.debug("debug")
logger.info("info") # 这些信息都不会出现
logger.success("success") # 只有这个信息出现
需要注意的是logger.trace
, 这个函数暂时还不清楚有什么比较特殊的用途, 和普通的debug
之类的有什么区别
from loguru import logger
import sys
logger.add(sys.stderr, level="TRACE") # 必须设置这个, 否则无法打印出信息来
logger.trace("这是一条跟踪消息")
logger.debug("这是一条调试信息")
3.3 其他的一些重要方法
方法名 | 作用 | 关键参数 / 特性 |
---|---|---|
opt() | 调整单次日志的生成规则 | exception=True ( 附加异常栈) , lazy=True ( 延迟执行昂贵格式化函数) , colors=True ( 启用消息内颜色标记) 等 |
bind() | 为日志 record 的extra 字典绑定全局属性 |
接收 **kwargs, 返回新 Logger 实例, 绑定的属性全局生效 |
contextualize() | 上下文局部绑定extra 属性( 基于 contextvars, 线程 / 异步任务隔离) |
接收 **kwargs, 作为上下文管理器 / 装饰器使用, 退出后恢复extra 初始状态 |
patch() | 绑定函数动态修改 record 字典 | 接收patcher 函数( 入参为 record, 需原地修改) , 多 patch 按添加顺序执行 |
level() | 添加 / 更新 / 检索日志级别 | 添加需name ( 名称) 和no ( severity 值) ; 更新仅能改color 和icon ; 检索仅传name |
disable()/enable() | 禁用 / 启用指定模块及其子模块的日志 | name 为模块名, disable("my_lib") 禁用 my_lib 的日志, enable("") 启用所有日志 |
configure() | 全局批量配置 Logger | 支持handlers ( 批量添加 sink) , levels ( 批量管理级别) , extra ( 全局默认 extra) 等 |
parse() | 解析日志文件为 dict | 需file ( 日志文件 / 文件对象) 和pattern ( 正则, 含命名分组) , cast 参数用于类型转换 |
3.4 简单使用示例
3.4.1 logger.catch
异常捕获装饰器, 支持装饰器和with
管理器模式实现.
@logger.catch
def test():
return 1 / 0
def test2():
return {'a': 1}['b']
test()
# 错误日志自动打印, 注意设置的打印日志的等级存在重复打印的问题
with logger.catch(message="test2"):
test2()
3.4.2 logger.remove
移除之前的日志输出, 否则, 会在之前的日志输出基础上继续输出.
remove(handler_id=None)[source]
Remove a previously added handler and stop sending logs to its sink.
- Parameters:
handler_id (
int
orNone
) – The id of the sink to remove, as it was returned by theadd()
method. IfNone
, all handlers are removed. The pre-configured handler is guaranteed to have the index0
.
- Raises:
ValueError – If
handler_id
is notNone
but there is no active handler with such id.
i = logger.add(sys.stderr, format="{message}")
logger.info("Logging")
logger.remove(i)
logger.info("No longer logging")
import sys
from loguru import logger
def main():
log_format = ("<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
"<level>{level:<8}</level> | "
"<cyan>{name}:{function}:{line} - {message}</cyan>")
# logger.remove() # 移除之前的输出
# 控制台标准输出
logger.add(sink=sys.stdout, format=log_format, level="DEBUG", colorize=True)
logger.debug("debug log")
if __name__ == '__main__':
main()
3.4.3 logger.complete()
等待日志被全部写入.
Wait for the end of enqueued messages and asynchronous tasks scheduled by handlers.
This method proceeds in two steps: first it waits for all logging messages added to handlers with
enqueue=True
to be processed, then it returns an object that can be awaited to finalize all logging tasks added to the event loop by coroutine sinks.It can be called from non-asynchronous code. This is especially recommended when the
logger
is utilized withmultiprocessing
to ensure messages put to the internal queue have been properly transmitted before leaving a child process.The returned object should be awaited before the end of a coroutine executed by
asyncio.run()
orloop.run_until_complete()
to ensure all asynchronous logging messages are processed. The functionasyncio.get_running_loop()
is called beforehand, only tasks scheduled in the same loop that the current one will be awaited by the method.
- Returns:
awaitable – An awaitable object which ensures all asynchronous logging calls are completed when awaited.
enqueue (bool, optional)
Whether the messages to be logged should first pass through a multiprocessing-safe queue before reaching the sink. This is useful while logging to a file through multiple processes. This also has the advantage of making logging calls non-blocking.
Enqueue slower than without - Issue #1118 - Delgan/loguru
3.4.4 异步
import asyncio
from loguru import logger
# 配置日志( 默认已支持异步, 可直接使用)
logger.remove()
logger.add(
"async.log",
level="INFO",
rotation="1 MB" # 按大小轮转
)
# 异步任务: 记录日志
async def async_task(task_id):
for i in range(3):
# 直接使用logger, 无需await
logger.info(f"异步任务 {task_id} 第 {i + 1} 次执行")
await asyncio.sleep(0.1) # 模拟异步操作
async def main():
# 创建多个异步任务
tasks = [async_task(i) for i in range(2)]
await asyncio.gather(*tasks)
# 若使用了协程sink, 需等待日志处理完成
await logger.complete()
if __name__ == "__main__":
asyncio.run(main())
3.4.5 多线程
这没什么需要特别注意的.
All sinks added to the
logger
are thread-safe by default. They are not multiprocess-safe, but you canenqueue
the messages to ensure logs integrity. This same argument can also be used if you want async logging.
import threading
import time
from loguru import logger
# 配置日志( 默认线程安全)
logger.remove()
logger.add(
"thread.log",
level="INFO",
format="{time} | 线程 {thread.name} | {message}" # 显示线程名
)
# 线程任务: 记录日志
def thread_task(task_id):
for i in range(3):
logger.info(f"任务 {task_id} 第 {i + 1} 次执行")
time.sleep(0.1) # 模拟耗时操作
if __name__ == "__main__":
# 创建并启动多个线程
threads = []
for i in range(2):
t = threading.Thread(
target=thread_task,
args=(i,),
name=f"Thread-{i}"
)
threads.append(t)
t.start()
# 等待所有线程结束
for t in threads:
t.join()
3.4.6 多进程
这部分见另一文章: 多进程, 内容相对复杂.
四. 其他
4.1 兼容logging
Loguru
可与 Python 原生logging
库无缝协作, 支持三种核心交互场景:
用原生 Handler 作为 sink: 将原生 logging 的 Handler( 如SysLogHandler
) 传入logger.add()
, 作为 Loguru 的 sink:
from logging.handlers import SysLogHandler
syslog_handler = SysLogHandler(address=('localhost', 514))
logger.add(syslog_handler) # Loguru日志输出到syslog
Loguru 日志传播到原生 logging: 自定义PropagateHandler
, 将 Loguru 的日志转发给原生 logging
import logging
class PropagateHandler(logging.Handler):
def emit(self, record):
logging.getLogger(record.name).handle(record)
logger.add(PropagateHandler(), format="{message}") # Loguru日志传播到原生
拦截原生 logging 日志: 自定义InterceptHandler
, 将原生 logging 的日志拦截到 Loguru 的 sink, 统一管理:
import inspect
class InterceptHandler(logging.Handler):
def emit(self, record):
# 匹配Loguru级别
try:
level = logger.level(record.levelname).name
except ValueError:
level = record.levelno
# 定位日志调用上下文
frame, depth = inspect.currentframe(), 0
while frame and (depth == 0 or frame.f_code.co_filename == logging.__file__):
frame = frame.f_back
depth += 1
# 转发到Loguru
logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage())
# 启用拦截
logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)
4.2 日志解析
Loguru 提供logger.parse()
方法, 结合正则表达式( 含命名分组) 解析日志文件, 支持对匹配结果进行类型转换( 如时间字符串转datetime
) :
import dateutil.parser
# 正则表达式: 含time, level, message三个命名分组
pattern = r"(?P<time>.*) - (?P<level>[0-9]+) - (?P<message>.*)"
# 类型转换: time转datetime, level转int
caster = dict(time=dateutil.parser.parse, level=int)
# 解析日志文件
for log_dict in logger.parse("app.log", pattern, cast=caster):
print(f"时间: {log_dict['time']}, 级别: {log_dict['level']}, 消息: {log_dict['message']}")
4.3 结构化日志
JSON 序列化: 通过serialize=True
将日志( 含record
字典的上下文信息) 转换为 JSON 字符串, 便于日志聚合工具( 如 Elasticsearch) 解析:
logger.add(lambda msg: print(msg), serialize=True) # 输出JSON格式日志
上下文绑定: 三种方式管理日志的extra
属性( 自定义上下文信息, 如用户 IP, 任务 ID) :
user_logger = logger.bind(ip="192.168.0.1", user="alice")
user_logger.info("User accessed homepage") # 日志含ip和user字段
bind()
: 创建带固定上下文的 logger, 全局生效
contextualize()
: 临时绑定上下文( 上下文管理器 / 装饰器) , 退出后恢复初始状态:
with logger.contextualize(task_id=123):
logger.info("Task started") # 日志含task_id=123
patch()
: 动态添加上下文( 如 UTC 时间) , 每次日志都会执行 patch 函数:
from datetime import datetime
logger = logger.patch(lambda record: record["extra"].update(utc=datetime.utcnow()))
五. 总结
Apprise allows you to send a notification to almost all of the most popular notification services available to us today such as: Telegram, Discord, Slack, Amazon SNS, Gotify, etc.
apprise - PyPI, 支持外部通信, 需要单独安装扩展包.
以下时loguru
的简单封装使用, 只需要理清其中的基本逻辑, 在使用上很是方便.
__all__ = ['mylogger']
import sys
from loguru import logger
from pathlib import Path
from typing import Any, Callable
import functools
import inspect
class LoggerProxy:
# 代理, 避免直接访问到logger某些设置
def __init__(self, logger_instance: logger):
self._instance = logger_instance
def __getattr__(self, name: str):
return (lambda *_args, **_kwargs: self._instance.warning(f"not allowed to call function: {name}")) if name in [
'add'] else getattr(self._instance, name)
class MyLogger:
_log_dir = Path(__file__).parents[2] / Path("logs") # 日志路径
_routine_file = _log_dir / "routine.log" # 常规日志
_error_file = _log_dir / "error.log" # .错误日志
_routine_format = (
"<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
"<level> {level.icon} {level: <8}</level> | "
"<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> | "
"<yellow>Process:{process.name}({process.id})</yellow> | "
"<blue>Thread:{thread.name}({thread.id})</blue> | "
"<level>{message}</level>"
) # 日志格式
def __init__(self):
Path(self._log_dir).mkdir(parents=True, exist_ok=True)
logger.remove()
# TRACE(5) < DEBUG(10) < INFO(20) < SUCCESS(25) < WARNING(30) < ERROR(40) < CRITICAL(50)
# 更高等级的, 不要再添加多的add操作, 否则会导致多次打印
logger.add(
sys.stdout,
level="DEBUG",
format=self._routine_format,
)
logger.add(
self._routine_file, # 输出内容
level="DEBUG", # 日志级别, 比debug高但是比error低的都会写入这里
format=self._routine_format, # 日志格式
rotation="10 MB", # 新创建日志文件的条件, 这里表示当时间过了零点, 就创建新的日志文件
retention="30 days", # 清理旧的日志的时间间隔
compression="gz", # 压缩格式, 当新的日志文件生成, 旧的文件会被压缩成压缩文件
enqueue=True, # 多进程支持
encoding="utf-8", # 编码格式
filter=lambda record: record["level"].name == "DEBUG", # 过滤掉其他的日志, 不写入
)
logger.add(
self._error_file,
level="ERROR", # 更高一级的 critical也会写入这里
format=self._routine_format,
rotation="10 MB",
retention="30 days",
enqueue=True,
encoding="utf-8"
)
level_configs = [
{"name": "TRACE", "color": "<fg #888>", "icon": "🔍"}, # 灰色 + 放大镜
{"name": "DEBUG", "color": "<blue>", "icon": "🐛"}, # 蓝色 + 虫子
{"name": "INFO", "color": "<green>", "icon": "ℹ️"}, # 绿色 + 信息图标
{"name": "SUCCESS", "color": "<cyan>", "icon": "✅"}, # 青色 + 对勾
{"name": "WARNING", "color": "<yellow>", "icon": "⚠️"}, # 黄色 + 警告图标
{"name": "ERROR", "color": "<red>", "icon": "❌"}, # 红色 + 错误图标
{"name": "CRITICAL", "color": "<bold><red>", "icon": "💥"} # 粗体红色 + 爆炸图标
]
for config in level_configs:
logger.level(
name=config["name"],
color=config["color"],
icon=config["icon"]
)
self._logger = logger
self._external_logger = LoggerProxy(self._logger)
@property
def instance(self) -> logger:
return self._external_logger
def log_decorator(self, start_msg='', success_mg='', normal_result=None, fail_result=None) -> Callable:
def decorator(func: Callable) -> Callable:
is_async = inspect.iscoroutinefunction(func)
def handle_success(result: Any) -> Any:
if success_mg:
self._logger.info(success_mg)
return result if normal_result is None else normal_result
def handle_error(e: Exception, args: Any, kwargs: Any) -> Any:
prefix = "async function" if is_async else "function"
self._logger.error(
f"{prefix}: {func.__name__}, error: {str(e)}; "
f"args:{str(args)}; kwargs:{str(kwargs)}"
)
return fail_result
@functools.wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
try:
return handle_success(func(*args, **kwargs))
except Exception as e:
return handle_error(e, args, kwargs)
@functools.wraps(func)
async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
try:
return handle_success(await func(*args, **kwargs))
except Exception as e:
return handle_error(e, args, kwargs)
if start_msg:
self._logger.info(start_msg)
return async_wrapper if is_async else wrapper
return decorator
mylogger = MyLogger()
if __name__ == '__main__':
mylogger.instance.info('info')
mylogger.instance.debug('debug')
mylogger.instance.warning('warning')
with mylogger.instance.catch(message='with'):
print({"a": 1}['b'])
import asyncio
@mylogger.log_decorator('sync')
def test(a: int, b: int) -> float:
return a / b
@mylogger.log_decorator('async')
async def async_test():
await asyncio.sleep(0.5)
test(1, 0)
asyncio.run(async_test())