当前位置: 首页 > article >正文

分布式 ID 生成策略(一)

前言

在分库、分表之后,主键唯一ID的生成将不能基于数据库本身的自增方式生成,因为对于全局而言可能会重复,下面我们将构建一个外部服务统一提供 ID 的生成。

思路

我们采用一种基于数据库的分布式ID生成策略,每个节点维护一个本地的ID池,当节点的本地ID池耗尽时,再通过数据库去获取新的ID段,再放入 ID 池。

在这里插入图片描述

业务步长配置表

CREATE TABLE `sequence_step` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `barrier` bigint(20) NOT NULL DEFAULT '1' COMMENT '当前序列值最大值',
  `step` int(11) NOT NULL DEFAULT '100' COMMENT '每次递增量',
  `sn` varchar(128) NOT NULL DEFAULT '' COMMENT '业务字段',
  `version` bigint(20) NOT NULL DEFAULT '1' COMMENT '乐观锁版本',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_sn` (`sn`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='全局ID生成表';
  • barrier:目前应用程序已经获取的最大ID值(即 ID 池的最大值);
  • step:每次获取时,递增的步长;
  • sn:业务字段名称,全局唯一。

可以通过表来配置每个业务字段的 ID 生成策略,比如 user_id,初始值是 1,步长是 100。

第一次的 ID 区间为【1,101),初始值更新为 101,则 ID 池子的数字就是【1,101)

系统优先去 ID 池子去取数据,当 ID 池子消耗完了后,再请求数据库,拿到新的初始值,并生成数据。

第二次的 ID 区间为【101,201),初始值更新为 201

可以看到,如果区间设置的太小,访问数据库就很频繁;区间设置的太大,万一系统崩溃了,断续的间隔就比较大了。

代码实现

整体流程如下

  1. 根据 sn 获取锁对象;
  2. 根据 sn 去本地内存获取 ID 段,如果不为空,则返回;否则,进入下一步;
  3. 根据 sn 去数据库获取 ID 段,并放入内存 ID 池。

ID 段

@NotThreadSafe
public class IDSequence implements Serializable {

    private String sn;//此sequence的唯一标识

    private int step;

    private AtomicLong current;//当前值

    private long barrier = -1;//栅栏,当前sequence所在step中的最大值。

    public IDSequence() {}

    public IDSequence(String sn) {
        this.sn = sn;
    }

    public IDSequence(String sn, long start, long barrier, int step) {
        this.sn = sn;
        this.current = new AtomicLong(start);
        this.barrier = barrier;
        this.step = step;
    }


    public String getSn() {
        return sn;
    }

    public void setSn(String sn) {
        this.sn = sn;
    }

    public long getCurrent() {
        if (current == null) {
            return -1;
        }
        return current.get();
    }

    public boolean valid() {
        return current != null;
    }

    /**
     * 此方法将会在并发环境中执行,需要注意控制数据安全性
     *
     * @return 0:表示超限,则需要重试
     * 负1:表示超限,需要重新fetch新的step,当前Sequence已不可用
     * 大于1:正常,此值可以使用。
     */
    public long increment() {
        if (current == null) {
            return -1;
        }
        long i = current.getAndIncrement();
        //边界
        if (i >= barrier) {
            current = null;
            return -1;//强制上层重置
        }
        return i;

    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("sn:").append(sn)
                .append(",step:").append(step)
                .append(",current:").append(current)
                .append(",barrier:").append(barrier);
        return sb.toString();
    }
}

ID 生成器

public class StepBasedIDGenerator{
    protected static final int DEFAULT_RETRY = 6;

    protected Map<String, IDSequence> cachedPool = new HashMap<>();

    private static Map<String,Object> locks = new ConcurrentHashMap<>(); //业务字段对应的锁

    private DataSource dataSource;

    public void setDataSource(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    /**
     * 获取锁对象
     * @param sn
     * @return
     */
    private Object getLock(String sn) {
        return locks.computeIfAbsent(sn, k -> new Object());
    }


    private IDSequence getSequence(String sn) {
        IDSequence sequence = cachedPool.get(sn);
        //正常,返回
        if (sequence != null && sequence.valid()) {
            return sequence;
        }
        int i = 0;
        while (i < DEFAULT_RETRY) {
            sequence = fetch(sn);
            if (sequence != null) {
                cachedPool.put(sn, sequence);
                break;
            }
            i++;
        }
        return sequence;
    }

    /**
     * ID 生成器的入口
     */
    public long next(String sn) {

        Object lock = getLock(sn);

        synchronized (lock) {
            IDSequence sequence = getSequence(sn);
            if (sequence == null || !sequence.valid()) {
                throw new IllegalStateException("sn:" + sn + ",cant fetch sequence!");
            }

            try {
                long v = sequence.increment();
                if (v > 0) {
                    return v;
                }
                return next(sn);
            } catch (Exception e) {
                throw new RuntimeException("fetch nextStep error:", e);
            }
        }
    }


    private Connection getConnection() {
        // 数据库URL,用户名和密码
        String url = "";
        String username = "";
        String password = "";

        Connection conn = null;
        try {
            // 加载数据库驱动
            Class.forName("com.mysql.cj.jdbc.Driver");
            // 创建数据库连接
            conn = DriverManager.getConnection(url, username, password);
        } catch (ClassNotFoundException | SQLException e) {
            e.printStackTrace();
        }
        return conn;
    }

    protected IDSequence fetch(String sn) {
        Connection connection = null;
        PreparedStatement ps = null;
        try {
            connection = getConnection();
            connection.setAutoCommit(true);//普通操作
            connection.setReadOnly(false);

            ps = connection.prepareStatement("select barrier,step,`version` from `sequence_step` where `sn` = ? limit 1");
            ps.setString(1, sn);
            ResultSet rs = ps.executeQuery();
            if (!rs.next()) {
                throw new IllegalStateException(sn + " is not existed,So fetch sequence cant be executed! Please check it!");
            }
            long _b = rs.getLong(1);//barrier
            int _s = rs.getInt(2);//step
            long _v = rs.getLong(3);//version

            ps.close();

            ps = connection.prepareStatement("update `sequence_step` set barrier = ?,update_time = ?,`version` = `version` + 1 where `sn` = ? and `version` = ?");
            long barrier = _b + _s;

            ps.setLong(1, barrier);
            ps.setDate(2, new Date(System.currentTimeMillis()));
            ps.setString(3, sn);
            ps.setLong(4, _v);
            int row = ps.executeUpdate();

            ps.close();
            if (row > 0) {
                //更新成功
                return new IDSequence(sn, _b, barrier, _s);
            }
            return null;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            try {
                if (ps != null) {
                    ps.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (Exception ex) {
                //
            }
        }
    }
}

http://www.kler.cn/a/374830.html

相关文章:

  • 固定翼无人机飞行操控技术详解
  • C语言如何进行数据类型转换?
  • Python基础15_读取CSV文件绘图数据结构:栈链表
  • 单细胞数据分析(一):10X数据生成seurat数据对象
  • NLP segment-01-聊一聊分词 AI 的基础
  • 入门 | Kafka数据使用vector消费到Loki中使用grafana展示
  • 如何提高社媒品牌知名度,3个增加曝光的实操方法
  • 微信小程序服务通知
  • 【HarmonyOS NEXT】在 HarmonyOS NEXT 中实现优雅的加载动画
  • python 五子棋小游戏
  • GIN 反向代理功能
  • C/C++ 每日一练:二叉树的先序遍历
  • Webserver(2.6)信号
  • 信号完整性SI总结【小登培训】
  • OpenCV基础知识
  • DB-GPT系列(二):DB-GPT部署(镜像一键部署、源码部署)
  • C++ 代码工程化
  • 微信小程序,打开新的项目,调试遇见[ app.json 文件内容错误] app.json: 在项目根目录未找到 app.json
  • 【行业应用-工业防爆】本安型工业防爆网关,实现安全高效的数据传输与监控
  • 如何解决docker镜像下载失败问题
  • nfs作业
  • Docker | 容器数据卷在docker中的角色:持久化存储
  • WebSocket简单使用
  • Python实现图像(边缘)锐化:梯度锐化、Roberts 算子、Laplace算子、Sobel算子的详细方法
  • 【PythonWeb开发】Flask-RESTful字段格式化
  • C语言常见进制 (二进制、八进制、十进制、十六进制)详解