Redis原理:multiexec命令
在Redis中,MULTI
和EXEC
命令提供了一种机制来确保一系列命令以原子性的方式被执行。这意味着一旦这些命令被组合在一起并发送到服务器端,它们将作为一个整体被处理:要么全部成功执行,要么完全不执行。这一特性对于需要保证数据一致性的应用场景非常有用。
具体来说,当你向Redis客户端发送MULTI
命令时,标志着开始一个事务块。从这时起,直到发出EXEC
为止,所有发往该Redis连接的命令都不会立即执行,而是被暂时缓存起来。只有当EXEC
命令被调用后,之前缓存的所有命令才会按照接收顺序一次性地在服务器端执行,并且整个过程是不可中断的。如果在这批命令执行过程中出现了错误(例如某个命令语法有误),那么除了导致错误的那个命令之外,其他命令仍然会被正常执行。
示例如下:
jedis.watch(key); // 在下一节中解析
// 开启批量执行
jedis.multi();
try{
jedis.set(key,value);
jedis.expire(expire,ttl);
// 服务端开始执行
jedis.exec();
catch(Exeception e) {
// 放弃
jedis.discard();
}
源码分析
multi命令
当我们调用multi命令时,会来到mutilCommand方法(multi.c)。
void multiCommand(client *c) {
if (c->flags & CLIENT_MULTI) {
addReplyError(c,"MULTI calls can not be nested");
return;
}
// 给当前的客户端添加一个CLIENT_MULTI的标识
c->flags |= CLIENT_MULTI;
addReply(c,shared.ok);
}
该命令只是给当前客户端对象添加一个CLIENT_MULTI
就直接返回了。
其他命令
而后,当我们调用set 和 expire 命令时,会经过以下的判断(server.c中方法processCommand):
// 如果连接的客户端是已经调用了multi命令
/* Exec the command */
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand &&
c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand &&
c->cmd->proc != watchCommand &&
c->cmd->proc != quitCommand &&
c->cmd->proc != resetCommand)
{
// 将命令添加到队列
queueMultiCommand(c);
addReply(c, shared.queued);
}
会判断当前客户端是否有CLIENT_MULTI
,并且不是相关的命令,则会调用queueMultiCommand
方法将命令加入队列。
void queueMultiCommand(client *c) {
multiCmd *mc;
/* No sense to waste memory if the transaction is already aborted.
* this is useful in case client sends these in a pipeline, or doesn't
* bother to read previous responses and didn't notice the multi was already
* aborted. */
// 如果标识中有CLIENT_DIRTY_CAS,则不处理,说明watch的key已经被修改
// 在下一节中说明
if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC))
return;
// 申请命令缓存
// 进行扩容
c->mstate.commands = zrealloc(c->mstate.commands,
sizeof(multiCmd)*(c->mstate.count+1));
// 获取当前的最后一个对象
// 进行缓存redis命令&参数
mc = c->mstate.commands+c->mstate.count;
mc->cmd = c->cmd;
mc->argc = c->argc;
mc->argv = c->argv;
mc->argv_len = c->argv_len;
c->mstate.count++;
c->mstate.cmd_flags |= c->cmd->flags;
c->mstate.cmd_inv_flags |= ~c->cmd->flags;
c->mstate.argv_len_sums += c->argv_len_sum + sizeof(robj*)*c->argc;
/* Reset the client's args since we copied them into the mstate and shouldn't
* reference them from c anymore. */
c->argv = NULL;
c->argc = 0;
c->argv_len_sum = 0;
c->argv_len = 0;
}
exec 命令
void execCommand(client *c) {
int j;
robj **orig_argv;
int orig_argc, orig_argv_len;
struct redisCommand *orig_cmd;
// 如果没有搭配multi一起使用,报错
if (!(c->flags & CLIENT_MULTI)) {
addReplyError(c,"EXEC without MULTI");
return;
}
// .. 省略部分代码
// 标识正在exec命令中
server.in_exec = 1;
orig_argv = c->argv;
orig_argv_len = c->argv_len;
orig_argc = c->argc;
orig_cmd = c->cmd;
addReplyArrayLen(c,c->mstate.count);
// 执行所有的命令
for (j = 0; j < c->mstate.count; j++) {
c->argc = c->mstate.commands[j].argc;
c->argv = c->mstate.commands[j].argv;
c->argv_len = c->mstate.commands[j].argv_len;
c->cmd = c->realcmd = c->mstate.commands[j].cmd;
/* ACL permissions are also checked at the time of execution in case
* they were changed after the commands were queued. */
int acl_errpos;
int acl_retval = ACLCheckAllPerm(c,&acl_errpos);
if (acl_retval != ACL_OK) {
// 省略部分代码..
} else {
// 调用命令
if (c->id == CLIENT_ID_AOF)
call(c,CMD_CALL_NONE);
else
call(c,CMD_CALL_FULL);
serverAssert((c->flags & CLIENT_BLOCKED) == 0);
}
/* Commands may alter argc/argv, restore mstate. */
c->mstate.commands[j].argc = c->argc;
c->mstate.commands[j].argv = c->argv;
c->mstate.commands[j].cmd = c->cmd;
}
// restore old DENY_BLOCKING value
if (!(old_flags & CLIENT_DENY_BLOCKING))
c->flags &= ~CLIENT_DENY_BLOCKING;
c->argv = orig_argv;
c->argv_len = orig_argv_len;
c->argc = orig_argc;
c->cmd = c->realcmd = orig_cmd;
discardTransaction(c);
server.in_exec = 0;
}
该命令就是依次遍历已经缓存的所有命令。
discard 命令
该命令放弃所有在multi之后的命令。来看下discardCommand
void discardCommand(client *c) {
if (!(c->flags & CLIENT_MULTI)) {
addReplyError(c,"DISCARD without MULTI");
return;
}
// 放弃所有的命令
discardTransaction(c);
addReply(c,shared.ok);
}
discardTransaction
主要处理该连接的命令缓存、重置连接状态等操作
void discardTransaction(client *c) {
// 释放客户端的命令缓存状态
freeClientMultiState(c);
// 重置该连接的状态
initClientMultiState(c);
// 去除相关的标识
c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC);
unwatchAllKeys(c);
}
freeClientMultiState
释放客户端上的资源
void freeClientMultiState(client *c) {
int j;
// 依次释放资源
for (j = 0; j < c->mstate.count; j++) {
int i;
multiCmd *mc = c->mstate.commands+j;
//释放参数的引用
for (i = 0; i < mc->argc; i++)
decrRefCount(mc->argv[i]);
zfree(mc->argv);
}
// 释放数组
zfree(c->mstate.commands);
}