当前位置: 首页 > article >正文

监控MySQL数据表变化:Binlog的重要性及实践

引言

在数据库管理领域,跟踪和记录对数据库所做的更改是至关重要的。这些更改包括插入、更新或删除操作,它们可以影响应用程序的行为、数据一致性和安全性。MySQL 提供了多种机制来追踪这些变化,其中最强大且灵活的一种就是使用二进制日志(Binary Log),简称 binlog。

Binlog概述

从 MySQL 8.0 开始,默认情况下启用了二进制日志功能。而在早期版本如 MySQL 5.* 中,则需要手动开启。Binlog 是一种事务安全的日志文件,它记录了所有对数据库结构和内容进行修改的操作。通过这个日志,我们可以恢复到之前的某个状态,这对于灾难恢复、复制以及审计都是非常有用的工具。

监控数据表变化的意义

数据恢复与备份:当意外发生时,比如误删了一张重要表格,binlog 可以帮助我们回滚到事故发生前的状态。
主从复制:MySQL 的主从架构依赖于 binlog 来同步主服务器上的变更到从服务器。
审计和合规性:企业环境中,了解谁做了什么改动对于满足法规要求非常重要。
性能优化:分析频繁变更的数据可以帮助识别热点数据区域,从而优化索引或调整应用逻辑。
实时数据分析:一些系统利用 binlog 实现实时的数据流处理,例如将变更事件发送给消息队列进行进一步处理。

如何启用Binlog

对于 MySQL 5.* 版本,如果想要开启 binlog,你需要编辑配置文件 my.cnf 或者 my.ini,并在 [mysqld] 部分添加如下配置:

[mysqld]
log-bin=mysql-bin
server-id=1

然后重启 MySQL 服务使设置生效。

示例代码:读取Binlog

以下是一个简单的 java 代码片段,演示了如何使用 mysql-replication 库读取 MySQL 的 binlog 文件。
依赖

<dependency>
            <groupId>com.zendesk</groupId>
            <artifactId>mysql-binlog-connector-java</artifactId>
            <version>0.30.1</version>
        </dependency>

代码示例


import cn.com.yeexun.core.exception.BizException;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.sql.*;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@Service
public class BinlogListenerService {

    private static final String HOST = "真实ip";
    private static final int PORT = 3306;
    private static final String USERNAME = "root";
    private static final String PASSWORD = "密码";

    // 记录表 ID 和表名的映射关系
    private final Map<Long, String> tableIdToNameMap = new HashMap<>();
    // 监控数据库
    private static final String TARGET_DATABASE = "dn_name";
    // 监控目标表
    private static final String[] TARGET_TABLES = {"table_name1", "table_name2"};

    @PostConstruct
    public void startListening() {
        new Thread(() -> {
            try {
                BinaryLogClient client = new BinaryLogClient(HOST, PORT, USERNAME, PASSWORD);
                client.registerEventListener(this::handleEvent);
                client.connect();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }

    private void handleEvent(Event event) {
        EventData data = event.getData();
        if (data instanceof TableMapEventData) {
            // 捕获表信息
            TableMapEventData tableMapEventData = (TableMapEventData) data;
            String tableName = tableMapEventData.getTable();
            String databaseName = tableMapEventData.getDatabase();
            // 记录表 ID 和表名映射
            tableIdToNameMap.put(tableMapEventData.getTableId(), databaseName + "." + tableName);
        } else if (data instanceof WriteRowsEventData) {
            WriteRowsEventData eventData = (WriteRowsEventData) data;
            processEvent(eventData.getTableId(), "INSERT", eventData);
        } else if (data instanceof UpdateRowsEventData) {
            UpdateRowsEventData eventData = (UpdateRowsEventData) data;
            processEvent(eventData.getTableId(), "UPDATE", eventData);
        } else if (data instanceof DeleteRowsEventData) {
            DeleteRowsEventData eventData = (DeleteRowsEventData) data;
            processEvent(eventData.getTableId(), "DELETE", eventData);
        }
    }

    private void processEvent(long tableId, String eventType, Object eventData) {
        // 根据 tableId 获取表名
        String fullTableName = tableIdToNameMap.get(tableId);
        if (fullTableName == null) {
            return; // 未记录的表,忽略
        }
        String[] parts = fullTableName.split("\\.");
        String databaseName = parts[0];
        String tableName = parts[1];
        // 检查是否是目标库和表
        if (TARGET_DATABASE.equals(databaseName) && isTargetTable(tableName)) {
            if (eventType.contains("UPDATE")) {
                System.out.println("捕获到 " + eventType + " 事件 - 数据库: " + databaseName + ", 表: " + tableName);
                System.out.println("事件数据: " + eventData);
                String ss = eventData.toString();
                System.out.println("表名称" + tableName);
                Pattern rowsPattern = Pattern.compile("before=\\[(.*?)\\], after=\\[(.*?)\\]");
                Matcher rowsMatcher = rowsPattern.matcher(ss);
                while (rowsMatcher.find()) {
                    String beforeData = rowsMatcher.group(1);
                    String afterData = rowsMatcher.group(2);
                    System.out.println("变更前: " + beforeData);
                    System.out.println("变更后: " + afterData);
                    List<String> beforeList = Arrays.asList(beforeData.split(",\\s*"));
                    List<String> afterList = Arrays.asList(afterData.split(",\\s*"));
                    compareDifferences(beforeList, afterList, tableName);
                }
            } else {
//                // 新增和删除的数据
                String ss = eventData.toString();
                System.out.println(eventType.contains("INSERT") ? "新增" : "删除");
                Pattern rowsPattern = Pattern.compile("rows=\\[(.*?)\\]\\}", Pattern.DOTALL); // 捕获 rows 部分
                Matcher rowsMatcher = rowsPattern.matcher(ss);

                if (rowsMatcher.find()) {
                    String rowsContent = rowsMatcher.group(1).trim(); // 提取 rows 的内容
                    //System.out.println("数据行:\n" + rowsContent);

                    // 匹配每一行数据
                    Pattern rowPattern = Pattern.compile("\\[([^\\]]+)]"); // 捕获 [] 中的内容
                    Matcher rowMatcher = rowPattern.matcher(rowsContent);

                    while (rowMatcher.find()) {
                        String rowData = rowMatcher.group(1).trim(); // 提取每一行数据
                        System.out.println("数据行: " + rowData);

                    }
                } else {
                    System.out.println("未匹配到 rows 数据!");
                }

            }
        }
    }

    public static void compareDifferences(List<String> before, List<String> after, String tableName) {
        List<String> columnNamesFromDatabase = getColumnNamesFromDatabase(tableName);
        for (int i = 0; i < before.size(); i++) {
            String beforeValue = before.get(i);
            String afterValue = after.get(i);
            if (!Objects.equals(beforeValue, afterValue)) {

                System.out.printf("列名 %s: Before = %s, After = %s%n", columnNamesFromDatabase.get(i), beforeValue, afterValue);
            }
        }
    }

    private boolean isTargetTable(String tableName) {
        for (String targetTable : TARGET_TABLES) {
            if (targetTable.equals(tableName)) {
                return true;
            }
        }
        return false;
    }

    public static List<String> getColumnNamesFromDatabase(String tableName) {
        List<String> columnNames = new ArrayList<>();
        String url = "jdbc:mysql://" + HOST + ":" + PORT + "/" + TARGET_DATABASE;
        String username = USERNAME;
        String password = PASSWORD;

        try (Connection connection = DriverManager.getConnection(url, username, password)) {
            DatabaseMetaData metaData = connection.getMetaData();
            ResultSet columns = metaData.getColumns(null, null, tableName, null);

            while (columns.next()) {
                String columnName = columns.getString("COLUMN_NAME");
                columnNames.add(columnName);
            }
        } catch (SQLException e) {
            e.printStackTrace();
            throw new BizException("获取数据表元数据信息出错");
        }
        return columnNames;
    }

}


这段代码会持续监听 MySQL 的 binlog,并打印出所有的删除、更新和插入事件。请注意,这只是一个基础的例子,在生产环境中使用时,可能还需要考虑更多因素,如错误处理、多线程支持等。


http://www.kler.cn/a/449973.html

相关文章:

  • 自建MD5解密平台-续
  • mysql中局部变量_MySQL中变量的总结
  • 【YashanDB知识库】Oracle pipelined函数在YashanDB中的改写
  • 蓝桥杯练习生第四天
  • Blazor 直接读取并显示HTML 文件内容
  • VSCode如何修改默认扩展路径和用户文件夹目录到D盘
  • 关于mac—address
  • linux安装宝塔面板及git
  • 基于Spring Boot的个性化推荐外卖点餐系统
  • HarmonyOS(72)事件拦截处理详解
  • Certifying LLM Safety against Adversarial Prompting
  • 网络管理 详细讲解
  • 网络安全(一)主动攻击之DNS基础和ettercap实现DNS流量劫持
  • BOE(京东方)“向新2025”年终媒体智享会落地成都 持续创新引领产业步入高价值增长新纪元
  • MongoDB教程001:基本常用命令(数据库操作和集合操作)
  • 跨模态知识迁移:基于预训练语言模型的时序数据建模
  • WebRTC服务质量(08)- 重传机制(05) RTX机制
  • js按首字母分组和排序
  • 【华为OD-E卷-箱子之字形摆放 100分(python、java、c++、js、c)】
  • c# 后台任务自动执行