当前位置: 首页 > article >正文

【MyDB】4-VersionManager 之 4-VM的实现

【MyDB】4-VersionManager 之 4-VM的实现

  • VM 的实现
    • VM(VersionManager)的基本定义与实现优化
    • 具体功能实现
      • begin()开启事务
      • commit()提交事务
      • abort 中止事务
      • read 读取uid对应的数据记录所在的entry
      • insert方法,插入数据
      • delete方法

VM 的实现

本章代码位于:top/xianghua/mydb/server/vm/VersionManagerImpl.java

top/xianghua/mydb/server/vm/VersionManager.java

VM(VersionManager)的基本定义与实现优化

接下来,在介绍完了VM所依赖的记录版本,事务隔离以及死锁检测后,我们来到了最终VersionManager的实现。

VM 层通过 VersionManager 接口,向上层提供用于管理事务和数据操作的基本功能。包括事务的开始,提交,回滚以及数据的插入,读取和删除,VersionManager的基本定义如下:

public interface VersionManager {
    byte[] read(long xid, long uid) throws Exception;
    long insert(long xid, byte[] data) throws Exception;
    boolean delete(long xid, long uid) throws Exception;

    long begin(int level);
    void commit(long xid) throws Exception;
    void abort(long xid);
}

同时,VM 的实现类还被设计为 Entry 的缓存,需要继承 AbstractCache<Entry>。需要实现的获取到缓存和从缓存释放的方法很简单:

@Override
protected Entry getForCache(long uid) throws Exception {
    Entry entry = Entry.loadEntry(this, uid);
    if(entry == null) {
        throw Error.NullEntryException;
    }
    return entry;
}

@Override
protected void releaseForCache(Entry entry) {
    entry.remove();
}

具体功能实现

begin()开启事务

begin() 开启一个事务,并初始化事务的结构,将其存放在 activeTransaction 中,用于检查和快照使用:

    @Override
    /**
     * 根据传入的隔离级别开启一个事务
     * level:0表示读已提交,1表示可重复读
     */
    public long begin(int level) {
        lock.lock();
        try {
            long xid = tm.begin(); // 事务管理器开启一个事务,返回一个xid
            Transaction t = Transaction.newTransaction(xid, level, activeTransaction); // 创建一个事务对象
            activeTransaction.put(xid, t); // 将事务对象放入活跃事务集合中
            return xid; // 返回事务id
        } finally {
            lock.unlock();
        }
    }

commit()提交事务

commit() 方法提交一个事务,主要就是 free 掉相关的结构,并且释放持有的锁,并修改 TM 状态:

  1. 获取事务对象

  2. 将事务对象从活跃事务集合中移除

  3. 从LockTable中释放该事务占用的资源,并提交事务

    /**
     * 提交事务
     * @param xid
     * @throws Exception
     */
    @Override
    public void commit(long xid) throws Exception {
        // 1. 获取事务对象
        lock.lock();
        Transaction t = activeTransaction.get(xid); // 获取事务对象
        lock.unlock();

        try {
            if(t.err != null) {
                throw t.err;
            }
        } catch(NullPointerException n) {
            System.out.println(xid);
            System.out.println(activeTransaction.keySet());
            Panic.panic(n);
        }
        // 2. 将事务对象从活跃事务集合中移除
        lock.lock();
        activeTransaction.remove(xid); // 将事务对象从活跃事务集合中移除
        lock.unlock();

        // 3. 从LockTable中释放该事务占用的资源,并提交事务
        lt.remove(xid); // 将事务对象从LockTable中移除
        tm.commit(xid); // 事务管理器提交事务
    }

abort 中止事务

abort 事务的方法则有两种,手动和自动。手动指的是调用 abort() 方法,而自动,则是在事务被检测出出现死锁时,会自动撤销回滚事务;或者出现版本跳跃时,也会自动回滚:

  1. 从activeTransaction中根据xid获取事务对象
  2. 从LockTable中释放该事务占用的资源,并调用事务管理器终止事务
    @Override
    public void abort(long xid) {
        internAbort(xid, false);
    }

private void internAbort(long xid, boolean autoAborted) {
    lock.lock();
    Transaction t = activeTransaction.get(xid);
    if(!autoAborted) {
        activeTransaction.remove(xid);
    }
    lock.unlock();
    if(t.autoAborted) return;
    lt.remove(xid);
    tm.abort(xid);
}

read 读取uid对应的数据记录所在的entry

read() 方法读取一个 entry,注意判断下可见性即可:

    /**
     * 读取数据
     * @param xid
     * @param uid
     * @return
     * @throws Exception
     */
    @Override
    public byte[] read(long xid, long uid) throws Exception {
        // 1. 获取事务对象
        lock.lock();
        Transaction t = activeTransaction.get(xid);
        lock.unlock();

        if(t.err != null) {
            throw t.err;
        }
        // 2. 尝试从缓存中获取对应的entry
        Entry entry = null;
        try {
            entry = super.get(uid);
        } catch(Exception e) {
            if(e == Error.NullEntryException) {
                return null;
            } else {
                throw e;
            }
        }
        // 3. 判断事务是否可见
        try {
            if(Visibility.isVisible(tm, t, entry)) {
                return entry.data();
            } else {
                return null;
            }
        } finally {
            entry.release();
        }
    }

insert方法,插入数据

insert() 则是将数据包裹成 Entry,无脑交给 DM 插入即可:

@Override
public long insert(long xid, byte[] data) throws Exception {
    lock.lock();
    Transaction t = activeTransaction.get(xid);
    lock.unlock();
    if(t.err != null) {
        throw t.err;
    }
    byte[] raw = Entry.wrapEntryRaw(xid, data);
    return dm.insert(xid, raw);
}

delete方法

delete() 方法看起来略为复杂:

在这里插入图片描述

    @Override
    public boolean delete(long xid, long uid) throws Exception {
        // 1. 获取事务对象
        lock.lock();
        Transaction t = activeTransaction.get(xid);
        lock.unlock();

        // 2. 尝试从缓存中获取对应的entry
        if(t.err != null) {
            throw t.err;
        }
        Entry entry = null;
        try {
            entry = super.get(uid);
        } catch(Exception e) {
            if(e == Error.NullEntryException) {
                return false;
            } else {
                throw e;
            }
        }
        // 3. 判断事务是否可见
        try {
            // 3.1 如果事务不可见,则返回false
            if(!Visibility.isVisible(tm, t, entry)) {
                return false;
            }
            // 3.2 如果事务可见,则尝试获取锁
            Lock l = null;
            try {
                // 尝试获取指定 xid 和 uid 的锁
                l = lt.add(xid, uid);
            } catch(Exception e) {
                // 如果加锁失败,则抛出异常,并回滚事务
                t.err = Error.ConcurrentUpdateException;
                internAbort(xid, true);
                t.autoAborted = true;
                throw t.err;
            }
            // 3.3 如果成功获取到锁,锁定并立即解锁
            if(l != null) {
                l.lock();
                l.unlock();
            }

            // 3.4 如果 entry 的 Xmax 等于当前事务的 xid,说明该 entry 已经被当前事务删除,返回 false
            if(entry.getXmax() == xid) {
                return false;
            }

            // 3.5 检查是否发生版本跳过,如果发生版本跳过,则抛出异常
            if(Visibility.isVersionSkip(tm, t, entry)) {
                t.err = Error.ConcurrentUpdateException;
                internAbort(xid, true);
                t.autoAborted = true;
                throw t.err;
            }

            // 3.6 设置 entry 的 Xmax 为当前事务的 xid,并返回 true
            entry.setXmax(xid);
            return true;

        } finally {
            // 4. 释放锁
            entry.release();
        }
    }
     t.autoAborted = true;
                throw t.err;
            }

            // 3.6 设置 entry 的 Xmax 为当前事务的 xid,并返回 true
            entry.setXmax(xid);
            return true;

        } finally {
            // 4. 释放锁
            entry.release();
        }
    }
原文地址:https://blog.csdn.net/m0_52031708/article/details/145406326
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.kler.cn/a/526972.html

相关文章:

  • EtherCAT主站IGH-- 24 -- IGH之fsm_slave_config.h/c文件解析
  • CSS 值和单位详解:从基础到实战
  • NX/UG二次开发—CAM—快速查找程序参数名称
  • 使用Pygame制作“Flappy Bird”游戏
  • homebrew-usage
  • Intellij IDEA如何进入初始化页面?
  • AI智慧社区--用户登录JWT令牌校验
  • Vue.js组件开发-实现全屏焦点图片带图标导航按钮控制图片滑动切换
  • 服务器虚拟化实战:架构、技术与最佳实践
  • 使用Pygame制作“打砖块”游戏
  • 二.java开发项目 常用hutool aop mybatisplue依赖2.
  • 分布式架构中的事务管理:需要了解的常见解决方案
  • ai翻唱入门
  • priority_queue的创建_结构体类型(重载小于运算符)c++
  • 计算机网络之计算机网络协议、接口、服务等概念
  • 【MyDB】4-VersionManager 之 2-事务的隔离级别
  • pytorch实现半监督学习
  • CSS入门知识
  • VUE之组件通信(一)
  • win11本地部署 DeepSeek-R1 大模型!免费开源,媲美OpenAI-o1能力,断网也能用