当前位置: 首页 > article >正文

Redis原理:multiexec命令

        在Redis中,MULTIEXEC命令提供了一种机制来确保一系列命令以原子性的方式被执行。这意味着一旦这些命令被组合在一起并发送到服务器端,它们将作为一个整体被处理:要么全部成功执行,要么完全不执行。这一特性对于需要保证数据一致性的应用场景非常有用。
具体来说,当你向Redis客户端发送MULTI命令时,标志着开始一个事务块。从这时起,直到发出EXEC为止,所有发往该Redis连接的命令都不会立即执行,而是被暂时缓存起来。只有当EXEC命令被调用后,之前缓存的所有命令才会按照接收顺序一次性地在服务器端执行,并且整个过程是不可中断的。如果在这批命令执行过程中出现了错误(例如某个命令语法有误),那么除了导致错误的那个命令之外,其他命令仍然会被正常执行。
示例如下:

jedis.watch(key); // 在下一节中解析
// 开启批量执行
jedis.multi();
try{
    jedis.set(key,value);
    jedis.expire(expire,ttl);
    // 服务端开始执行
    jedis.exec();
catch(Exeception e) {
    // 放弃
    jedis.discard();
}

源码分析

multi命令

当我们调用multi命令时,会来到mutilCommand方法(multi.c)。

void multiCommand(client *c) {
    if (c->flags & CLIENT_MULTI) {
        addReplyError(c,"MULTI calls can not be nested");
        return;
    }
    // 给当前的客户端添加一个CLIENT_MULTI的标识
    c->flags |= CLIENT_MULTI;

    addReply(c,shared.ok);
}

该命令只是给当前客户端对象添加一个CLIENT_MULTI就直接返回了。

其他命令

而后,当我们调用set 和 expire 命令时,会经过以下的判断(server.c中方法processCommand):

// 如果连接的客户端是已经调用了multi命令
/* Exec the command */
if (c->flags & CLIENT_MULTI &&
    c->cmd->proc != execCommand &&
    c->cmd->proc != discardCommand &&
    c->cmd->proc != multiCommand &&
    c->cmd->proc != watchCommand &&
    c->cmd->proc != quitCommand &&
    c->cmd->proc != resetCommand)
{
    // 将命令添加到队列
    queueMultiCommand(c);
    addReply(c, shared.queued);
}

会判断当前客户端是否有CLIENT_MULTI,并且不是相关的命令,则会调用queueMultiCommand方法将命令加入队列。

void queueMultiCommand(client *c) {
    multiCmd *mc;

    /* No sense to waste memory if the transaction is already aborted.
     * this is useful in case client sends these in a pipeline, or doesn't
     * bother to read previous responses and didn't notice the multi was already
     * aborted. */
    // 如果标识中有CLIENT_DIRTY_CAS,则不处理,说明watch的key已经被修改
    // 在下一节中说明
    if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC))
        return;
    // 申请命令缓存
    // 进行扩容
    c->mstate.commands = zrealloc(c->mstate.commands,
            sizeof(multiCmd)*(c->mstate.count+1));
    // 获取当前的最后一个对象
    // 进行缓存redis命令&参数
    mc = c->mstate.commands+c->mstate.count;
    mc->cmd = c->cmd;
    mc->argc = c->argc;
    mc->argv = c->argv;
    mc->argv_len = c->argv_len;

    c->mstate.count++;
    c->mstate.cmd_flags |= c->cmd->flags;
    c->mstate.cmd_inv_flags |= ~c->cmd->flags;
    c->mstate.argv_len_sums += c->argv_len_sum + sizeof(robj*)*c->argc;

    /* Reset the client's args since we copied them into the mstate and shouldn't
     * reference them from c anymore. */
    c->argv = NULL;
    c->argc = 0;
    c->argv_len_sum = 0;
    c->argv_len = 0;
}

exec 命令

void execCommand(client *c) {
    int j;
    robj **orig_argv;
    int orig_argc, orig_argv_len;
    struct redisCommand *orig_cmd;
    // 如果没有搭配multi一起使用,报错
    if (!(c->flags & CLIENT_MULTI)) {
        addReplyError(c,"EXEC without MULTI");
        return;
    }

    // .. 省略部分代码
    // 标识正在exec命令中
    server.in_exec = 1;

    orig_argv = c->argv;
    orig_argv_len = c->argv_len;
    orig_argc = c->argc;
    orig_cmd = c->cmd;
    addReplyArrayLen(c,c->mstate.count);
    // 执行所有的命令
    for (j = 0; j < c->mstate.count; j++) {
        c->argc = c->mstate.commands[j].argc;
        c->argv = c->mstate.commands[j].argv;
        c->argv_len = c->mstate.commands[j].argv_len;
        c->cmd = c->realcmd = c->mstate.commands[j].cmd;

        /* ACL permissions are also checked at the time of execution in case
         * they were changed after the commands were queued. */
        int acl_errpos;
        int acl_retval = ACLCheckAllPerm(c,&acl_errpos);
        if (acl_retval != ACL_OK) {
            // 省略部分代码..
        } else {
            // 调用命令
            if (c->id == CLIENT_ID_AOF)
                call(c,CMD_CALL_NONE);
            else
                call(c,CMD_CALL_FULL);

            serverAssert((c->flags & CLIENT_BLOCKED) == 0);
        }

        /* Commands may alter argc/argv, restore mstate. */
        c->mstate.commands[j].argc = c->argc;
        c->mstate.commands[j].argv = c->argv;
        c->mstate.commands[j].cmd = c->cmd;
    }

    // restore old DENY_BLOCKING value
    if (!(old_flags & CLIENT_DENY_BLOCKING))
        c->flags &= ~CLIENT_DENY_BLOCKING;

    c->argv = orig_argv;
    c->argv_len = orig_argv_len;
    c->argc = orig_argc;
    c->cmd = c->realcmd = orig_cmd;
    discardTransaction(c);

    server.in_exec = 0;
}

该命令就是依次遍历已经缓存的所有命令。

discard 命令

该命令放弃所有在multi之后的命令。来看下discardCommand

void discardCommand(client *c) {
    if (!(c->flags & CLIENT_MULTI)) {
        addReplyError(c,"DISCARD without MULTI");
        return;
    }
    // 放弃所有的命令
    discardTransaction(c);
    addReply(c,shared.ok);
}

discardTransaction主要处理该连接的命令缓存、重置连接状态等操作


void discardTransaction(client *c) {
    // 释放客户端的命令缓存状态
    freeClientMultiState(c);
    // 重置该连接的状态
    initClientMultiState(c);
    // 去除相关的标识
    c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC);
    unwatchAllKeys(c);
}

freeClientMultiState释放客户端上的资源

void freeClientMultiState(client *c) {
    int j;
    // 依次释放资源
    for (j = 0; j < c->mstate.count; j++) {
        int i;
        multiCmd *mc = c->mstate.commands+j;
        //释放参数的引用
        for (i = 0; i < mc->argc; i++)
            decrRefCount(mc->argv[i]);
        zfree(mc->argv);
    }
    // 释放数组
    zfree(c->mstate.commands);
}

http://www.kler.cn/a/600717.html

相关文章:

  • C/S与B/S架构
  • ThreadLocal 的用途与用法全解析:Java 多线程开发的利器
  • C++中将记录集的数据复制到Excel工作表中的CRange类CopyFromRecordset函数异常怎么捕获
  • 【c++入门系列】:引用以及内联函数详解
  • javaweb自用笔记:Mybatis
  • Java 线程池全面解析
  • 【Pandas】pandas Series to_csv
  • vue3中watch 函数参数说明
  • 小蓝的括号串(栈,dfs)
  • PHP在2025年的新趋势与应用
  • xilinx约束中set_property -dict表示什么意思
  • Nuxt出现Error: Failed to download template from registry
  • C语言复习笔记--函数递归
  • Hugging Face Spaces 介绍与使用指南
  • 4.milvus索引FLAT
  • 黄土高原风蚀区解析多源数据融合与机器学习增强路径-RWEQ+集成技术在风蚀模数估算中的全流程增强策略—从数据融合到模型耦合的精细化操作指南
  • Linux云计算SRE-第二十一周
  • 国产开发板—米尔全志T113-i如何实现ARM+RISC-V+DSP协同计算?
  • 深入理解JavaScript中的同步和异步编程模型及应用场景
  • 2025年DeepSeek行业应用实践报告