当前位置: 首页 > article >正文

Flask实现高效日志记录模块

目录

一. 简介:

1. 为什么需要请求日志

二.  日志模块组成

1.  对应日志表创建(包含日志记录的关键字段)

2.  编写日志记录静态方法

3.  在Flask中捕获请求日志

4.  捕获异常并记录错误日志

5.  编写日志接口数据展示

6.  写入数据展示

三. 日志信息格式处理问题

1. 如何处理流式响应(Passthrough)

2. 如何记录响应数据(如JSON响应)

3. 总结与优化建议

四 . 结尾


一. 简介:

在Flask应用中,日志记录是重要的功能之一,它可以帮助开发人员跟踪请求的处理情况,快速定位错误,并且有助于应用的监控与调试。本文将介绍如何在Flask应用中实现请求日志记录,包括如何记录请求的各种信息(如请求数据、响应数据、错误信息等)并将其保存到数据库中。我们还会演示如何捕获不同级别的日志(信息级别、错误级别),并讨论如何处理复杂的响应数据(如流式响应)。


1. 为什么需要请求日志

日志记录可以帮助开发人员和运维团队了解应用的行为和状态,尤其是在生产环境中。通过记录每次请求的详细信息,开发人员能够:

  • 跟踪应用性能。
  • 快速定位和调试错误。
  • 为监控和安全审计提供数据支持。

例如,在出现错误时,记录详细的堆栈信息、请求的URL、请求参数和响应数据,能够帮助你迅速分析问题并进行修复。

二.  日志模块组成

  1.  对应日志表创建(包含日志记录的关键字段

# 日志表
class Log(db.Model,TimestampMixin):
    """日志表"""
    __tablename__ = 't_logs'
    __table_args__ = {
        'mysql_engine': 'InnoDB',
        'comment': '日志表'
    }

    id = db.Column(db.Integer, primary_key=True, autoincrement=True,comment='id')
    user_id = db.Column(db.Integer,comment='用户ID')  # 用户ID
    ip_address = db.Column(db.String(50),comment='ip地址')  # ip地址
    level = db.Column(db.String(50),comment='日志级别')  # 日志级别
    message = db.Column(db.Text,comment='日志内容')  # 日志内容
    module = db.Column(db.String(100),comment='模块名称')  # 模块名称
    method = db.Column(db.String(50),comment='方法名称')  # 方法名称
    url = db.Column(db.String(255),comment='请求的URL')  # 请求的URL
    request_data = db.Column(db.Text,comment='请求数据')  # 请求数据
    response_data = db.Column(db.Text,comment='响应数据')  # 响应数据
    error_code = db.Column(db.String(50),comment='错误代码')  # 错误代码
    stack_trace = db.Column(db.Text,comment='堆栈追踪')  # 堆栈追踪
    hostname = db.Column(db.String(100),comment='服务器主机名')  # 服务器主机名
    context = db.Column(db.String(255),comment='上下文信息')  # 上下文信息

    def __repr__(self):
        return f"<ErrorLog(id={self.id}, ip_address={self.ip_address}, level={self.level}, message={self.message})>"

2.  编写日志记录静态方法

    # db.session 进行数据提交等操作
    @staticmethod
    def log_message(session, exception=None, user_id=None,error_code=None,level=None, message=None, **kwargs):
        try:
            # 获取请求数据
            request_data=''
            if request.method == 'GET':
                request_data = str(dict(request.args))  # 直接获取查询参数
            elif request.method == 'POST':
                if request.is_json:
                    request_data = str(request.get_json())  # 获取 JSON 数据
                else:
                    request_data = str(request.form)  # 获取表单数据

            # 获取请求的其他信息
            ip_address = request.remote_addr  # 获取客户端IP地址
            url = request.url  # 获取请求的URL
            response_data = str(kwargs.get('response_data', ''))  # 获取响应数据
            stack_trace = traceback.format_exc() if exception else ""  # 获取堆栈追踪
            hostname = socket.gethostname()  # 获取服务器主机名
            context = kwargs.get('context', 'Production')  # 上下文(默认生产环境)

            if not message:
                message = str(exception) if exception else "Unknown error"

            # 检查错误响应数据并跳过日志记录
            try:
                # 将响应数据从字符串转换为字典
                response_dict = json.loads(response_data)
                if isinstance(response_dict, dict) and response_dict.get("message") == "内部服务器错误":
                    return  # 跳过日志记录
            except json.JSONDecodeError:
                # 如果无法解析 JSON,则跳过判断
                pass
            # print(user_id)
            # 创建并保存日志条目
            log = Log(
                user_id=user_id if user_id else 0,
                level=level,
                message=message,
                ip_address=ip_address,
                url=url,
                request_data=request_data,
                response_data=response_data,
                stack_trace=stack_trace,
                hostname=hostname,
                context=context,
                method=request.method,
                module=request.blueprint,
                error_code=error_code,
            )

            # 保存日志条目
            db.session.add(log)
            db.session.commit()

        except Exception as e:
            print(e)
            pass

3. 在Flask中捕获请求日志

在Flask中,我们可以通过使用 after_request 钩子来捕获请求信息。这个钩子在每次请求处理完毕后执行,适合用于记录日志。示例如下:

# 请求成功日志记录
@app.after_request
def after_request(response):
    # 获取当前用户信息
    user_id = None
    level = "INFO"  # 你可以根据需要动态设置日志级别,如根据响应状态码判断
    message = "请求成功!"  # 请求成功的默认信息
    # 记录日志
    try:
        current_user = get_jwt_identity()
        # 获取用户信息(从数据库中获取用户信息做对比)
        user_info = db.session.query(User).filter(User.username == current_user).first()
        user_id = user_info.id
    except RuntimeError as e:
        # 捕获没有 JWT 时抛出的 RuntimeError 异常
        # 不做任何事情,直接跳过日志记录
        pass
    except Exception as e:
        print(e)
        pass

    # 如果响应处于 passthrough 模式,则不能直接访问 response.data
    if not response.direct_passthrough:
        # 正常情况下获取响应数据并转化为文本
        response_data = response.get_data(as_text=True)
    else:
        # 如果是 passthrough 模式,说明是流式响应或直接传递模式
        response_data = "{}"  # 或者根据需求设置适当的默认值

    if user_id:
        Log.log_message(
            db.session,
            user_id=user_id if user_id else 0,
            level=level,
            message=message,
            response_data=response_data,
            context="Production",  # 可选的环境信息
            error_code=200,
        )
    return response

在这个例子中,我们通过 after_request 钩子来处理每个请求后执行的日志记录。获取响应数据时,根据响应的类型决定是否直接获取 response.data

4. 捕获异常并记录错误日志

在实际开发中,应用程序往往会遇到异常。为了保证日志的完整性,我们可以捕获异常并将其记录下来。特别是对于HTTP 500类错误,应该记录详细的堆栈信息。

# 异常处理日志记录
@app.errorhandler(Exception)
def handle_exception(e):
    # 捕获所有异常,记录日志
    level = "ERROR"
    message = str(e)  # 将异常转换为字符串
    stack_trace = traceback.format_exc()  # 获取堆栈追踪

    # 获取当前用户信息
    user_id = None
    try:
        current_user = get_jwt_identity()
        user_info = db.session.query(User).filter(User.username == current_user).first()
        user_id = user_info.id
    except RuntimeError:
        # 如果没有 JWT 则不记录用户ID
        pass
    if user_id:
        # 记录异常日志
        Log.log_message(
            db.session,
            exception=e,
            user_id=user_id,
            level=level,
            message='请求失败!',
            stack_trace=stack_trace,
            context="Production",
            error_code=500,
        )
    print(message)
    # 返回通用的500错误响应
    return {"message": "内部服务器错误"}, 500

在这个例子中,我们捕获了通用的异常,并将错误信息、堆栈追踪以及其他日志信息保存到数据库。

5. 编写日志接口数据展示

from flask import Response, jsonify, Flask, request, Blueprint,url_for
from configs import *
from modules.Tables import *
from sqlalchemy import func

# 创建蓝图,对应的register目录(激活操作视图)
log_view = Blueprint('log_view', __name__)

# 日志信息展示
@log_view.route('/log_data', methods=['GET'])
@jwt_required()
def log_data():

    # 获取分页参数
    page = request.args.get('page', default=1, type=int)  # 当前页码
    per_page = request.args.get('per_page', default=15, type=int)  # 每页显示条目数量
    level = request.args.get('level')  # 日志等级

    # 定义筛选列表
    filters = []
    # 当筛选条件存在时添加到列表
    if level:
        filters.append(Log.level == level)

    # 构建查询条件
    query = and_(*filters) if filters else True  # 如果 filters 为空,默认条件为 True


    # 查询主区域点并进行分页
    pagination = db.session.query(Log.id,
                         Log.user_id,Log.ip_address, Log.level,Log.message,Log.module,Log.method,Log.url,
                         Log.request_data, Log.response_data,Log.error_code, Log.stack_trace,Log.hostname,Log.context,Log.created_at
                 ).filter(query).paginate(page=page, per_page=per_page, error_out=False)

    # 获取分页后的数据
    data_list = [
        {
            'id': item.id,
            'user_id': item.user_id,
            'ip_address': item.ip_address,
            'level': item.level,
            'message': item.message,
            'module': item.module,
            'method': item.method,
            'url': item.url,
            'request_data': item.request_data if item.response_data else item.stack_trace,
            'response_data': item.response_data,
            'error_code': item.error_code,
            # 'stack_trace':item.stack_trace,
            'hostname':item.hostname,
            'context':item.context,
            'created_at': format_datetime(item.created_at),

        }
        for item in pagination.items
    ]


    # 构造返回结果,包括分页信息
    response = {
        'code': 200,
        'data': data_list,
        'pagination': {
            'current_page': pagination.page,
            'total_pages': pagination.pages,
            'total_items': pagination.total,
            'per_page': pagination.per_page
        }
    }

    return jsonify({'code':200,'data':response})


 6. 写入数据展示

三. 日志信息格式处理问题

1. 如何处理流式响应(Passthrough)

Flask中的流式响应(passthrough)不允许直接访问 response.data,因此需要特别处理。通过 response.get_data(as_text=True) 方法,我们可以安全地获取响应数据并将其存储到日志中。如果响应不可序列化(如流式数据),可以跳过或记录默认值。

if not response.direct_passthrough:
    response_data = response.get_data(as_text=True)
else:
    response_data = "Non-serializable response"

2. 如何记录响应数据(如JSON响应)

对于返回JSON数据的响应,我们可以通过 response.get_data(as_text=True) 获取响应体的内容。这种方式适用于大多数JSON响应,确保我们可以将响应内容记录到日志中。

例如,在捕获日志时,我们可以如下处理响应数据:

response_data = response.get_data(as_text=True)

对于非JSON响应,使用 str() 也能保证将其转化为可记录的格式。

3. 总结与优化建议

  • 在Flask应用中,通过 @app.after_request 钩子可以轻松地记录每次请求的日志。
  • 根据不同的请求方法(GET/POST)和响应类型(JSON、流式数据等),动态调整日志记录方式。
  • 使用 try-except 语句捕获异常,并记录详细的错误信息以帮助后期的调试和维护。

通过这些方法,我们可以高效、全面地记录Flask应用中的日志,并为后期的性能优化、故障排查提供支持。

四 . 结尾

日志记录是开发和运维过程中不可或缺的一部分,它能帮助我们及时发现问题并做出调整。通过本文介绍的日志记录方式,我们能够方便地捕获请求和响应的详细信息,同时处理各种异常和特殊情况。希望这篇文章能够帮助你更好地理解并实现Flask中的日志记录。


http://www.kler.cn/a/553957.html

相关文章:

  • 前端基础入门:HTML、CSS 和 JavaScript
  • 简识MyBatis、MyBatis-plus、和Spring Data JPA的区别
  • Docker 部署AnythingLLM
  • very强烈的小病毒
  • 驱动开发系列37 - Linux Graphics 2D 绘制流程(二)- 画布创建和窗口关联
  • 大语言模型Agent
  • macos安装jmeter测试软件
  • Navicat16安装教程(附安装包)2025最新版详细图文安装教程
  • 线性模型 - Logistic回归(参数学习具体示例)
  • Ollama 本地GUI客户端:为DeepSeek用户量身定制的智能模型管理与交互工具
  • leetcode-16. 最接近的三数之和
  • Django ModelForm使用(初学)
  • 全面掌握Python时间处理
  • DeepSeek 云原生分布式部署的深度实践与疑难解析—— 从零到生产级落地的全链路避坑指南
  • 跳表(Skip List)详解
  • 基于YOLOv8的人脸识别系统
  • AI驱动的精准教育:个性化学习新时代
  • 提升接口性能之异步
  • 在ubuntu上用Python的openpyxl模块操作Excel的案例
  • 深度学习之自然语言处理CBOW预测及模型的保存