当前位置: 首页 > article >正文

【day7】Redis场景问题+解决方案

缓存穿透

问题描述:

客户端访问服务器,查询到的key不存在,并且在数据库查询不到对应的数据

-缓存穿透的原因
1) key 对应的数据在数据源并不存在,每次针对此 key 的请求从缓存获取不到,请求都会压 到数据源, 可能压垮数据源
2) 比如: 用一个不存在的用户 id 获取用户信息,不论缓存还是数据库都没有,若黑客利用 此漏洞进行攻击可能压垮数据库
3) 也就是说:如果从存储层查不到数据则不会写入缓存,这将导致这个不存在的数据每次 请求都要到存储层去查询, 失去了缓存的意义

 --缓存穿透的现象/表象
1) 应用服务器压力变大
2) Redis 命中率降低

3) 一直查数据库

 解决方案/思路

1) 对空值缓存 如果一个查询返回的数据为空,我们仍然把这个空结果(null)进行缓存,设置空结果的过 期时间应该短些,最长不超过五分钟

2) 设置可访问的名单(白名单) 定义一个可以访问的名单,每次访问和白名单的 id 进行比较,如果访问 id 不在白名单里面, 进行拦截,不允许访问, 比如使用 bitmaps 实现.

3) 采用布隆过滤器 - 布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都 远远超过一般的算法,缺点是有一定的误识别率和删除困难

4) 进行实时监控 当发现 Redis 的命中率开始急速降低, 需要排查访问对象和访问的数据, 和运维人员配合, 可以设置黑名单限制服务

 缓存击穿

 问题描述:

客户端访问服务器,查询到的key存在,但是已经过期,在高并发情况下,大量查询到db

可能出现数据库崩溃

 -缓存击穿的原因
1) key 对应的数据存在,但在 redis 中过期,此时若有大量并发请求过来,这些请求发现缓 存过期, 会从后端 DB 加载数据并回设到缓存,这时大并发的请求可能会瞬间把后端 DB 压 垮
2) 比如某个热点数据, 可能会在某些时间点, 被超高并发地访问, 容易出现缓存击穿

 --缓存击穿的现象/表象
1) 数据库访问压力瞬时增加
2) Redis 里面没有出现大量 key 过期
3) Redis 正常运行状态, 但是数据库可能瘫痪了

 2、解决方案
1) 预先设置热门数据 在 redis 高峰访问之前,把一些热门数据提前存入到 redis 里面,加大这些热门数据 key 的时长
2) 实时调整 现场监控哪些数据热门,实时调整 key 的过期时长

3)使用锁 setnx 后面分布式锁会使用

 缓存雪崩

 -缓存雪崩的原因

1) key 对应的数据存在,但在 redis 中过期,此时若有大量并发请求过来,这些请求发现缓 存过期一般都会从后端 DB 加载数据并回设到缓存

2) 这个时候大并发的请求可能会瞬间把后端 DB 压垮。

3) 缓存雪崩与缓存击穿的区别在于这里针对很多 key 缓存,前者则是某一个key

 --缓存雪崩的现象/表象
1) 数据库访问压力变大, 服务器崩溃
2) 在极短时间内, 访问大量 Key, 而这些 Key 集中过期

 2、解决方案/思路
1) 构建多级缓存架构 nginx 缓存 + redis 缓存 +其他缓存(ehcache 等) , 这种方式开发/维护成本较高
2) 使用锁或队列 用加锁或者队列的方式来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时 大量的并发请求落到底层存储系统上。不适用高并发情况
3) 设置过期标志更新缓存 记录缓存数据是否过期,如果过期会触发通知另外的线程在后台去更新实际 key 的缓存。
4) 将缓存失效时间分散开 比如我们可以在原有的失效时间基础上增加一个随机值,比如 1-5 分钟随机,这样每一个缓 存的过期时间的重复率就会降低,就很难引发集体失效的事件

 分布式锁

1、单体单机部署的系统被演化成分布式集群系统后
2、由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的 并发控制锁策略失效
3、单纯的 Java API 并不能提供分布式锁的能力
4、为了解决这个问题就需要一种跨 JVM 的互斥机制来控制共享资源的访问,这就是分布 式锁要解决的问题

Redis实现分布式锁(redis面试题)如何使⽤ Redis 实现分布式锁?

通过Redis实现分布式锁的核心思想:

利用Redis的原子性操作,对Redis中特定的key进行setnx(set if not exists)操作

如果成功返回true表示获取到了锁,否则没有获取到锁

在没有获取到锁的情况下,不允许客户端进行操作

另外在获取到锁的情况下,需要设置过期时间和在释放锁之前判断当前锁是否是⾃⼰持有的,以防出现死锁等问题

 具体实现步骤如下:
1. ⽣成⼀个唯⼀的锁标识 key,并设置锁的过期时间。

2. 使⽤ setnx 命令尝试获取锁,如果返回结果为 1,则表示获取锁成功,进⼊下⼀步;否则等待⼀段时间重试, 直到获取到锁为⽌。

3. 在持有锁的时间内,执⾏相关业务逻辑操作,并定时更新锁的过期时间,防⽌锁时间过⻓⽽导致锁⾃动失效。

4. 释放锁,通过⽐较锁的持有者是否是⾃⼰来判断是否能够释放锁。

 redis命令实现分布式锁

方式一  

方式二 set key value nx ex second  这个指令是原子性的,防止 setnx key value / expire key seconds 两条指令, 中间执行被 打断

 在 SpringBoot+Redis 实现分布式锁的使用

 

 业务逻辑

第 1 种情况
--如果获取到该分布式锁 --就获取 key 为 num 的值, 并对 num+1, 再更新 num 的值, 并释放锁(key 为 lock) --如果获取不到 key 为 num 的值, 就直接返回

第 2 种情况 --如果没有获取到该分布式锁 --休眠 100 毫秒, 再尝试获取

第一步 初始化num

 springbpot实现分布式锁

1、创建maven项目,引入相关依赖
 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- redis依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <!-- spring2.X集成redis所需common-pool-->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <!--不要带版本号,防止冲突, 使用版本仲裁即可-->
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.13.2.2</version>
        </dependency>

    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
2、redis配置类 直接用
@EnableCaching
@Configuration
public class RedisConfig extends CachingConfigurerSupport {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template =
                new RedisTemplate<>();
        System.out.println("template=>" + template);//这里可以验证..
        RedisSerializer<String> redisSerializer =
                new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer =
                new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.activateDefaultTyping(
                LaissezFaireSubTypeValidator.instance,
                ObjectMapper.DefaultTyping.NON_FINAL,
                JsonTypeInfo.As.WRAPPER_ARRAY);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        template.setConnectionFactory(factory);
        //key序列化方式
        template.setKeySerializer(redisSerializer);
        //value序列化
        template.setValueSerializer(jackson2JsonRedisSerializer);
        //value hashmap序列化
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        return template;
    }

    @Bean
    public CacheManager cacheManager(RedisConnectionFactory factory) {
        RedisSerializer<String> redisSerializer =
                new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        //解决查询缓存转换异常的问题
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.activateDefaultTyping(
                LaissezFaireSubTypeValidator.instance,
                ObjectMapper.DefaultTyping.NON_FINAL,
                JsonTypeInfo.As.WRAPPER_ARRAY);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        // 配置序列化(解决乱码的问题),过期时间600秒
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
                .entryTtl(Duration.ofSeconds(600))
                .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
                .disableCachingNullValues();
        RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
                .cacheDefaults(config)
                .build();
        return cacheManager;
    }
}
3、配置文件 application.yml
spring:
  redis:
    host: 192.168.200.132 #redis服务器ip地址
    port: 6379 #redis服务器端口
    password: cao #设置的密码
    database: 0 #操作的数据库索引
    connect-timeout: 1800000 #连接超时时间
    jedis:
      pool:
        max-active: 20 #最大连接数
        max-wait: 1 #最大阻塞等待时间
        max-idle: 5 #连接池的最大空闲连接
        min-idle: 0 #连接池的最小空闲连接
4、业务逻辑
@RestController
@RequestMapping("/redisTest")
@SuppressWarnings({"all"})
public class RedisTestController {

    //装配RedisTemplate
    @Resource
    private RedisTemplate redisTemplate;

    //编写方法,使用Redis分布式锁,完成对 key为num的+1操作
    @GetMapping("/lock")
    public void lock() {
        //todo uuid防止误删除锁
        //得到一个uuid值,作为锁的值
        String uuid = UUID.randomUUID().toString(); //lockValue:key

        //1. 获取锁/设置锁 key->lock : setnx
        //todo 设置锁的过期时间防止死锁
        Boolean lock =
                redisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS);
        if (lock) {//true, 说明获取锁/设置锁成功
            //这个key为num的数据,事先要在Redis初始化
            Object value = redisTemplate.opsForValue().get("num");
            //1.判断返回的value是否有值
            if (value == null || !StringUtils.hasText(value.toString())) {
                return;
            }
            //2.有值,就将其转成int
            int num = Integer.parseInt(value.toString());
            //3.将num+1,再重新设置回去
            redisTemplate.opsForValue().set("num", ++num);
            //释放锁-lock

            //为了防止误删除其它用户的锁,先判断当前的锁是不是前面获取到的锁,如果相同,再释放

            //=====使用lua脚本, 控制删除原子性========
            //todo  删除的原子性指的是 key与value 的设置和删除是一对一的执行 而不存在当a用户键值对过期 删除释放了b用户的锁
            // 定义lua 脚本
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] " +
                            "then return redis.call('del', KEYS[1]) " +
                            "else return 0 end";
            // 使用redis执行lua执行
            DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
            redisScript.setScriptText(script);
            // 设置一下返回值类型 为Long
            // 因为删除判断的时候,返回的0,给其封装为数据类型。
            // 如果不封装那么默认返回String 类型,
            // 那么返回字符串与0 会有发生错误。
            redisScript.setResultType(Long.class);
            // 第一个是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值
            // Arrays.asList("lock") 会传递给 script 的 KEYS[1] , uuid 会传递给ARGV[1] 
            redisTemplate.execute(redisScript, Arrays.asList("lock"), uuid);


            //if (uuid.equals((String) redisTemplate.opsForValue().get("lock"))) {
            //    //...
            //    redisTemplate.delete("lock");
            //}

            //redisTemplate.delete("lock");

        } else { //获取锁失败,休眠100毫秒,再重新获取锁/设置锁
            try {
                Thread.sleep(100);
                lock();//重新执行 递归重新获取锁
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
5、高并发测试分布式锁是否有效 看num数值在高并发1000次下 是否能全部拿到锁 
没有死锁、误删锁 情况和其他情况 确保分布式锁的原子性(lua脚本实现)
6、这里需要 一个工具 ab工具 模拟高并发

linux 指令 yum -y install httpd-tools 
如果安装过程出现错误下面的连接可能会帮到你

解决ab工具gcc环境报错https://blog.csdn.net/weixin_43193667/article/details/143466724?fromshare=blogdetail&sharetype=blogdetail&sharerId=143466724&sharerefer=PC&sharesource=weixin_60205306&sharefrom=from_linkhttps://blog.csdn.net/weixin_43193667/article/details/143466724?fromshare=blogdetail&sharetype=blogdetail&sharerId=143466724&sharerefer=PC&sharesource=weixin_60205306&sharefrom=from_link

7、测试高并发

ab -n 1000 -c 100 http://192.168.200.1:8080/redisTest/testLock
-n 1000 表示一共发出 1000 次 http 请求

-c 100 表示并发时 100 次, 你可以理解 1000 次请求, 会在 10 次发送完

http://192.168.200.1:8080/redisTest/testLock
 

192.168.200.1 windows网络

8080 后台端口

redisTest/testLock 接口方法


8、查看测试结果


1000次请求 都拿到锁 说明没有出现死锁 误删锁 测试成功

redis设计分布式锁成功

 分布式锁注意事项和细节

1、定义锁的 key, key 可以根据业务, 分别设置,比如操作某商品, key 应该是为每个 sku 定 义的,也就是每个 sku 有一把锁

2、为了确保分布式锁可用,要确保锁的实现同时满足以下四个条件:
- 互斥性。在任意时刻,只有一个客户端能持有锁。
- 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续 其他客户端能加锁。
- 加锁和解锁必须是同一个客户端,A 客户端不能把 B 客户端加的锁给解了
- 加锁和解锁必须具有原子性


http://www.kler.cn/a/515606.html

相关文章:

  • HDFS的Shell操作
  • 最小距离和与带权最小距离和
  • 第三部分:Linux中的yum
  • excel导入数据处理前端
  • 步入响应式编程篇(二)之Reactor API
  • 第23篇 基于ARM A9处理器用汇编语言实现中断<五>
  • python爬虫的学习流程(1-前提准备)
  • 02内存结构篇(D1_自动内存管理)
  • 利用 JDK 17 的 Stream API 实现高效数据处理
  • ubuntu20使用apt安装mysql8
  • 网站服务器中的文件被自动删除的原因
  • SSM开发(一)JAVA,javaEE,spring,springmvc,springboot,SSM,SSH等几个概念区别
  • unity程序导入Android工程
  • Spring Boot整合WebSocket
  • Git 小白入门教程
  • Ubuntu 20.04 更换软件源
  • APL语言的数据库编程
  • 14天学习微服务-->第2天:Spring Cloud深入与实践
  • uni-app微信小程序页面跳转技巧总结
  • 基于 WPF 平台使用纯 C# 实现动态处理 json 字符串
  • Picsart美易照片编辑器和视频编辑器
  • Qt信号与槽底层实现原理
  • AI发展新趋势:从单模态到多模态的技术演进
  • 使用Edge打开visio文件
  • 【Elasticsearch】 Ingest Pipeline `processors`属性详解
  • helm推送到harbor私有库--http: server gave HTTP response to HTTPS client