Redis协议与异步方式
一、redis pipeline
首先我们需要知道从客户端发送一条命令到redis中执行的过程中经历了什么:发送命令->命令排队->命令执行->返回结果。这个过程称为Round trip time(简称RTT,往返时间),如果当我们发送N条命令,那么需要消耗N次RTT,效率比较低下。
这时候我们采用一种叫pipeline来解决这样的问题。通过将多个命令一起发送,然后接收多次结果,这样就可以大大提高效率。这样的方式和异步IO很像。
首先redis pipeline是一个客户端提供的机制,而不是服务端提供的机制。其次对于原生批命令(mset,mget)这些都是原子性操作,但是pipeline是非原子性的。
原子性概念:一个事务是一个不可分割的最小工作单位,要么都成功,要么都失败。因此对于pipeline来说既然它不是原子性的,那么它也就不具备事务性。
因此pipeline只是将多条命令放在一起发送,这样只是为了减少网络传输的时间。原生批命令是服务端实现的,但是pipeline需要服务端和客户端共同完成。
二、redis事务
redis事务是一组命令的集合,允许用户在一个单独的操作序列中执行多个命令,并保证这些命令在执行过程中不会被其他客户端的命令打断。redis事务通过MULTI,EXEC,DISCARD和WATCH等命令来实现。
1、redis事务的基本操作
开启事务:使用MULTI命令标记一个事务的开始。在MULTI命令之后,客户端发送的命令会被Redis服务器接收并放入到一个事务队列中,但不会立即执行。
命令入队:在MULTI和EXEC之间发送的命令都会被加入到事务队列中。这些命令会按照发送的顺序被序列化,并在
EXEC
命令执行时按顺序执行。执行事务:使用EXEC命令执行事务队列中的所有命令。Redis会按照命令入队的顺序执行这些命令,并将所有命令的返回值作为EXEC命令的返回值返回给客户端。
取消事务:如果在事务执行之前需要取消事务,可以使用DISCARD命令。这将清空事务队列,并放弃执行事务块中的所有命令。
下面这个是通过事务进行整体操作:
这个是在运行事务的过程中,中途退出事务,则不会改变其中的变量。
2、乐观锁实现
Redis提供了WATCH命令来实现乐观锁机制。使用WATCH命令可以监视一个或多个键,如果在事务执行之前这些键被其他命令修改了,那么事务将被取消执行。这可以帮助避免在并发环境下出现数据不一致的问题。
通过使用WATCH来观察一个Key值,当这个值被修改的时候那么关于这个值的事务将被取消。下面第一张图我是使用第一个客户端进行观察并且使用事务,第二张图是第二个客户端修改num的值,第三张图就是第二个客户端修改之后,第一张图的变化,发现我们事务的操作失败了。
三、Lua脚本实现原子性
Lua 是一种轻量级的、可扩展的编程语言,设计用来嵌入应用程序中,以提供灵活的脚本功能。在多种环境下,Lua 被用作脚本语言,特别是在游戏开发(如 Unity、Roblox)、网络应用、嵌入式系统等领域。在 Redis、Nginx 等一些系统或应用中,Lua 脚本可以被用来执行复杂的操作,这些操作可能涉及到多次对数据库的访问或复杂的逻辑处理。
redis 中加载了一个 lua 虚拟机;用来执行 redis lua 脚本;redis lua 脚本的执行是原子性的;当某个脚本正在执行的时候,不会有其他命令或者脚本被执行;
lua 脚本当中的命令会直接修改数据状态;
lua 脚本 mysql 存储区别:MySQL存储过程不具备事务性,所以也不具备原子性;
注意:如果项目中使用了 lua 脚本,不需要使用上面的事务命令;
接下来我们使用lua脚本来实现执行相关命令,其中使用了EVAL和EVALSHA。我们通过EVAL来实现数据的翻倍。
EVAL script numkeys key [key ...] arg [arg ...]
//script是要执行的Lua脚本
//numkeys是后续参数中key的数量,用来区分哪些参数是键名,哪些是普通的参数
//key传入的键名
//arg传入的参数
EVALSHA sha1 numkeys key [key ...] arg [arg ...]
//sha1是哈希值
1、EVAL实现数据翻倍
eval 'local key = KEYS[1]; local val = redis.call("get",key); redis.call("set",key,2*val);return 2*val ' 1 num
//在''这个之中的代码就是lua脚本
//我们先看后面的参数: 1 num ,这告诉lua脚本有一个key,这个key叫num
//那么我们通过local key = KEYS[1]; 获得第一个key:num
//通过local val = redis.call("get",key); get num ,这个来获得num的值,假如这里val是4
//redis.call("set",key,2*val); set num 8 ,这个8就是二倍的4,那么我们就实现了翻倍
//最后return 2*val; 告诉我们最后翻倍的值是多少。
2、EVALSHA实现数据翻倍
虽然我们这样写之后很方便,直接复制我们写好之后的lua脚本就可以,但是我们这只是个很小的测试案例,在项目中这段lua脚本会很长,这样的话就不会很方便了。那么我们将这段代码通过哈希一下,得到一个哈希值,我们使用这个哈希值来代替这段lua脚本,岂不是很方便。于是我们就可以使用这个EVALSHA了。
我们通过使用 script load 来将我们的lua脚本进行生成哈希值,这个哈希值就代表我们的lua脚本。我们可以通过 script exists 来查看某个哈希值是否存在。然后我们就可以使用这个哈希值了。
# 清除所有脚本缓存
> script flush
# 如果当前脚本运行时间过长(死循环),可以通过 script kill 杀死当前运行的脚本
> script kill
四、发布订阅
为了支持消息的多播机制,redis 引入了发布订阅模块; 消息不一定可达;分布式消息队列; stream 的方式确保一定可达;
# 订阅频道
subscribe 频道
# 订阅模式频道
psubscribe 频道
# 取消订阅频道
unsubscribe 频道
# 取消订阅模式频道
punsubscribe 频道
# 发布具体频道或模式频道的内容
publish 频道 内容
# 客户端收到具体频道内容
message 具体频道 内容
# 客户端收到模式频道内容
pmessage 模式频道 具体频道 内容
发布订阅功能一般要区别命令连接重新开启一个连接;因为命令连接严格遵循请求回应模式;而 pubsub 能收到 redis 主动推送的内容;所以实际项目中如果支持 pubsub 的话,需要另开一条连接 用于处理发布订阅;
发布订阅的生产者传递过来一个消息,redis 会直接找到相应的消费者并传递过去;假如没有消费者,消息直接丢弃;假如开始有2个消费者,一个消费者突然挂掉了,另外一个消费者依然能收到 消息,但是如果刚挂掉的消费者重新连上后,在断开连接期间的消息对于该消费者来说彻底丢失 了; 另外,redis 停机重启,pubsub 的消息是不会持久化的,所有的消息被直接丢弃;
subscribe news.it news.showbiz news.car
psubscribe news.*
publish new.showbiz 'king kiss darren'
lua脚本和订阅的具体应用:当项目启动时,建立redis连接并验证后,先加载所有项目中使用的lua脚本(script load);项目中若需要热更新,通过redis-cli script flush;然后可以通过订阅发布功能通知所有服务器,重新加载lua脚本;若项目中lua脚本发生阻塞,可通过script kill暂停当前阻塞脚本的执行;
五、ACID特性分析
A 原子性:事务是一个不可分割的工作单位,事务中的操作要么全部成功,要么全部失败;redis 不支持回滚;即使事务队列中的某个命令在执行期间出现了错误,整个事务也会继续执行下去,直到将事务队列中的所有命令都执行完毕为止。
C 一致性:事务的前后,所有的数据都保持一个一致的状态,不能违反数据的一致性检测;这里的一致性是指预期的一致性而不是异常后的一致性;所以 redis 也不满足;这个争议很大:redis 能确保事务执行前后的数据的完整约束;但是并不满足业务功能上的一致性;比如转账功能,一个扣钱一个加钱;可能出现扣钱执行错误,加钱执行正确,那么最终还是会加钱成功;系统凭空多了钱;
I 隔离性:各个事务之间互相影响的程度;redis 是单线程执行,天然具备隔离性;
D 持久性:redis 只有在 aof 持久化策略的时候,并且需要在 redis.conf 中 appendfsync=always 才具备持久性;实际项目中几乎不会使用 aof 持久化策略;
AOF(Append Only File)持久化策略是Redis数据库用于保证数据持久性的重要机制之一。AOF持久化通过记录Redis服务器所接收的写命令来保存数据库状态,当Redis重启时,它会重新执行AOF文件中的命令来恢复数据。
那么我们可以发现 lua 脚本满足原子性和隔离性;一致性和持久性不满足;
六、redis的同步连接
首先我们如果需要与redis进行连接的话,需要用到hiredis库通过使用这个hiredis库就可以与redis进行通信了。下面我们可以看一个同步连接的例子。
redisContext *c; //redis的上下文,也就是与redis连接的实体
redisReply *reply; //redis的回复
const char *hostname = "127.0.0.1";
int port = 6379;
struct timeval timeout = { 1, 500000 }; // 1.5 seconds
c = redisConnectWithTimeout(hostname, port, timeout); //连接,超时时间。
if (c == NULL || c->err) {
if (c) {
printf("Connection error: %s\n", c->errstr);
redisFree(c);
} else {
printf("Connection error: can't allocate redis context\n");
}
exit(1);
}
int roleid = 10001;
reply = redisCommand(c, "hgetall role:%d", roleid); //调用执行的命令,这里我们固定查询 role:10001 这个key值
if (reply->type != REDIS_REPLY_ARRAY) {
printf("reply error: %s\n", reply->str);
} else {
printf("reply:number of elements=%lu\n", reply->elements);
for (size_t i = 0; i < reply->elements; i++) {
printf("\t %lu : %s\n", i, reply->element[i]->str);
}
}
七、redis的异步连接
同步连接方案采用阻塞 io 来实现;优点是代码书写是同步的,业务逻辑没有割裂;缺点是阻塞当前线程,直至 redis 返回结果;通常用多个线程来实现线程池来解决效率问题;
异步连接方案采用非阻塞 io 来实现;优点是没有阻塞当前线程,redis 没有返回,依然可以往 redis 发送命令;缺点是代码书写是异步的(回调函数),业务逻辑割裂,可以通过协程解决 (openresty,skynet);配合 redis6.0 以后的 io 多线程(前提是有大量并发请求),异步连接池,能更好解决应用层的数据访问性能;
首先redis提供了同步和异步不同的对象,并且对于异步的结构体来说,正好可以适配我们的reactor模型的回调函数。
//同步
redisContext *c = redisConnectWithTimeout("127.0.0.1", 6379, timeout);
//异步
redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379);
//因此我们使用的时候一定要区分。
1、redis_event_t 的结构
struct event_s {
int fd;
reactor_t *r;
buffer_t *in; //读缓冲区
buffer_t *out; //写缓冲区
event_callback_fn read_fn; //一些回调函数
event_callback_fn write_fn;
error_callback_fn error_fn;
}event_t;
typedef struct {
event_t e; //事件
int mask; //事件触发的类型
redisAsyncContext *ctx; //异步redis的上下文
} redis_event_t;
typedef struct redisAsyncContext {
redisContext c; //同步redis的上下文
int err;
char *errstr;
void *data;
void (*dataCleanup)(void *privdata);
struct {
void *data;
void (*addRead)(void *privdata);
void (*delRead)(void *privdata);
void (*addWrite)(void *privdata);
void (*delWrite)(void *privdata);
void (*cleanup)(void *privdata);
void (*scheduleTimer)(void *privdata, struct timeval tv);
} ev;
redisDisconnectCallback *onDisconnect;
redisConnectCallback *onConnect;
redisCallbackList replies;
struct sockaddr *saddr;
size_t addrlen;
struct {
redisCallbackList invalid;
struct dict *channels;
struct dict *patterns;
} sub;
redisAsyncPushFn *push_cb;
} redisAsyncContext;
我们发现这个 redis_event_t 中所包含event_t、异步redisAsyncContext ,而在异步 redisAsyncContext 中却包含着同步 redisContext 。因此我们可以画一下这个图。这样我们可以清楚的知道,在异步上下文中包含着读写事件的注册和删除,在同步上下文中包含着自己的fd,在event_t中包含着传入进来的reactor和一系列的回调函数将这全部的东西全部放入到redis_event_t中去。(如果想知道具体的同步异步上下文都包含什么,可以自己去下载hiredis代码去查看)。
2、redisAttach
我们在这个函数中将这个异步上下文和这个reactor绑在一起。其中设置各种参数,也就是说将他们这一群都互相认定一样。
static int redisAttach(reactor_t *r, redisAsyncContext *ac) {
redisContext *c = &(ac->c); //上面那个是异步的上下文,这个是同步的上下文,异步的上下文中含有回调函数,同步中含有fd
redis_event_t *re; //这个event中含有自己写的事件(回调函数)和异步的上下文,也就是说这个包含上面的异步上下文。
/* Nothing should be attached when something is already attached */
if (ac->ev.data != NULL) //我们通过查看这个,就可以判断这个是否已经初始化
return REDIS_ERR;
/* Create container for ctx and r/w events */
re = (redis_event_t*)hi_malloc(sizeof(*re)); //开辟这个类型的空间
if (re == NULL)
return REDIS_ERR;
re->ctx = ac; //再这个re中携带上这个异步的上下文。
re->e.fd = c->fd; //再携带上同步上下文的fd
re->e.r = r; //再包含上咱们传入的reactor
//由于咱们自己写的事件类型中包含读写缓冲区,但是我们需要使用hiredis中的读写缓冲区
// dont use event buffer, using hiredis's buffer
re->e.in = NULL;
re->e.out = NULL;
re->mask = 0; //现在的事件为0
//为这个异步中的回调函数设置上咱们自己的回调函数。
ac->ev.addRead = redisAddRead; //添加读事件
ac->ev.delRead = redisDelRead;
ac->ev.addWrite = redisAddWrite;
ac->ev.delWrite = redisDelWrite;
ac->ev.cleanup = redisCleanup;
ac->ev.data = re; //我们将这个redis_event_t和这个异步上下文互相指向
return REDIS_OK;
}
3、redisAddRead
这些回调函数太多了,我们只拿出一个举例。当我们看到 redisEventUpdate 函数的时候,发现里面写的都是关于 epoll 中的event的事件,因此可以发现我们写的reactor和这个异步hiredis非常适配。
//添加读事件
static void redisAddRead(void *privdata) {
redis_event_t *re = (redis_event_t *)privdata; //将这个参数强转成这个事件类型
re->e.read_fn = redisReadHandler; //设置这个读事件的回调函数
redisEventUpdate(privdata, EPOLLIN, 0);
}
//设置读事件的处理函数(这里是回调函数)
static void redisReadHandler(int fd, int events, void *privdata) {
((void)fd);
((void)events);
printf("redisReadHandler %d\n", fd);
event_t *e = (event_t*)privdata;
redis_event_t *re = (redis_event_t *)(char *)e; //将这个小的event包含在这个redis_event中了
redisAsyncHandleRead(re->ctx); //调用库函数,将这个异步的上下文传入进去
}
//更新它的事件状态
static void redisEventUpdate(void *privdata, int flag, int remove) {
redis_event_t *re = (redis_event_t *)privdata;
reactor_t *r = re->e.r;
int prevMask = re->mask;
int enable = 0;
if (remove) {
if ((re->mask & flag) == 0)
return ;
re->mask &= ~flag;
enable = 0;
} else {
if (re->mask & flag)
return ;
re->mask |= flag;
enable = 1;
}
int fd = re->ctx->c.fd;
if (re->mask == 0) {
del_event(r, &re->e);
} else if (prevMask == 0) {
add_event(r, re->mask, &re->e);
} else {
if (flag & EPOLLIN) {
enable_event(r, &re->e, enable, 0);
} else if (flag & EPOLLOUT) {
enable_event(r, &re->e, 0, enable);
}
}
}
4、运行的具体函数
我们通过这个函数对redis进行具体的执行操作。比如下面的查看所有的role:10001的value。
void
connectCallback(const redisAsyncContext *c, int status) {
if (status != REDIS_OK) {
printf("Error: %s\n", c->errstr);
stop_eventloop(R);
return;
}
printf("Connected...\n");
redisAsyncCommand((redisAsyncContext *)c, dumpReply,
"hmset role:10001",
"hmset role:10001 name mark age 31 sex male");
int a = 10;
redisAsyncCommand((redisAsyncContext *)c, dumpReply, "hgetall role:10001", "hgetall role:10001");
// ....
}
5、整个过程
int main(int argc, char **argv) {
R = create_reactor();
redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379); //进行连接操作
if (c->err) {
/* Let *c leak for now... */
printf("Error: %s\n", c->errstr);
return 1;
}
redisAttach(R, c); //将这俩参数传入进去进行绑定,设置一些回调函数
redisAsyncSetConnectCallback(c, connectCallback); //连接上的回调函数,连接后第一时间执行的
redisAsyncSetDisconnectCallback(c, disconnectCallback);
eventloop(R); //将这个程序运行起来
release_reactor(R);
return 0;
}
感谢大家的观看!0voice · GitHub