redis工程实战介绍(含面试题)
文章目录
- redis单线程VS多线程
- 面试题
- **redis是多线程还是单线程,为什么是单线程**
- **聊聊redis的多线程特性和IO多路复用**
- **io多路复用模型**
- **redis如此快的原因**
- BigKey
- 大批量插入数据测试数据key
- 面试题
- 海量数据里查询某一固定前缀的key
- 如果生产上限值keys * ,flushdb,flushall等危险命令以防止误操作
- bigkey如何处理
- memory usage 命令使用过吗,如何发现bigkey
- **删除bigkey**
- bigkey调优,惰性释放lazyfree了解过吗
- 缓存与数据库双写一致
- 理论
- 先删除缓存,再更新数据库,会出现什么问题
- 双检加锁策略
- 实操
- canal
- 是什么
- 作用
- canal工作原理
- 官网地址
- mysql
- 查看mysql版本
- 当前主机二进制日志
- 查看show variables like 'log_bin'
- 开启mysql的binlog写入功能
- 重启mysql
- 再次查看log_bin
- 授权canal连接mysql账号
- canal服务端
- 下载
- 解压
- 配置
- 启动
- 查看
- canal客户端(java编写业务程序)
- sql脚本
- 建module
- 改pom
- 配置文件
- 启动类
- 业务类
- 案例实战bitmap/hyperloglog/GEO
- 统计的类型有哪些
- 聚合统计
- 排序统计
- 二值统计
- 基数统计
- hyperloglog案例实战
- 去重统计的方案
- 淘宝网站首页亿级UV的redis统计方案
- GEO案例实战
- 美团地图位置附近的酒店推送
- bitmap案例实战
- 布隆过滤器BloomFilter
- 手写一个布隆过滤器
- 京东签到送京豆
- 缓存预热,缓存雪崩,缓存击穿,缓存穿透
- 缓存预热
- 缓存雪崩
- 预防和解决
- 缓存击穿
- 是什么
- 解决方案
- 缓存穿透
- 是什么
- 解决方案
- 手写redis分布式锁
- 面试题
- 分布式锁
- Lua脚本
- 可重入+设计模式
- 自动续期
- RedLock算法及原理
- 设计理念
- 解决方案
- redisson实战案例(单机)
- redisson实战案例(多机)多重锁
- reids的缓存过期淘汰策略
- redis的缓存内容大小以及配置
- reids的默认内容是多少?在哪里配置,以及如何修改
- 往redis里写的数据是怎么没得?他是如何删除的?
- lru算法和lfu算法的区别
- 八种淘汰策略
- 微信抢红包功能实现
- 需求分析
- 代码
redis单线程VS多线程
面试题
redis是多线程还是单线程,为什么是单线程
这种问法其实并不严谨,为啥这么说呢?
Redis的版本很多3.x、4.x、6.x,版本不同架构也是不同的,不限定版本问是否单线程也不太严谨。
-
版本3.x ,最早版本,也就是大家口口相传的redis是单线程,阳哥2016年讲解的redis就是3.X的版本。
-
版本4.x,严格意义来说也不是单线程,而是负责处理客户端请求的线程是单线程,但是开始加了点多线程的东西(异步删除)。—貌似
-
2020年5月版本的6.0.x后及2022年出的7.0版本后,告别了大家印象中的单线程,用一种全新的多线程来解决问题。—实锤
redis是单线程
主要是指redis的网络io和键值对的读写是由一个线程来完成的,redis在处理客户端的请求的时候包括获取socket读,解析,执行内容返回,socket写等都是有一个顺序串行的主线程处理,这就是所谓的“单线程”,这也是redis对外提供键值存储服务的主要流程
但redis的其他功能,比如持久化RDB,AOF,异步删除,集群数据同步等,其实是由额外的线程执行的
redis命令工作线程是单线程的,但是,整个redis来说,是多线程的
其实纯单线程也会有问题
正常情况下使用 del 指令可以很快的删除数据,而当被删除的 key 是一个非常大的对象时,例如时包含了成千上万个元素的 hash 集合时,那么 del 指令就会造成 Redis 主线程卡顿。
这就是redis3.x单线程时代最经典的故障,大key删除的头疼问题,
由于redis是单线程的,del bigKey …
等待很久这个线程才会释放,类似加了一个synchronized锁,你可以想象高并发下,程序堵成什么样子?
解决方案:
-
使用惰性删除可以有效避免redis卡顿问题
-
比如当我(Redis)需要删除一个很大的数据时,因为是单线程原子命令操作,这就会导致 Redis 服务卡顿,
-
于是在 Redis 4.0 中就新增了多线程的模块,当然此版本中的多线程主要是为了解决删除数据效率比较低的问题的。
unlink key
flushdb async
flushall async
把删除工作交给了后台的小弟(子线程)异步来删除数据了。
聊聊redis的多线程特性和IO多路复用
对于redis主要的性能瓶颈是内存或者网络带宽并非CPU
在redis6/7中,非常受关注的第一个新特性就是多线程
- 这是因为,redis一直被大家熟知的就是他的单线程架构,虽然有些命令可以用后台线程或子进程执行(比如数据删除,快照生成,aof重写)但是从网络io处理到实际的读写命令处理,都是由单个线程完成的
- 为了应对这个问题:采用多个io线程来处理网络请求,提高网络请求处理的并行度,redis6/7就是采用这种方法
- 但是reids的多io线程只是用来处理网络请求的,
对于读写命令操作仍然是使用单线程来处理
,这是因为redis处理请求时,网络处理经常是瓶颈,通过多个io线程并行处理网络操作,可以提升实例的整体处理性能,而继续使用单线程执行命令操作,就不用为了保证Lua脚本,事务的原子性,额外开发多线程互斥加锁机制了(不管加锁操作处理)
,这样一来,redis线程模型实现就简单了
IO多路复用是什么?
- 一种同步的IO模型,实现一个线程监视多个文件句柄,一旦某个文件句柄就绪就能够通知到对应的应用程序进行相应的对鞋操作,没有文件句柄时会阻塞应用程序,从而释放CPU资源
- I/O:网络I/O,尤其在操作系统层面指数据在内核态和用户态之间的读写操作
- 多路:多个客户端连接(即socket)
- 复用:复用一个或几个线程
- IO多路复用:也就是说一个或一组线程处理多个TCP连接,使用单进程就能实现同时处理多个客户端的连接,无需创建或者维护过多的进程/线程
io多路复用模型
将用户socket对应的文件描述符(FileDescriptor)注册进epoll,然后epoll帮你监听哪些socket上有消息到达,这样就避免了大量的无用操作。此时的socket应该采用非阻塞模式。这样,整个过程只在调用select、poll、epoll这些调用的时候才会阻塞,收发客户消息是不会阻塞的,整个进程或者线程就被充分利用起来,这就是事件驱动,所谓的reactor反应模式。
在单个线程通过记录跟踪每一个Sockek(I/O流)的状态来同时管理多个I/O流
. 一个服务端进程可以同时处理多个套接字描述符。
目的是尽量多的提高服务器的吞吐能力。
大家都用过nginx,nginx使用epoll接收请求,ngnix会有很多链接进来, epoll会把他们都监视起来,然后像拨开关一样,谁有数据就拨向谁,然后调用相应的代码处理。redis类似同理,这就是IO多路复用原理,有请求就响应,没请求不打扰。
redis如此快的原因
IO多路复用+epoll函数使用,才是redis为什么这么快的直接原因,而不是仅仅单线程命令+redis安装在内存中。
BigKey
大批量插入数据测试数据key
linux bash执行
生成100W条redis批量设置kv的语句(key=kn,value=vn)
写入到/tmp目录下的redisTest.txt文件中
for((i=1;i<=100*10000;i++)); do echo "set k$i v$i" >> /tmp/redisTest.txt ;done;
[root@iZuf60dna3h67gfyijgutrZ ~]# for((i=1;i<=100*10000;i++)); do echo "set k$i v$i" >> /tmp/redisTest.txt ;done;
通过redis提供的管道–pipe命令插入100w的数据
请结合自己的机器地址
cat /tmp/redisTest.txt | /opt/redis-7.0.0/src/redis-cli -h 127.0.0.1 -p 6379 -a 密码 --pipe
[root@iZuf60dna3h67gfyijgutrZ tmp]# cat /tmp/redisTest.txt | /opt/redis-7.0.0/src/redis-cli -h 127.0.0.1 -p 6379 -a sunxin --pipe
Warning: Using a password with '-a' or '-u' option on the command line interface may not be safe.
All data transferred. Waiting for the last reply...
Last reply received from server.
errors: 0, replies: 2000000
[root@iZuf60dna3h67gfyijgutrZ tmp]# redis-cli -a sunxin
Warning: Using a password with '-a' or '-u' option on the command line interface may not be safe.
127.0.0.1:6379> ping
PONG
127.0.0.1:6379> DBSIZE
(integer) 1000002
插入成功
面试题
海量数据里查询某一固定前缀的key
使用scan命令
SCAN 命令是一个基于游标的迭代器,每次被调用之后, 都会向用户返回一个新的游标, 用户在下次迭代时需要使用这个新游标作为 SCAN 命令的游标参数, 以此来延续之前的迭代过程。
SCAN 返回一个包含两个元素的数组,
第一个元素是用于进行下一次迭代的新游标,
第二个元素则是一个数组, 这个数组中包含了所有被迭代的元素。如果新游标返回零表示迭代已结束。
SCAN的遍历顺序
非常特别,它不是从第一维数组的第零位一直遍历到末尾,而是采用了高位进位加法来遍历。之所以使用这样特殊的方式进行遍历,是考虑到字典的扩容和缩容时避免槽位的遍历重复和遗漏。
如果生产上限值keys * ,flushdb,flushall等危险命令以防止误操作
通过配置设置禁用这些命令,redis.conf在security这一项中
bigkey如何处理
多大算big
参考《阿里云redis开发手册》
bigkey的危害
- 内存不均,集群迁移困难
- 超时删除,大key删除阻塞主线程
- 网络流量阻塞
memory usage 命令使用过吗,如何发现bigkey
第一种
redis-cli --bigkeys
好处,见最下面总结
给出每种数据结构Top 1 bigkey,同时给出每种数据类型的键值个数+平均大小
不足
想查询大于10kb的所有key,–bigkeys参数就无能为力了,需要用到memory usage来计算每个键值的字节数
redis-cli --bigkeys -a 111111
redis-cli -h 127.0.0.1 -p 6379 -a 111111 --bigkeys
每隔 100 条 scan 指令就会休眠 0.1s,ops 就不会剧烈抬升,但是扫描的时间会变长
redis-cli -h 127.0.0.1 -p 7001 –-bigkeys -i 0.1
第二种
官网说明
删除bigkey
官网
-
string:一般用del,如果过于庞大用unlink
-
hash:
使用hscan每次获取少量field-value,再使用hdel删除每个field
-
list
使用trim渐进式逐步删除,直到全部删除完成
-
set
使用sscan每次获取部分元素,再使用srem命令删除每个元素
-
zset
使用zscan每次获取部分元素,再使用ZREMRANGEBYRANK命令删除每个元素
bigkey调优,惰性释放lazyfree了解过吗
redis.conf配置文件LAZY FREEING相关说明
缓存与数据库双写一致
理论
先更新数据库,再删除缓存
无法实时一致,只能最终一致
- 可以把要删除的缓存值或者是要更新的数据库值暂存到消息队列中(例如使用Kafka/RabbitMQ等)。
- 当程序没有能够成功地删除缓存值或者是更新数据库值时,可以从消息队列中重新读取这些值,然后再次进行删除或更新。
- 如果能够成功地删除或更新,我们就要把这些值从消息队列中去除,以免重复操作,此时,我们也可以保证数据库和缓存的数据一致了,否则还需要再次进行重试
- 如果重试超过的一定次数后还是没有成功,我们就需要向业务层发送报错信息了,通知运维人员。
先删除缓存,再更新数据库,会出现什么问题
**问题:**当A线程来更新数据,此时缓存已经被干掉了,准备更新数据库的时候,这个时候还没更新完,此时,B线程来进行查询,他发现缓存读不到,就去查数据库(这个时候是旧值),就将这个旧值又写回了redis了。A这个时候懵了,我删的怎么又变回去了,这个时候A更新完数据库,就发现数据库和redis数据不一致了
解决方案: 延迟双删
双检加锁策略
多个线程同时去查询数据库的这条数据,那么我们可以在第一个查询数据的请求上使用一个 互斥锁来锁住它。
其他的线程走到这一步拿不到锁就等着,等第一个线程查询到了数据,然后做缓存。
后面的线程进来发现已经有缓存了,就直接走缓存。
实操
canal
下载地址
是什么
canal [kə’næl],中文翻译为 水道/管道/沟渠/运河,主要用途是用于 MySQL 数据库增量日志数据的订阅、消费和解析,是阿里巴巴开发并开源的,采用Java语言开发;
历史背景是早期阿里巴巴因为杭州和美国双机房部署,存在跨机房数据同步的业务需求,实现方式主要是基于业务 trigger(触发器) 获取增量变更。从2010年开始,阿里巴巴逐步尝试采用解析数据库日志获取增量变更进行同步,由此衍生出了canal项目;
作用
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引)
- 业务cache刷新
- 带业务逻辑的增量数据处理
canal工作原理
传统mysql主从复制工作原理
MySQL的主从复制将经过如下步骤:
1、当 master 主服务器上的数据发生改变时,则将其改变写入二进制事件日志文件中;
2、salve 从服务器会在一定时间间隔内对 master 主服务器上的二进制日志进行探测,探测其是否发生过改变,
如果探测到 master 主服务器的二进制事件日志发生了改变,则开始一个 I/O Thread 请求 master 二进制事件日志;
3、同时 master 主服务器为每个 I/O Thread 启动一个dump Thread,用于向其发送二进制事件日志;
4、slave 从服务器将接收到的二进制事件日志保存至自己本地的中继日志文件中;
5、salve 从服务器将启动 SQL Thread 从中继日志中读取二进制日志,在本地重放,使得其数据和主服务器保持一致;
6、最后 I/O Thread 和 SQL Thread 将进入睡眠状态,等待下一次被唤醒;
canal工作原理
官网地址
官网
mysql
查看mysql版本
select version();
mysql5.7.28
当前主机二进制日志
show master status;
查看show variables like ‘log_bin’
开启mysql的binlog写入功能
编辑my.ini,请事先备份文件
log-bin=mysql-bin #开启 binlog
binlog-format=ROW #选择 ROW 模式
server_id=1 #配置MySQL replaction需要定义,不要和canal的 slaveId重复
ROW模式 除了记录sql语句之外,还会记录每个字段的变化情况,能够清楚的记录每行数据的变化历史,但会占用较多的空间。
STATEMENT模式只记录了sql语句,但是没有记录上下文信息,在进行数据恢复的时候可能会导致数据的丢失情况;
MIX模式比较灵活的记录,理论上说当遇到了表结构变更的时候,就会记录为statement模式。当遇到了数据更新或者删除情况下就会变为row模式;
window系统下是my.ini
linux系统下是my.conf
重启mysql
再次查看log_bin
授权canal连接mysql账号
默认是没有canal账户,此处新建并授权
DROP USER IF EXISTS 'canal'@'%';
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
FLUSH PRIVILEGES;
SELECT * FROM mysql.user;
canal服务端
(注意默认linux服务器已经有java8的环境,因为canal是java写的需要java运行环境)
下载
下载地址https://github.com/alibaba/canal/releases/tag/canal-1.1.6
注意发布时间+版本,2022.8.11后发布的才用
解压
解压放在/mycanal路径下
配置
- 修改/mycanal/conf/example路径下instance.properties文件
换成自己的mysql主机master的IP地址
换成自己的在mysql新建的canal账户
启动
在/opt/mycanal/bin路径下执行
./startup.sh
查看
判断canal是否启动成功
查看server日志
查看样例example的日志
canal客户端(java编写业务程序)
sql脚本
1 随便选个数据库,以你自己为主,本例bigdata,按照下面建表
CREATE TABLE `t_user` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`userName` varchar(100) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4
建module
canal_demo02
改pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.atguigu.canal</groupId>
<artifactId>canal_demo02</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.14</version>
<relativePath/>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<junit.version>4.12</junit.version>
<log4j.version>1.2.17</log4j.version>
<lombok.version>1.16.18</lombok.version>
<mysql.version>5.1.47</mysql.version>
<druid.version>1.1.16</druid.version>
<mapper.version>4.1.5</mapper.version>
<mybatis.spring.boot.version>1.3.0</mybatis.spring.boot.version>
</properties>
<dependencies>
<!--canal-->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
<!--SpringBoot通用依赖模块-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--swagger2-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<!--SpringBoot与Redis整合依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<!--SpringBoot与AOP-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
<!--Mysql数据库驱动-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<!--SpringBoot集成druid连接池-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.10</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>${druid.version}</version>
</dependency>
<!--mybatis和springboot整合-->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>${mybatis.spring.boot.version}</version>
</dependency>
<!--通用基础配置junit/devtools/test/log4j/lombok/hutool-->
<!--hutool-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.2.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<optional>true</optional>
</dependency>
<!--persistence-->
<dependency>
<groupId>javax.persistence</groupId>
<artifactId>persistence-api</artifactId>
<version>1.0.2</version>
</dependency>
<!--通用Mapper-->
<dependency>
<groupId>tk.mybatis</groupId>
<artifactId>mapper</artifactId>
<version>${mapper.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.8.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
配置文件
注意使用自己的mysql密码
server.port=5555
# ========================alibaba.druid=====================
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/bigdata?useUnicode=true&characterEncoding=utf-8&useSSL=false
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.druid.test-while-idle=false
启动类
import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class CanalDemo02App
{
//本例不要启动CanalDemo02App实例
}
业务类
redisUtils
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class RedisUtils
{
public static final String REDIS_IP_ADDR = "192.168.111.185";
public static final String REDIS_pwd = "111111";
public static JedisPool jedisPool;
static {
JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(20);
jedisPoolConfig.setMaxIdle(10);
jedisPool=new JedisPool(jedisPoolConfig,REDIS_IP_ADDR,6379,10000,REDIS_pwd);
}
public static Jedis getJedis() throws Exception {
if(null!=jedisPool){
return jedisPool.getResource();
}
throw new Exception("Jedispool is not ok");
}
}
RedisCanalClientExample
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.atguigu.canal.util.RedisUtils;
import redis.clients.jedis.Jedis;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
public class RedisCanalClientExample
{
public static final Integer _60SECONDS = 60;
public static final String REDIS_IP_ADDR = "192.168.111.185";
private static void redisInsert(List<Column> columns)
{
JSONObject jsonObject = new JSONObject();
for (Column column : columns)
{
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
jsonObject.put(column.getName(),column.getValue());
}
if(columns.size() > 0)
{
try(Jedis jedis = RedisUtils.getJedis())
{
jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
}catch (Exception e){
e.printStackTrace();
}
}
}
private static void redisDelete(List<Column> columns)
{
JSONObject jsonObject = new JSONObject();
for (Column column : columns)
{
jsonObject.put(column.getName(),column.getValue());
}
if(columns.size() > 0)
{
try(Jedis jedis = RedisUtils.getJedis())
{
jedis.del(columns.get(0).getValue());
}catch (Exception e){
e.printStackTrace();
}
}
}
private static void redisUpdate(List<Column> columns)
{
JSONObject jsonObject = new JSONObject();
for (Column column : columns)
{
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
jsonObject.put(column.getName(),column.getValue());
}
if(columns.size() > 0)
{
try(Jedis jedis = RedisUtils.getJedis())
{
jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
System.out.println("---------update after: "+jedis.get(columns.get(0).getValue()));
}catch (Exception e){
e.printStackTrace();
}
}
}
public static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
//获取变更的row数据
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(),e);
}
//获取变动类型
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.INSERT) {
redisInsert(rowData.getAfterColumnsList());
} else if (eventType == EventType.DELETE) {
redisDelete(rowData.getBeforeColumnsList());
} else {//EventType.UPDATE
redisUpdate(rowData.getAfterColumnsList());
}
}
}
}
public static void main(String[] args)
{
System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");
//=================================
// 创建链接canal服务端
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(REDIS_IP_ADDR,
11111), "example", "", "");
int batchSize = 1000;
//空闲空转计数器
int emptyCount = 0;
System.out.println("---------------------canal init OK,开始监听mysql变化------");
try {
connector.connect();
//connector.subscribe(".*\\..*");指的是全库全表,一般不推荐这样用,影响canal性能,可能导致数据不一致
connector.subscribe("bigdata.t_user");
connector.rollback();
int totalEmptyCount = 10 * _60SECONDS;
while (emptyCount < totalEmptyCount) {
System.out.println("我是canal,每秒一次正在监听:"+ UUID.randomUUID().toString());
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
} else {
//计数器重新置零
emptyCount = 0;
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
System.out.println("已经监听了"+totalEmptyCount+"秒,无任何消息,请重启重试......");
} finally {
connector.disconnect();
}
}
}
为什么jedis在代码中不进行释放资源的操作呢?
自动释放了,因为jedis继承了BinaryJedis接口这个接口实现了closeable所以就自动释放了,相当于一个语法糖
案例实战bitmap/hyperloglog/GEO
统计的类型有哪些
亿级系统中常见的四种统计
聚合统计
排序统计
解决方案
v1,v2…代表文本内容
二值统计
基数统计
hyperloglog案例实战
行业术语
UV
PV
DAU
MAU
需求:
很多计数类场景,比如 每日注册 IP 数、每日访问 IP 数、页面实时访问数 PV、访问用户数 UV等。
因为主要的目标高效、巨量地进行计数,所以对存储的数据的内容并不太关心。
也就是说它只能用于统计巨量数量,不太涉及具体的统计对象的内容和精准性。
统计单日一个页面的访问量(PV),单次访问就算一次。
统计单日一个页面的用户访问量(UV),即按照用户为维度计算,单个用户一天内多次访问也只算一次。
多个key的合并统计,某个门户网站的所有模块的PV聚合统计就是整个网站的总PV。
去重统计的方案
- hashset
- bitmap
如果数据显较大亿级统计,使用bitmaps同样会有这个问题。
bitmap是通过用位bit数组来表示各元素是否出现,每个元素对应一位,所需的总内存为N个bit。
基数计数则将每一个元素对应到bit数组中的其中一位,比如bit数组010010101(按照从零开始下标,有的就是1、4、6、8)。
新进入的元素只需要将已经有的bit数组和新加入的元素进行按位或计算就行。这个方式能大大减少内存占用且位操作迅速。
But,假设一个样本案例就是一亿个基数位值数据,一个样本就是一亿
如果要统计1亿个数据的基数位值,大约需要内存100000000/8/1024/1024约等于12M,内存减少占用的效果显著。
这样得到统计一个对象样本的基数值需要12M。
如果统计10000个对象样本(1w个亿级),就需要117.1875G将近120G,可见使用bitmaps还是不适用大数据量下(亿级)的基数计数场景
- hyperloglog
淘宝网站首页亿级UV的redis统计方案
需求
方案
- mysql(直接死,mysql最大300w)
- 用redis的hash结构存储
redis——hash = <keyDay,<ip,1>>
按照ipv4的结构来说明,每个ipv4的地址最多是15个字节(ip = “192.168.111.1”,最多xxx.xxx.xxx.xxx)
某一天的1.5亿 * 15个字节= 2G,一个月60G,redis死定了。
- hyperloglog
为什么是只需要花费12Kb?
service
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Random;
import java.util.concurrent.TimeUnit;
@Service
@Slf4j
public class HyperLogLogService
{
@Resource
private RedisTemplate redisTemplate;
/**
* 模拟后台有用户点击首页,每个用户来自不同ip地址
*/
@PostConstruct
public void init()
{
log.info("------模拟后台有用户点击首页,每个用户来自不同ip地址");
new Thread(() -> {
String ip = null;
for (int i = 1; i <=200; i++) {
Random r = new Random();
ip = r.nextInt(256) + "." + r.nextInt(256) + "." + r.nextInt(256) + "." + r.nextInt(256);
Long hll = redisTemplate.opsForHyperLogLog().add("hll", ip);
log.info("ip={},该ip地址访问首页的次数={}",ip,hll);
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
}
},"t1").start();
}
}
controller
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@Api(description = "淘宝亿级UV的Redis统计方案")
@RestController
@Slf4j
public class HyperLogLogController
{
@Resource
private RedisTemplate redisTemplate;
@ApiOperation("获得IP去重后的首页访问量")
@RequestMapping(value = "/uv",method = RequestMethod.GET)
public long uv()
{
//pfcount
return redisTemplate.opsForHyperLogLog().size("hll");
}
}
GEO案例实战
命令复习
GEOADD city 116.403963 39.915119 "天安门" 116.403414 39.924091 "故宫" 116.024067 40.362639 "长城"
GEOPOS city 天安门 故宫
GEOHASH city 天安门 故宫 长城
GEODIST city 天安门 长城 km
以半径为中心,查找附近的xxx
georadius 以给定的经纬度为中心, 返回键包含的位置元素当中, 与中心的距离不超过给定最大距离的所有位置元素。
GEORADIUS city 116.418017 39.914402 10 km withdist withcoord count 10 withhash desc
GEORADIUS city 116.418017 39.914402 10 km withdist withcoord count 10 desc
WITHDIST: 在返回位置元素的同时, 将位置元素与中心之间的距离也一并返回。 距离的单位和用户给定的范围单位保持一致。
WITHCOORD: 将位置元素的经度和维度也一并返回。
WITHHASH: 以 52 位有符号整数的形式, 返回位置元素经过原始 geohash 编码的有序集合分值。 这个选项主要用于底层应用或者调试, 实际中的作用并不大
COUNT 限定返回的记录数。
美团地图位置附近的酒店推送
controller
import com.atguigu.redis7.service.GeoService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.geo.*;
import org.springframework.data.redis.connection.RedisGeoCommands;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Api(tags = "美团地图位置附近的酒店推送GEO")
@RestController
@Slf4j
public class GeoController
{
@Resource
private GeoService geoService;
@ApiOperation("添加坐标geoadd")
@RequestMapping(value = "/geoadd",method = RequestMethod.GET)
public String geoAdd()
{
return geoService.geoAdd();
}
@ApiOperation("获取经纬度坐标geopos")
@RequestMapping(value = "/geopos",method = RequestMethod.GET)
public Point position(String member)
{
return geoService.position(member);
}
@ApiOperation("获取经纬度生成的base32编码值geohash")
@RequestMapping(value = "/geohash",method = RequestMethod.GET)
public String hash(String member)
{
return geoService.hash(member);
}
@ApiOperation("获取两个给定位置之间的距离")
@RequestMapping(value = "/geodist",method = RequestMethod.GET)
public Distance distance(String member1, String member2)
{
return geoService.distance(member1,member2);
}
@ApiOperation("通过经度纬度查找北京王府井附近的")
@RequestMapping(value = "/georadius",method = RequestMethod.GET)
public GeoResults radiusByxy()
{
return geoService.radiusByxy();
}
@ApiOperation("通过地方查找附近,本例写死天安门作为地址")
@RequestMapping(value = "/georadiusByMember",method = RequestMethod.GET)
public GeoResults radiusByMember()
{
return geoService.radiusByMember();
}
}
service
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.geo.Distance;
import org.springframework.data.geo.GeoResults;
import org.springframework.data.geo.Metrics;
import org.springframework.data.geo.Point;
import org.springframework.data.geo.Circle;
import org.springframework.data.redis.connection.RedisGeoCommands;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Service
@Slf4j
public class GeoService
{
public static final String CITY ="city";
@Autowired
private RedisTemplate redisTemplate;
public String geoAdd()
{
Map<String, Point> map= new HashMap<>();
map.put("天安门",new Point(116.403963,39.915119));
map.put("故宫",new Point(116.403414 ,39.924091));
map.put("长城" ,new Point(116.024067,40.362639));
redisTemplate.opsForGeo().add(CITY,map);
return map.toString();
}
public Point position(String member) {
//获取经纬度坐标
List<Point> list= this.redisTemplate.opsForGeo().position(CITY,member);
return list.get(0);
}
public String hash(String member) {
//geohash算法生成的base32编码值
List<String> list= this.redisTemplate.opsForGeo().hash(CITY,member);
return list.get(0);
}
public Distance distance(String member1, String member2) {
//获取两个给定位置之间的距离
Distance distance= this.redisTemplate.opsForGeo().distance(CITY,member1,member2, RedisGeoCommands.DistanceUnit.KILOMETERS);
return distance;
}
public GeoResults radiusByxy() {
//通过经度,纬度查找附近的,北京王府井位置116.418017,39.914402
Circle circle = new Circle(116.418017, 39.914402, Metrics.KILOMETERS.getMultiplier());
//返回50条
RedisGeoCommands.GeoRadiusCommandArgs args = RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs().includeDistance().includeCoordinates().sortAscending().limit(50);
GeoResults<RedisGeoCommands.GeoLocation<String>> geoResults= this.redisTemplate.opsForGeo().radius(CITY,circle, args);
return geoResults;
}
public GeoResults radiusByMember() {
//通过地方查找附近
String member="天安门";
//返回50条
RedisGeoCommands.GeoRadiusCommandArgs args = RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs().includeDistance().includeCoordinates().sortAscending().limit(50);
//半径10公里内
Distance distance=new Distance(10, Metrics.KILOMETERS);
GeoResults<RedisGeoCommands.GeoLocation<String>> geoResults= this.redisTemplate.opsForGeo().radius(CITY,member, distance,args);
return geoResults;
}
}
bitmap案例实战
日活统计
连续签到打卡
最近一周的活跃用户
统计指定用户一年之中的登陆天数
布隆过滤器BloomFilter
是什么
由一个初值都为0的bit数组和多个哈希函数构成,用来快速判断集合中是否存在某个元素
布隆过滤器(英语:Bloom Filter)是 1970 年由布隆提出的。
它实际上是一个很长的二进制数组(00000000)+一系列随机hash算法映射函数,主要用于判断一个元素是否在集合中。
通常我们会遇到很多要判断一个元素是否在某个集合中的业务场景,一般想到的是将集合中所有元素保存起来,然后通过比较确定。
链表、树、哈希表等等数据结构都是这种思路。但是随着集合中元素的增加,我们需要的存储空间也会呈现线性增长,最终达到瓶颈。同时检索速度也越来越慢,上述三种结构的检索时间复杂度分别为O(n),O(logn),O(1)。这个时候,布隆过滤器(Bloom Filter)就应运而生
特点考点
一个元素如果判断结果存在时,元素不一定存在,但判断结果为不存在时,则一定不存在
布隆过滤器可以添加元素,但不能删除元素,由于涉及hashcode判断依据,删掉元素会导致误判率增加
原理
hash冲突案例
使用场景
-
解决缓存穿透问题,哥redis结合bitmap使用
-
黑名单校验,识别垃圾邮件
-
安全链接网址,全球上10亿的网址判断
手写一个布隆过滤器
结合bitmap类型手写一个简单的布隆过滤器
整体架构
- 白名单初始化
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
/**
* 布隆过滤器白名单初始化工具类,一开始就设置一部分数据为白名单所有,
* 白名单业务默认规定:布隆过滤器有,redis也有。
*/
@Component
@Slf4j
public class BloomFilterInit
{
@Resource
private RedisTemplate redisTemplate;
@PostConstruct//初始化白名单数据,故意差异化数据演示效果......
public void init()
{
//白名单客户预加载到布隆过滤器
String uid = "customer:12";
//1 计算hashcode,由于可能有负数,直接取绝对值
int hashValue = Math.abs(uid.hashCode());
//2 通过hashValue和2的32次方取余后,获得对应的下标坑位
long index = (long) (hashValue % Math.pow(2, 32));
log.info(uid+" 对应------坑位index:{}",index);
//3 设置redis里面bitmap对应坑位,该有值设置为1
redisTemplate.opsForValue().setBit("whitelistCustomer",index,true);
}
}
- checkUtils
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
@Slf4j
public class CheckUtils
{
@Resource
private RedisTemplate redisTemplate;
public boolean checkWithBloomFilter(String checkItem,String key)
{
int hashValue = Math.abs(key.hashCode());
long index = (long) (hashValue % Math.pow(2, 32));
boolean existOK = redisTemplate.opsForValue().getBit(checkItem, index);
log.info("----->key:"+key+"\t对应坑位index:"+index+"\t是否存在:"+existOK);
return existOK;
}
}
- service
@Resource
private CheckUtils checkUtils;
public Customer findCustomerByIdWithBloomFilter (Integer customerId)
{
Customer customer = null;
//缓存key的名称
String key = CACHE_KEY_CUSTOMER + customerId;
//布隆过滤器check,无是绝对无,有是可能有
//===============================================
if(!checkUtils.checkWithBloomFilter("whitelistCustomer",key))
{
log.info("白名单无此顾客信息:{}",key);
return null;
}
//===============================================
//1 查询redis
customer = (Customer) redisTemplate.opsForValue().get(key);
//redis无,进一步查询mysql
if (customer == null) {
//2 从mysql查出来customer
customer = customerMapper.selectByPrimaryKey(customerId);
// mysql有,redis无
if (customer != null) {
//3 把mysql捞到的数据写入redis,方便下次查询能redis命中。
redisTemplate.opsForValue().set(key, customer);
}
}
return customer;
}
- controller
@ApiOperation("BloomFilter案例讲解")
@RequestMapping(value = "/customerbloomfilter/{id}", method = RequestMethod.GET)
public Customer findCustomerByIdWithBloomFilter(@PathVariable int id) throws ExecutionException, InterruptedException
{
return customerSerivce.findCustomerByIdWithBloomFilter(id);
}
总结:主要看service中的代码,让过滤器去拦住那些没有的key数据,来解决缓存击穿的问题,相当于再加了一层拦截保护。
京东签到送京豆
签到日历仅展示当月签到数据
签到日历需展示最近连续签到天数
假设当前日期是20210618,且20210616未签到
若20210617已签到且0618未签到,则连续签到天数为1
若20210617已签到且0618已签到,则连续签到天数为2
连续签到天数越多,奖励越大
所有用户均可签到
截至2020年3月31日的12个月,京东年度活跃用户数3.87亿,同比增长24.8%,环比增长超2500万,此外,2020年3月移动端日均活跃用户数同比增长46%假设10%左右的用户参与签到,签到用户也高达3千万
小厂方法
CREATE TABLE user_sign
(
keyid BIGINT NOT NULL PRIMARY KEY AUTO_INCREMENT,
user_key VARCHAR(200),#京东用户ID
sign_date DATETIME,#签到日期(20210618)
sign_count INT #连续签到天数
)
INSERT INTO user_sign(user_key,sign_date,sign_count)
VALUES ('20210618-xxxx-xxxx-xxxx-xxxxxxxxxxxx','2020-06-18 15:11:12',1);
SELECT
sign_count
FROM
user_sign
WHERE
user_key = '20210618-xxxx-xxxx-xxxx-xxxxxxxxxxxx'
AND sign_date BETWEEN '2020-06-17 00:00:00' AND '2020-06-18 23:59:59'
ORDER BY
sign_date DESC
LIMIT 1;
方法正确但是难以落地实现,o(╥﹏╥)o。
签到用户量较小时这么设计能行,但京东这个体量的用户(估算3000W签到用户,一天一条数据,一个月就是9亿数据)
对于京东这样的体量,如果一条签到记录对应着当日用记录,那会很恐怖…
如何解决这个痛点?
-
一条签到记录对应一条记录,会占据越来越大的空间。
-
一个月最多31天,刚好我们的int类型是32位,那这样一个int类型就可以搞定一个月,32位大于31天,当天来了位是1没来就是0。
-
一条数据直接存储一个月的签到记录,不再是存储一天的签到记录。
大厂方法
基于redis的bitmaps实现签到日历,建表+按位-redis bitmap
在签到统计时,每个用户一天的签到用1个bit位就能表示,
一个月(假设是31天)的签到情况用31个bit位就可以,一年的签到也只需要用365个bit位,根本不用太复杂的集合类型
缓存预热,缓存雪崩,缓存击穿,缓存穿透
缓存预热
预热初始化程序可以参考布隆过滤器 @PostConstruct
缓存雪崩
预防和解决
- redis中key设置为永不过期或者过期时间错开
- redis缓存集群来实现高可用多缓存结合预防雪崩(ehcache本地缓存+redis缓存)
- 服务降级
- 人民币玩家(上阿里云的redis缓存服务器)
缓存击穿
是什么
击穿和穿透不是一样的
解决方案
方案一
差异失效时间,对于访问频繁的热点key,干脆就不设置过期时间
方案二
互斥更新,采用双检加锁策略
案例
天猫聚划算功能实现+防止缓存击穿
实体类
import io.swagger.annotations.ApiModel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
@ApiModel(value = "聚划算活动producet信息")
public class Product
{
//产品ID
private Long id;
//产品名称
private String name;
//产品价格
private Integer price;
//产品详情
private String detail;
}
采用定时器将参与聚划算活动的特价商品新增进入redis中
import cn.hutool.core.date.DateUtil;
import com.atguigu.redis7.entities.Product;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
@Service
@Slf4j
public class JHSTaskService
{
public static final String JHS_KEY="jhs";
public static final String JHS_KEY_A="jhs:a";
public static final String JHS_KEY_B="jhs:b";
@Autowired
private RedisTemplate redisTemplate;
/**
* 偷个懒不加mybatis了,模拟从数据库读取100件特价商品,用于加载到聚划算的页面中
* @return
*/
private List<Product> getProductsFromMysql() {
List<Product> list=new ArrayList<>();
for (int i = 1; i <=20; i++) {
Random rand = new Random();
int id= rand.nextInt(10000);
Product obj=new Product((long) id,"product"+i,i,"detail");
list.add(obj);
}
return list;
}
@PostConstruct
public void initJHS(){
log.info("启动定时器淘宝聚划算功能模拟.........."+ DateUtil.now());
new Thread(() -> {
//模拟定时器一个后台任务,定时把数据库的特价商品,刷新到redis中
while (true){
//模拟从数据库读取100件特价商品,用于加载到聚划算的页面中
List<Product> list=this.getProductsFromMysql();
//采用redis list数据结构的lpush来实现存储
this.redisTemplate.delete(JHS_KEY);
//lpush命令
this.redisTemplate.opsForList().leftPushAll(JHS_KEY,list);
//间隔一分钟 执行一遍,模拟聚划算每3天刷新一批次参加活动
try { TimeUnit.MINUTES.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
log.info("runJhs定时刷新..............");
}
},"t1").start();
}
}
控制类
import com.atguigu.redis7.entities.Product;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.util.CollectionUtils;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@RestController
@Slf4j
@Api(tags = "聚划算商品列表接口")
public class JHSProductController
{
public static final String JHS_KEY="jhs";
public static final String JHS_KEY_A="jhs:a";
public static final String JHS_KEY_B="jhs:b";
@Autowired
private RedisTemplate redisTemplate;
/**
* 分页查询:在高并发的情况下,只能走redis查询,走db的话必定会把db打垮
* @param page
* @param size
* @return
*/
@RequestMapping(value = "/pruduct/find",method = RequestMethod.GET)
@ApiOperation("按照分页和每页显示容量,点击查看")
public List<Product> find(int page, int size) {
List<Product> list=null;
long start = (page - 1) * size;
long end = start + size - 1;
try {
//采用redis list数据结构的lrange命令实现分页查询
list = this.redisTemplate.opsForList().range(JHS_KEY, start, end);
if (CollectionUtils.isEmpty(list)) {
//TODO 走DB查询
}
log.info("查询结果:{}", list);
} catch (Exception ex) {
//这里的异常,一般是redis瘫痪 ,或 redis网络timeout
log.error("exception:", ex);
//TODO 走DB查询
}
return list;
}
}
bug和隐患说明
在删除热点key 的一秒内,有高并发请求了聚划算的商品列表,此时商品还未上新,导致mysql被暴击
解决隐患
- 采用双检加锁策略
多个线程同时去查询数据库的这条数据,那么我们可以在第一个查询数据的请求上使用一个 互斥锁来锁住它。
其他的线程走到这一步拿不到锁就等着,等第一个线程查询到了数据,然后做缓存。后面的线程进来发现已经有缓存了,就直接走缓存。
- 差异失效时间
service
@PostConstruct
public void initJHSAB(){
log.info("启动AB定时器计划任务淘宝聚划算功能模拟.........."+DateUtil.now());
new Thread(() -> {
//模拟定时器,定时把数据库的特价商品,刷新到redis中
while (true){
//模拟从数据库读取100件特价商品,用于加载到聚划算的页面中
List<Product> list=this.getProductsFromMysql();
//先更新B缓存
this.redisTemplate.delete(JHS_KEY_B);
this.redisTemplate.opsForList().leftPushAll(JHS_KEY_B,list);
this.redisTemplate.expire(JHS_KEY_B,20L,TimeUnit.DAYS);
//再更新A缓存
this.redisTemplate.delete(JHS_KEY_A);
this.redisTemplate.opsForList().leftPushAll(JHS_KEY_A,list);
this.redisTemplate.expire(JHS_KEY_A,15L,TimeUnit.DAYS);
//间隔一分钟 执行一遍
try { TimeUnit.MINUTES.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
log.info("runJhs定时刷新双缓存AB两层..............");
}
},"t1").start();
}
控制类
@RequestMapping(value = "/pruduct/findab",method = RequestMethod.GET)
@ApiOperation("防止热点key突然失效,AB双缓存架构")
public List<Product> findAB(int page, int size) {
List<Product> list=null;
long start = (page - 1) * size;
long end = start + size - 1;
try {
//采用redis list数据结构的lrange命令实现分页查询
list = this.redisTemplate.opsForList().range(JHS_KEY_A, start, end);
if (CollectionUtils.isEmpty(list)) {
log.info("=========A缓存已经失效了,记得人工修补,B缓存自动延续5天");
//用户先查询缓存A(上面的代码),如果缓存A查询不到(例如,更新缓存的时候删除了),再查询缓存B
this.redisTemplate.opsForList().range(JHS_KEY_B, start, end);
//TODO 走DB查询
}
log.info("查询结果:{}", list);
} catch (Exception ex) {
//这里的异常,一般是redis瘫痪 ,或 redis网络timeout
log.error("exception:", ex);
//TODO 走DB查询
}
return list;
}
缓存穿透
是什么
解决方案
方案一
空对象缓存或者缺省值
如果发生了缓存穿透,我们可以针对要查询的数据,在Redis里存一个和业务部门商量后确定的缺省值(比如,零、负数、defaultNull等)。
比如,键uid:abcdxxx,值defaultNull作为案例的key和value
先去redis查键uid:abcdxxx没有,再去mysql查没有获得 ,这就发生了一次穿透现象。
but,可以增强回写机制
mysql也查不到的话也让redis存入刚刚查不到的key并保护mysql。
第一次来查询uid:abcdxxx,redis和mysql都没有,返回null给调用者,但是增强回写后第二次来查uid:abcdxxx,此时redis就有值了。
可以直接从Redis中读取default缺省值返回给业务应用程序,避免了把大量请求发送给mysql处理,打爆mysql。
但是,此方法架不住黑客的恶意攻击,有缺陷…,只能解决key相同的情况
方案二
google布隆过滤器Guava解决缓存穿透
Guava中布隆过滤器的实现算是比较权威的,所以实际项目中我们可以直接使用guava布隆过滤器
- 依赖添加
<!--guava Google 开源的 Guava 中自带的布隆过滤器-->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>23.0</version>
</dependency>
入门
误判率
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
@Service
@Slf4j
public class GuavaBloomFilterService{
public static final int _1W = 10000;
//布隆过滤器里预计要插入多少数据
public static int size = 100 * _1W;
//误判率,它越小误判的个数也就越少(思考,是不是可以设置的无限小,没有误判岂不更好)
//fpp the desired false positive probability
public static double fpp = 0.03;
// 构建布隆过滤器
private static BloomFilter<Integer> bloomFilter = BloomFilter.create(Funnels.integerFunnel(), size,fpp);
public void guavaBloomFilter(){
//1 先往布隆过滤器里面插入100万的样本数据
for (int i = 1; i <=size; i++) {
bloomFilter.put(i);
}
//故意取10万个不在过滤器里的值,看看有多少个会被认为在过滤器里
List<Integer> list = new ArrayList<>(10 * _1W);
for (int i = size+1; i <= size + (10 *_1W); i++) {
if (bloomFilter.mightContain(i)) {
log.info("被误判了:{}",i);
list.add(i);
}
}
log.info("误判的总数量::{}",list.size());
}
}
手写redis分布式锁
面试题
分布式锁
一个靠谱的分布式锁需要具备的条件和刚需
解决超卖,防止缓存击穿
Lua脚本
可重入+设计模式
工厂类
@Component
public class DistributedLockFactory
{
@Autowired
private StringRedisTemplate stringRedisTemplate;
private String lockName;
private String uuidValue;
public DistributedLockFactory()
{
this.uuidValue = IdUtil.simpleUUID();//UUID
}
public Lock getDistributedLock(String lockType)
{
if(lockType == null) return null;
if(lockType.equalsIgnoreCase("REDIS")){
lockName = "zzyyRedisLock";
return new RedisDistributedLock(stringRedisTemplate,lockName,uuidValue);
} else if(lockType.equalsIgnoreCase("ZOOKEEPER")){
//TODO zookeeper版本的分布式锁实现
return new ZookeeperDistributedLock();
} else if(lockType.equalsIgnoreCase("MYSQL")){
//TODO mysql版本的分布式锁实现
return null;
}
return null;
}
}
redis锁
public class RedisDistributedLock implements Lock
{
private StringRedisTemplate stringRedisTemplate;
private String lockName;
private String uuidValue;
private long expireTime;
public RedisDistributedLock(StringRedisTemplate stringRedisTemplate, String lockName,String uuidValue)
{
this.stringRedisTemplate = stringRedisTemplate;
this.lockName = lockName;
this.uuidValue = uuidValue+":"+Thread.currentThread().getId();
this.expireTime = 30L;
}
@Override
public void lock()
{
this.tryLock();
}
@Override
public boolean tryLock()
{
try
{
return this.tryLock(-1L,TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
{
if(time != -1L)
{
expireTime = unit.toSeconds(time);
}
String script =
"if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then " +
"redis.call('hincrby',KEYS[1],ARGV[1],1) " +
"redis.call('expire',KEYS[1],ARGV[2]) " +
"return 1 " +
"else " +
"return 0 " +
"end";
System.out.println("lockName: "+lockName+"\t"+"uuidValue: "+uuidValue);
while (!stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime)))
{
try { TimeUnit.MILLISECONDS.sleep(60); } catch (InterruptedException e) { e.printStackTrace(); }
}
return true;
}
@Override
public void unlock()
{
String script =
"if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then " +
"return nil " +
"elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then " +
"return redis.call('del',KEYS[1]) " +
"else " +
"return 0 " +
"end";
System.out.println("lockName: "+lockName+"\t"+"uuidValue: "+uuidValue);
Long flag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime));
if(flag == null)
{
throw new RuntimeException("没有这个锁,HEXISTS查询无");
}
}
//=========================================================
@Override
public void lockInterruptibly() throws InterruptedException
{
}
@Override
public Condition newCondition()
{
return null;
}
}
业务类
@Service
@Slf4j
public class InventoryService
{
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Value("${server.port}")
private String port;
@Autowired
private DistributedLockFactory distributedLockFactory;
public String sale()
{
String retMessage = "";
Lock redisLock = distributedLockFactory.getDistributedLock("redis");
redisLock.lock();
try
{
//1 查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
//2 判断库存是否足够
Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result);
//3 扣减库存
if(inventoryNumber > 0) {
stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventoryNumber));
retMessage = "成功卖出一个商品,库存剩余: "+inventoryNumber;
System.out.println(retMessage);
this.testReEnter();
}else{
retMessage = "商品卖完了,o(╥﹏╥)o";
}
}catch (Exception e){
e.printStackTrace();
}finally {
redisLock.unlock();
}
return retMessage+"\t"+"服务端口号:"+port;
}
private void testReEnter()
{
Lock redisLock = distributedLockFactory.getDistributedLock("redis");
redisLock.lock();
try
{
System.out.println("################测试可重入锁####################################");
}finally {
redisLock.unlock();
}
}
}
自动续期
public class RedisDistributedLock implements Lock
{
private StringRedisTemplate stringRedisTemplate;
private String lockName;//KEYS[1]
private String uuidValue;//ARGV[1]
private long expireTime;//ARGV[2]
public RedisDistributedLock(StringRedisTemplate stringRedisTemplate,String lockName,String uuidValue)
{
this.stringRedisTemplate = stringRedisTemplate;
this.lockName = lockName;
this.uuidValue = uuidValue+":"+Thread.currentThread().getId();
this.expireTime = 30L;
}
@Override
public void lock()
{
tryLock();
}
@Override
public boolean tryLock()
{
try {tryLock(-1L,TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}
return false;
}
/**
* 干活的,实现加锁功能,实现这一个干活的就OK,全盘通用
* @param time
* @param unit
* @return
* @throws InterruptedException
*/
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
{
if(time != -1L)
{
this.expireTime = unit.toSeconds(time);
}
String script =
"if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then " +
"redis.call('hincrby',KEYS[1],ARGV[1],1) " +
"redis.call('expire',KEYS[1],ARGV[2]) " +
"return 1 " +
"else " +
"return 0 " +
"end";
System.out.println("script: "+script);
System.out.println("lockName: "+lockName);
System.out.println("uuidValue: "+uuidValue);
System.out.println("expireTime: "+expireTime);
while (!stringRedisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime))) {
TimeUnit.MILLISECONDS.sleep(50);
}
this.renewExpire();
return true;
}
/**
*干活的,实现解锁功能
*/
@Override
public void unlock()
{
String script =
"if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then " +
" return nil " +
"elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then " +
" return redis.call('del',KEYS[1]) " +
"else " +
" return 0 " +
"end";
// nil = false 1 = true 0 = false
System.out.println("lockName: "+lockName);
System.out.println("uuidValue: "+uuidValue);
System.out.println("expireTime: "+expireTime);
Long flag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime));
if(flag == null)
{
throw new RuntimeException("This lock doesn't EXIST");
}
}
private void renewExpire()
{
String script =
"if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 1 then " +
"return redis.call('expire',KEYS[1],ARGV[2]) " +
"else " +
"return 0 " +
"end";
new Timer().schedule(new TimerTask()
{
@Override
public void run()
{
if (stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime))) {
renewExpire();
}
}
},(this.expireTime * 1000)/3);
}
//===下面的redis分布式锁暂时用不到=======================================
//===下面的redis分布式锁暂时用不到=======================================
//===下面的redis分布式锁暂时用不到=======================================
@Override
public void lockInterruptibly() throws InterruptedException
{
}
@Override
public Condition newCondition()
{
return null;
}
}
业务层
@Service
@Slf4j
public class InventoryService
{
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Value("${server.port}")
private String port;
@Autowired
private DistributedLockFactory distributedLockFactory;
public String sale()
{
String retMessage = "";
Lock redisLock = distributedLockFactory.getDistributedLock("redis");
redisLock.lock();
try
{
//1 查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
//2 判断库存是否足够
Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result);
//3 扣减库存
if(inventoryNumber > 0) {
stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventoryNumber));
retMessage = "成功卖出一个商品,库存剩余: "+inventoryNumber;
System.out.println(retMessage);
//暂停几秒钟线程,为了测试自动续期
try { TimeUnit.SECONDS.sleep(120); } catch (InterruptedException e) { e.printStackTrace(); }
}else{
retMessage = "商品卖完了,o(╥﹏╥)o";
}
}catch (Exception e){
e.printStackTrace();
}finally {
redisLock.unlock();
}
return retMessage+"\t"+"服务端口号:"+port;
}
private void testReEnter()
{
Lock redisLock = distributedLockFactory.getDistributedLock("redis");
redisLock.lock();
try
{
System.out.println("################测试可重入锁####################################");
}finally {
redisLock.unlock();
}
}
}
RedLock算法及原理
官网:https://redis.io/docs/manual/patterns/distributed-locks/
设计理念
解决方案
redisson:https://redisson.org/
redisson实战案例(单机)
pom
<!--redisson-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.4</version>
</dependency>
redisconfig
@Configuration
public class RedisConfig
{
@Bean
public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory)
{
RedisTemplate<String,Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(lettuceConnectionFactory);
//设置key序列化方式string
redisTemplate.setKeySerializer(new StringRedisSerializer());
//设置value的序列化方式json
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
//单Redis节点模式
@Bean
public Redisson redisson()
{
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.111.175:6379").setDatabase(0).setPassword("111111");
return (Redisson) Redisson.create(config);
}
}
contoller
@ApiOperation("扣减库存saleByRedisson,一次卖一个")
@GetMapping(value = "/inventory/saleByRedisson")
public String saleByRedisson()
{
return inventoryService.saleByRedisson();
}
service
@Service
@Slf4j
public class InventoryService
{
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Value("${server.port}")
private String port;
@Autowired
private DistributedLockFactory distributedLockFactory;
@Autowired
private Redisson redisson;
public String saleByRedisson()
{
String retMessage = "";
String key = "RedisLock";
RLock redissonLock = redisson.getLock(key);
redissonLock.lock();
try
{
//1 查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
//2 判断库存是否足够
Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result);
//3 扣减库存
if(inventoryNumber > 0) {
stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventoryNumber));
retMessage = "成功卖出一个商品,库存剩余: "+inventoryNumber;
System.out.println(retMessage);
}else{
retMessage = "商品卖完了,o(╥﹏╥)o";
}
}finally {
if(redissonLock.isLocked() && redissonLock.isHeldByCurrentThread())
{
redissonLock.unlock();
}
}
return retMessage+"\t"+"服务端口号:"+port;
}
}
redisson实战案例(多机)多重锁
- 启动3台docker容器的redis的master主机
- pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.10.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.atguigu.redis.redlock</groupId>
<artifactId>redis_redlock</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.19.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.8</version>
</dependency>
<!--swagger-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<!--swagger-ui-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.11</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
- 配置文件
server.port=9090
spring.application.name=redlock
spring.swagger2.enabled=true
spring.redis.database=0
spring.redis.password=
spring.redis.timeout=3000
spring.redis.mode=single
spring.redis.pool.conn-timeout=3000
spring.redis.pool.so-timeout=3000
spring.redis.pool.size=10
spring.redis.single.address1=192.168.111.185:6381
spring.redis.single.address2=192.168.111.185:6382
spring.redis.single.address3=192.168.111.185:6383
- 启动类
@SpringBootApplication
public class RedisRedlockApplication
{
public static void main(String[] args)
{
SpringApplication.run(RedisRedlockApplication.class, args);
}
}
- 业务类
cacheConfiguration
@Configuration
@EnableConfigurationProperties(RedisProperties.class)
public class CacheConfiguration {
@Autowired
RedisProperties redisProperties;
@Bean
RedissonClient redissonClient1() {
Config config = new Config();
String node = redisProperties.getSingle().getAddress1();
node = node.startsWith("redis://") ? node : "redis://" + node;
SingleServerConfig serverConfig = config.useSingleServer()
.setAddress(node)
.setTimeout(redisProperties.getPool().getConnTimeout())
.setConnectionPoolSize(redisProperties.getPool().getSize())
.setConnectionMinimumIdleSize(redisProperties.getPool().getMinIdle());
if (StringUtils.isNotBlank(redisProperties.getPassword())) {
serverConfig.setPassword(redisProperties.getPassword());
}
return Redisson.create(config);
}
@Bean
RedissonClient redissonClient2() {
Config config = new Config();
String node = redisProperties.getSingle().getAddress2();
node = node.startsWith("redis://") ? node : "redis://" + node;
SingleServerConfig serverConfig = config.useSingleServer()
.setAddress(node)
.setTimeout(redisProperties.getPool().getConnTimeout())
.setConnectionPoolSize(redisProperties.getPool().getSize())
.setConnectionMinimumIdleSize(redisProperties.getPool().getMinIdle());
if (StringUtils.isNotBlank(redisProperties.getPassword())) {
serverConfig.setPassword(redisProperties.getPassword());
}
return Redisson.create(config);
}
@Bean
RedissonClient redissonClient3() {
Config config = new Config();
String node = redisProperties.getSingle().getAddress3();
node = node.startsWith("redis://") ? node : "redis://" + node;
SingleServerConfig serverConfig = config.useSingleServer()
.setAddress(node)
.setTimeout(redisProperties.getPool().getConnTimeout())
.setConnectionPoolSize(redisProperties.getPool().getSize())
.setConnectionMinimumIdleSize(redisProperties.getPool().getMinIdle());
if (StringUtils.isNotBlank(redisProperties.getPassword())) {
serverConfig.setPassword(redisProperties.getPassword());
}
return Redisson.create(config);
}
/**
* 单机
* @return
*/
/*@Bean
public Redisson redisson()
{
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.111.147:6379").setDatabase(0);
return (Redisson) Redisson.create(config);
}*/
}
redispoolproperties
import lombok.Data;
@Data
public class RedisPoolProperties {
private int maxIdle;
private int minIdle;
private int maxActive;
private int maxWait;
private int connTimeout;
private int soTimeout;
/**
* 池大小
*/
private int size;
}
redisproperties
import lombok.Data;
import lombok.ToString;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties(prefix = "spring.redis", ignoreUnknownFields = false)
@Data
public class RedisProperties {
private int database;
/**
* 等待节点回复命令的时间。该时间从命令发送成功时开始计时
*/
private int timeout;
private String password;
private String mode;
/**
* 池配置
*/
private RedisPoolProperties pool;
/**
* 单机信息配置
*/
private RedisSingleProperties single;
}
redisRingleProperties
import lombok.Data;
@Data
public class RedisSingleProperties {
private String address1;
private String address2;
private String address3;
}
controller
@RestController
@Slf4j
public class RedLockController {
public static final String CACHE_KEY_REDLOCK = "ATGUIGU_REDLOCK";
@Autowired
RedissonClient redissonClient1;
@Autowired
RedissonClient redissonClient2;
@Autowired
RedissonClient redissonClient3;
boolean isLockBoolean;
@GetMapping(value = "/multiLock")
//多重锁
public String getMultiLock() throws InterruptedException
{
String uuid = IdUtil.simpleUUID();
String uuidValue = uuid+":"+Thread.currentThread().getId();
RLock lock1 = redissonClient1.getLock(CACHE_KEY_REDLOCK);
RLock lock2 = redissonClient2.getLock(CACHE_KEY_REDLOCK);
RLock lock3 = redissonClient3.getLock(CACHE_KEY_REDLOCK);
RedissonMultiLock redLock = new RedissonMultiLock(lock1, lock2, lock3);
redLock.lock();
try
{
System.out.println(uuidValue+"\t"+"---come in biz multiLock");
try { TimeUnit.SECONDS.sleep(30); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(uuidValue+"\t"+"---task is over multiLock");
} catch (Exception e) {
e.printStackTrace();
log.error("multiLock exception ",e);
} finally {
redLock.unlock();
log.info("释放分布式锁成功key:{}", CACHE_KEY_REDLOCK);
}
return "multiLock task is over "+uuidValue;
}
}
reids的缓存过期淘汰策略
redis的缓存内容大小以及配置
reids的默认内容是多少?在哪里配置,以及如何修改
查看redis的最大占用内存
info memory
config get maxmemory
打开redis配置文件,设置maxmemory参数,maxmemory是bytes字节类型,注意转换。
在64位系统下,设置为0表示的是不限制redis 的内存使用
在生产环境下一般推荐redis设置内存为最大物理内存的四分之三
超出内存使用上线会报OOM错误,所以需要内存淘汰
修改redis内存设置
配置文件
or 命令方式
往redis里写的数据是怎么没得?他是如何删除的?
使用redis的八种淘汰策略
【MEMORY MANAGEMENT】
lru算法和lfu算法的区别
区别
LRU:最近最少使用页面置换算法,淘汰最长时间未被使用的页面,看页面最后一次被使用到发生调度的时间长短,首先淘汰最长时间未被使用的页面
LFU:最近最不常用页面置换算法,淘汰一定时期内被访问次数最少的页,看一定时间段内页面被使用的频率,淘汰一定时期内被访问次数最少的页
例子
某次时期Time为10分钟,如果每分钟进行一次调页,主存块为3,若所需页面走向为2 1 2 1 2 3 4
假设到页面4时会发生缺页中断
若按LRU算法,应换页面1(1页面最久未被使用,因为第二个块中1在最前面),但按LFU算法应换页面3(十分钟内,页面3只使用了一次)
可见LRU关键是看页面最后一次被使用到发生调度的时间长短,而LFU关键是看一定时间段内页面被使用的频率!
八种淘汰策略
两个维度,四个方面
维度:所有key和有过期时间的key
四个方面:lru,lfu,随机random,ttl
配置建议,避免存储bigkey
开启惰性淘汰,lazyfree-lazy-eviction=yes
微信抢红包功能实现
需求分析
-
各种节假日,发红包+抢红包100%高并发业务要求,不能用mysql来做,
-
一个总的大红包,会有可能拆分成多个小红包,总金额= 分金额1+分金额2+分金额3…分金额N
-
每个人只能抢一次,你需要有记录,比如100块钱,被拆分成10个红包发出去,
-
总计有10个红包,抢一个少一个,总数显示(10/6)直到完,需要记录那些人抢到了红包,重复抢作弊不可以。
-
有可能还需要你计时,完整抢完,从发出到全部over,耗时多少?
-
红包过期,或者群主人品差,没人抢红包,原封不动退回。
-
红包过期,剩余金额可能需要回退到发红包主账户下。
关键点:
- 发红包,
list - 抢红包,
抢,不加锁,且原子性,还需要能支持高并发
lpop 出list即可 - 记红包,
记录每个人抢了多少
hash
同一个用户不可以抢夺2次红包 - 拆红包
算法,所有人抢到金额之和等于红包金额,不能超过,也不能少于,每个人至少抢到一分钱,保证所有人抢到金额的几率相等
抢红包业务通用算法-二倍均值法
剩余红包金额为M,剩余人数为N,那么有如下公式:
每次抢到的金额 = 随机区间 (0, (剩余红包金额M ÷ 剩余人数N ) X 2)
这个公式,保证了每次随机金额的平均值是相等的,不会因为抢红包的先后顺序而造成不公平。
举个栗子:
假设有10个人,红包总额100元。
第1次:
100÷10 X2 = 20, 所以第一个人的随机范围是(0,20 ),平均可以抢到10元。假设第一个人随机到10元,那么剩余金额是100-10 = 90 元。
第2次:
90÷9 X2 = 20, 所以第二个人的随机范围同样是(0,20 ),平均可以抢到10元。假设第二个人随机到10元,那么剩余金额是90-10 = 80 元。
第3次:
80÷8 X2 = 20, 所以第三个人的随机范围同样是(0,20 ),平均可以抢到10元。 以此类推,每一次随机范围的均值是相等的。
代码
不考虑小数
@RestController
public class RedPackageController
{
public static final String RED_PACKAGE_KEY = "redpackage:";
public static final String RED_PACKAGE_CONSUME_KEY = "redpackage:consume:";
@Resource
private RedisTemplate redisTemplate;
/**
* 拆分+发送红包
* http://localhost:5555/send?totalMoney=100&redPackageNumber=5
* @param totalMoney
* @param redPackageNumber
* @return
*/
@RequestMapping("/send")
public String sendRedPackage(int totalMoney,int redPackageNumber)
{
//1 拆红包,总金额拆分成多少个红包,每个小红包里面包多少钱
Integer[] splitRedPackages = splitRedPackage(totalMoney, redPackageNumber);
//2 红包的全局ID
String key = RED_PACKAGE_KEY+IdUtil.simpleUUID();
//3 采用list存储红包并设置过期时间
redisTemplate.opsForList().leftPushAll(key,splitRedPackages);
redisTemplate.expire(key,1,TimeUnit.DAYS);
return key+"\t"+"\t"+ Ints.asList(Arrays.stream(splitRedPackages).mapToInt(Integer::valueOf).toArray());
}
/**
* http://localhost:5555/rob?redPackageKey=上一步的红包UUID&userId=1
* @param redPackageKey
* @param userId
* @return
*/
@RequestMapping("/rob")
public String rodRedPackage(String redPackageKey,String userId)
{
//1 验证某个用户是否抢过红包
Object redPackage = redisTemplate.opsForHash().get(RED_PACKAGE_CONSUME_KEY + redPackageKey, userId);
//2 没有抢过就开抢,否则返回-2表示抢过
if (redPackage == null) {
// 2.1 从list里面出队一个红包,抢到了一个
Object partRedPackage = redisTemplate.opsForList().leftPop(RED_PACKAGE_KEY + redPackageKey);
if (partRedPackage != null) {
//2.2 抢到手后,记录进去hash表示谁抢到了多少钱的某一个红包
redisTemplate.opsForHash().put(RED_PACKAGE_CONSUME_KEY + redPackageKey,userId,partRedPackage);
System.out.println("用户: "+userId+"\t 抢到多少钱红包: "+partRedPackage);
//TODO 后续异步进mysql或者RabbitMQ进一步处理
return String.valueOf(partRedPackage);
}
//抢完
return "errorCode:-1,红包抢完了";
}
//3 某个用户抢过了,不可以作弊重新抢
return "errorCode:-2, message: "+"\t"+userId+" 用户你已经抢过红包了";
}
/**
* 1 拆完红包总金额+每个小红包金额别太离谱
* @param totalMoney
* @param redPackageNumber
* @return
*/
private Integer[] splitRedPackage(int totalMoney, int redPackageNumber)
{
int useMoney = 0;
Integer[] redPackageNumbers = new Integer[redPackageNumber];
Random random = new Random();
for (int i = 0; i < redPackageNumber; i++)
{
if(i == redPackageNumber - 1)
{
redPackageNumbers[i] = totalMoney - useMoney;
}else{
int avgMoney = (totalMoney - useMoney) * 2 / (redPackageNumber - i);
redPackageNumbers[i] = 1 + random.nextInt(avgMoney - 1);
}
useMoney = useMoney + redPackageNumbers[i];
}
return redPackageNumbers;
}
}
批量删除key